网站开发 ie兼容,ssh鲜花礼品网站建设,淮安网站建设推广,wordpress主题视频站rabbitmq五种模式的总结 完整项目地址#xff1a;https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式
#xff08;一#xff09;简单模式概述
RabbitMQ 的简单模式是最基础的消息队列模式#xff0c;包含以下两个角色#xff1a;
生产者#xff1a;负责发…rabbitmq五种模式的总结 完整项目地址https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式
一简单模式概述
RabbitMQ 的简单模式是最基础的消息队列模式包含以下两个角色
生产者负责发送消息到队列。消费者负责从队列中接收并处理消息。
在简单模式中消息的传递是单向的生产者将消息发送到队列消费者从队列中接收消息。 二生产者代码解析
代码
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(ip要换成真实的ip哦);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明队列/*** 参数说明* 1. 队列名称01-hello2* 2. 是否持久化true重启后队列仍然存在* 3. 是否独占队列false允许多个消费者连接* 4. 是否自动删除false队列不会自动删除* 5. 额外参数null*/channel.queueDeclare(01-hello2, true, false, false, null);// 6. 发送消息/*** 参数说明* 1. 交换机名称空字符串使用默认交换机* 2. 路由键队列名称01-hello2* 3. 额外属性null* 4. 消息内容字节数组*/channel.basicPublish(, 01-hello2, null, hello rabbitmq2.getBytes());System.out.println(消息发送成功);// 7. 关闭资源channel.close();connection.close();}
}结果 三消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(ip要换成真实的ip哦);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明队列需与生产者保持一致channel.queueDeclare(01-hello2, false, false, false, null);// 6. 接收消息/*** 参数说明* 1. 队列名称01-hello2* 2. 是否自动确认true消息被消费后自动确认* 3. 消息处理回调DeliverCallback* 4. 消息取消回调CancelCallback*/channel.basicConsume(01-hello2, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println(接收到消息 new String(delivery.getBody()));}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {System.out.println(消息被取消);}});}
}结果 在mq中查看 四总结
简单模式适用于一对一的简单消息传递场景。生产者负责创建队列并发送消息。消费者负责从队列中接收并处理消息。注意事项 队列名称需保持一致不然一定会报错消息确认机制需根据业务需求选择自动或手动确认。使用完资源后需显式关闭 Channel 和 Connection。
二、工作模式
一工作模式概述
工作模式是 RabbitMQ 的一种常见模式用于将任务分发给多个消费者。它的特点是
一个生产者负责发送消息到队列。多个消费者共同消费同一个队列中的消息。消息分发机制默认情况下RabbitMQ 会以轮询Round-Robin的方式将消息分发给消费者。
工作模式适用于任务分发场景例如将耗时的任务分发给多个 Worker 处理。 二生产者代码解析
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(你的ip别忘了改);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明队列/*** 参数说明* 1. 队列名称02-work1* 2. 是否持久化true重启后队列仍然存在* 3. 是否独占队列false允许多个消费者连接* 4. 是否自动删除false队列不会自动删除* 5. 额外参数null*/channel.queueDeclare(02-work1, true, false, false, null);// 6. 发送消息for (int i 0; i 20; i) {String message hello work: i;channel.basicPublish(, 02-work1, null, message.getBytes());}System.out.println(消息发送成功);// 7. 关闭资源channel.close();connection.close();}
}关键点
队列声明queueDeclare创建队列并设置队列属性。消息发送basicPublish通过循环发送多条消息到队列。持久化队列设置为 true确保队列在 RabbitMQ 重启后仍然存在。 三消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(你的ip别忘了改);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明队列需与生产者保持一致channel.queueDeclare(02-work1, true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明* 1. 队列名称02-work1* 2. 是否自动确认false手动确认消息* 3. 消息处理回调DeliverCallback* 4. 消息取消回调CancelCallback*/channel.basicConsume(02-work1, false, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者1 接收到消息 new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {System.out.println(消息被取消);}});}
}关键点
队列声明queueDeclare确保队列存在需与生产者保持一致。消息预取basicQos设置每次只接收一条消息避免某个消费者处理过多消息。手动确认basicAck消息处理完成后手动确认确保消息不会丢失。消息处理耗时通过 Thread.sleep(1000) 模拟消息处理耗时。
效果 四工作模式的特点
消息分发机制 默认情况下RabbitMQ 会以轮询的方式将消息分发给多个消费者。可以通过 basicQos 设置每次只接收一条消息避免某个消费者处理过多消息。 消息确认机制 设置为手动确认autoAckfalse确保消息处理完成后才确认。防止业务处理失败的情况下丢失消息如果消费者在处理消息时崩溃未确认的消息会重新分发给其他消费者。 适用场景 任务分发场景例如将耗时的任务分发给多个 Worker 处理。 五总结
工作模式适用于任务分发场景多个消费者共同消费同一个队列中的消息。生产者负责发送消息到队列。消费者负责接收并处理消息支持手动确认和消息预取。注意事项 队列名称需保持一致。消息确认机制需根据业务需求选择自动或手动确认。使用 basicQos 控制消息分发避免某个消费者处理过多消息。
三、发布订阅模式
一发布订阅模式概述
发布订阅模式Publish/Subscribe Mode是 RabbitMQ 的一种模式用于将消息广播给多个消费者。它的特点是
一个生产者将消息发送到交换机Exchange。多个消费者每个消费者都有自己的队列并与交换机绑定。消息广播交换机将消息广播给所有绑定的队列。
发布订阅模式适用于消息广播场景例如日志系统、通知系统等。 二生产者代码解析
生产者负责创建消息并将其发送到指定的交换机中。
package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(用自己的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明交换机/*** 参数说明* 1. 交换机名称03-pubsub* 2. 交换机类型fanout广播模式*/channel.exchangeDeclare(03-pubsub, fanout);// 6. 发送消息for (int i 0; i 20; i) {String message hello work: i;/*** 参数说明* 1. 交换机名称03-pubsub* 2. 路由键空字符串fanout 模式忽略路由键* 3. 消息属性MessageProperties.TEXT_PLAIN* 4. 消息内容字节数组*/channel.basicPublish(03-pubsub, , MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println(消息发送成功);// 7. 关闭资源channel.close();connection.close();}
}关键点
交换机声明exchangeDeclare创建交换机并设置类型为 fanout广播模式。消息发送basicPublish将消息发送到交换机路由键为空字符串fanout 模式忽略路由键。消息广播消息会被广播到所有绑定到该交换机的队列。 三消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(用自己的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明交换机channel.exchangeDeclare(03-pubsub, fanout);// 6. 创建临时队列String queue channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明* 1. 队列名称queue* 2. 交换机名称03-pubsub* 3. 路由键空字符串fanout 模式忽略路由键*/channel.queueBind(queue, 03-pubsub, );// 8. 接收消息/*** 参数说明* 1. 队列名称queue* 2. 是否自动确认true自动确认消息* 3. 消息处理回调DeliverCallback* 4. 消息取消回调CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者2 接收到消息 new String(delivery.getBody()));}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {System.out.println(消息被取消);}});}
}关键点
交换机声明exchangeDeclare确保交换机存在需与生产者保持一致。临时队列queueDeclare创建一个临时队列队列名称由 RabbitMQ 自动生成。队列绑定queueBind将队列绑定到交换机路由键为空字符串fanout 模式忽略路由键。消息接收basicConsume从队列中接收消息并处理。
结果 可以看到两个consumer都消费了相同的消息 四发布订阅模式的特点
消息广播交换机将消息广播给所有绑定的队列。临时队列消费者可以创建临时队列队列名称由 RabbitMQ 自动生成。适用场景 日志系统将日志消息广播给多个消费者。通知系统将通知消息广播给多个用户。 五总结
发布订阅模式适用于消息广播场景多个消费者各自接收相同的消息。生产者负责将消息发送到交换机。消费者负责创建队列并绑定到交换机接收并处理消息。注意事项 交换机类型需设置为 fanout。队列绑定到交换机时路由键为空字符串。临时队列的名称由 RabbitMQ 自动生成。 六RabbitMQ 交换机类型总结
交换机类型描述路由行为适用场景Fanout广播模式将消息发送到所有绑定到该交换机的队列。忽略路由键Routing Key消息会被广播到所有绑定的队列。日志系统、通知系统等需要广播消息的场景。Direct直接模式根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。Topic主题模式根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配* 匹配一个单词# 匹配零个或多个单词。消息分类、多条件路由等需要灵活匹配的场景。Headers头部模式根据消息的头部属性Headers进行匹配。不依赖路由键而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑例如根据消息的元数据进行路由。 详细说明
1. Fanout 交换机广播常用
特点 消息会被广播到所有绑定到该交换机的队列。忽略路由键Routing Key。 适用场景 日志系统将日志消息广播给多个消费者。通知系统将通知消息广播给多个用户。
2. Direct 交换机
特点 消息的路由键必须与队列绑定的路由键完全匹配。支持一对一或一对多的精确路由。 适用场景 任务分发将特定任务路由到特定的 Worker。点对点通信将消息发送到特定的接收者。
3. Topic 交换机
特点 支持通配符匹配 * 匹配一个单词。# 匹配零个或多个单词。 路由键的格式通常是点分字符串如 user.create。 适用场景 消息分类根据消息的主题进行路由。多条件路由支持灵活的路由规则。
4. Headers 交换机
特点 不依赖路由键而是通过消息的头部属性Headers进行匹配。支持复杂的匹配规则如 x-match 参数。 适用场景 复杂的路由逻辑根据消息的元数据进行路由。需要高度灵活性的场景。 对比
场景FanoutDirectTopicHeaders日志广播所有消费者接收相同的日志消息。不适用。不适用。不适用。任务分发不适用。将任务路由到特定的 Worker。将任务分类路由到不同的 Worker。根据任务的元数据进行路由。通知系统所有用户接收相同的通知。特定用户接收特定通知。根据通知类型路由到不同用户。根据通知的元数据进行路由。消息分类不适用。不适用。根据消息主题进行路由。根据消息的头部属性进行路由。 总结
Fanout适用于广播场景。Direct适用于精确路由场景。Topic适用于灵活的路由场景。Headers适用于复杂的路由逻辑。
四、路由模式
一路由模式概述
路由模式是 RabbitMQ 的一种模式使用 Direct 交换机 根据消息的 路由键Routing Key 将消息发送到匹配的队列。它的特点是
一个生产者将消息发送到 Direct 交换机并指定路由键。多个消费者每个消费者可以绑定一个或多个路由键只有匹配的路由键的消息才会被接收。精确路由消息的路由键必须与队列绑定的路由键完全匹配。
路由模式适用于需要根据特定条件精确路由消息的场景例如日志级别分类、任务分发等。 二生产者代码解析
生产者负责创建消息并将其发送到 Direct 交换机同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(你的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明* 1. 交换机名称04-routing* 2. 交换机类型direct*/channel.exchangeDeclare(04-routing, direct);// 6. 发送消息for (int i 0; i 20; i) {String message hello work: i;/*** 参数说明* 1. 交换机名称04-routing* 2. 路由键err消息将发送到绑定 err 路由键的队列* 3. 消息属性MessageProperties.TEXT_PLAIN* 4. 消息内容字节数组*/channel.basicPublish(04-routing, err, MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println(消息发送成功);// 7. 关闭资源channel.close();connection.close();}
}关键点
交换机声明exchangeDeclare创建 Direct 交换机类型为 direct。消息发送basicPublish指定路由键如 err消息会被发送到绑定该路由键的队列。路由键匹配只有队列绑定的路由键与消息的路由键完全匹配时消息才会被路由到该队列。 三消费者代码解析
代码
消费者负责创建队列并绑定到 Direct 交换机同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(你的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare(04-routing, direct);// 6. 创建临时队列String queue channel.queueDeclare().getQueue();// 7. 绑定队列到交换机并指定路由键/*** 参数说明* 1. 队列名称queue* 2. 交换机名称04-routing* 3. 路由键info、err、waring*/channel.queueBind(queue, 04-routing, info);channel.queueBind(queue, 04-routing, err);channel.queueBind(queue, 04-routing, waring);// 8. 接收消息/*** 参数说明* 1. 队列名称queue* 2. 是否自动确认true自动确认消息* 3. 消息处理回调DeliverCallback* 4. 消息取消回调CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者1 接收到消息 new String(delivery.getBody()));}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {System.out.println(消息被取消);}});}
}关键点
交换机声明exchangeDeclare确保 Direct 交换机存在需与生产者保持一致。临时队列queueDeclare创建一个临时队列队列名称由 RabbitMQ 自动生成。队列绑定queueBind将队列绑定到交换机并指定路由键如 info、err、waring。消息接收basicConsume从队列中接收消息并处理。
效果
consumer1绑定了[infoerrwaring]所以在producer绑定了info时发送消息的情况下consumer1可以接收到信息 由于consumer2绑定的是trace所以consumer2是接收不到消息的 四路由模式的特点
精确路由消息的路由键必须与队列绑定的路由键完全匹配。多路由键支持一个队列可以绑定多个路由键接收多种类型的消息。适用场景 日志级别分类将不同级别的日志如 info、err路由到不同的队列。任务分发将特定任务路由到特定的 Worker。 五总结
路由模式适用于需要根据路由键精确路由消息的场景。生产者负责将消息发送到 Direct 交换机并指定路由键。消费者负责创建队列并绑定到 Direct 交换机同时指定路由键。注意事项 路由键必须完全匹配。一个队列可以绑定多个路由键接收多种类型的消息。
五、Topic 模式
一Topic 模式概述
Topic 模式是 RabbitMQ 的一种模式使用 Topic 交换机 根据消息的 路由键Routing Key 进行模式匹配将消息发送到符合条件的队列。它的特点是
一个生产者将消息发送到 Topic 交换机并指定路由键。多个消费者每个消费者可以绑定一个或多个路由键模式只有匹配的路由键的消息才会被接收。灵活的路由支持通配符匹配 * 匹配一个单词。# 匹配零个或多个单词。
Topic 模式适用于需要根据复杂条件灵活路由消息的场景例如消息分类、多条件路由等。 二生产者代码解析
代码
生产者负责创建消息并将其发送到 Topic 交换机同时指定路由键。
package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(用自己的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明* 1. 交换机名称05-topic* 2. 交换机类型topic*/channel.exchangeDeclare(05-topic, topic);// 6. 发送消息for (int i 0; i 20; i) {String message hello work: i;/*** 参数说明* 1. 交换机名称05-topic* 2. 路由键user.hi消息将发送到匹配 user.* 或 user.# 的队列* 3. 消息属性MessageProperties.TEXT_PLAIN* 4. 消息内容字节数组*/channel.basicPublish(05-topic, user.hi, MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println(消息发送成功);// 7. 关闭资源channel.close();connection.close();}
}关键点
交换机声明exchangeDeclare创建 Topic 交换机类型为 topic。消息发送basicPublish指定路由键如 user.hi消息会被发送到匹配的队列。通配符匹配 * 匹配一个单词如 user.* 匹配 user.hi但不匹配 user.hi.there。# 匹配零个或多个单词如 user.# 匹配 user.hi 和 user.hi.there。 三消费者代码解析
代码
消费者负责创建队列并绑定到 Topic 交换机同时指定路由键模式。
package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost(用自己的ip);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin);// 3. 创建连接对象Connection connection connectionFactory.newConnection();// 4. 创建 ChannelChannel channel connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare(05-topic, topic);// 6. 创建临时队列String queue channel.queueDeclare().getQueue();// 7. 绑定队列到交换机并指定路由键模式/*** 参数说明* 1. 队列名称queue* 2. 交换机名称05-topic* 3. 路由键模式user.*匹配 user.hi、user.hello 等*/channel.queueBind(queue, 05-topic, user.*);// 8. 接收消息/*** 参数说明* 1. 队列名称queue* 2. 是否自动确认true自动确认消息* 3. 消息处理回调DeliverCallback* 4. 消息取消回调CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消费者2 user.* 接收到消息 new String(delivery.getBody()));}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {System.out.println(消息被取消);}});}
}关键点
交换机声明exchangeDeclare确保 Topic 交换机存在需与生产者保持一致。临时队列queueDeclare创建一个临时队列队列名称由 RabbitMQ 自动生成。队列绑定queueBind将队列绑定到交换机并指定路由键模式如 user.*。消息接收basicConsume从队列中接收消息并处理。
效果
当我在producer使用“employee.hi”作为路由key的时候绑定了“employee.*”的consumer1可以消费这个消息 四Topic 模式的特点
灵活的路由支持通配符匹配可以根据复杂的条件路由消息。多路由键支持一个队列可以绑定多个路由键模式接收多种类型的消息。适用场景 消息分类根据消息的主题进行路由。多条件路由支持灵活的路由规则。 五总结
Topic 模式适用于需要根据复杂条件灵活路由消息的场景。生产者负责将消息发送到 Topic 交换机并指定路由键。消费者负责创建队列并绑定到 Topic 交换机同时指定路由键模式。注意事项 路由键模式支持通配符 * 和 #。一个队列可以绑定多个路由键模式接收多种类型的消息。