常州企业免费建站,上海市公共招聘网官网,西安网站设计制,增城网站建设价格文章目录 一、RabbitMQ 架构简介二、准备工作 三、消息收发1. Hello World2. Work queues3. Publish/Subscrite3.1. Direct3.2. Fanout3.3. Topic3.4. Header 4. Routing5. Topics 大部分情况下#xff0c;我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。 一、RabbitMQ 架构简介
一图胜千言如下 这张图中涉及到如下一些概念
生产者Publisher发布消息到 RabbitMQ 中的交换机Exchange上。 交换机Exchange和生产者建立连接并接收生产者的消息。 消费者Consumer监听 RabbitMQ 中的 Queue 中的消息。 队列QueueExchange 将消息分发到指定的 QueueQueue 和消费者进行交互。 路由Routes交换机转发消息到队列的规则。
二、准备工作
大家知道RabbitMQ 是 AMQP 阵营里的产品Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp因此首先创建 Spring Boot 项目并添加该依赖如下 项目创建成功后在 application.properties 中配置 RabbitMQ 的基本连接信息如下
spring.rabbitmq.hostlocalhost
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest
spring.rabbitmq.port5672接下来进行 RabbitMQ 配置在 RabbitMQ 中所有的消息生产者提交的消息都会交由 Exchange 进行再分配Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 官网介绍了如下几种消息分发的形式 这里我主要和大家介绍前六种消息收发方式。
三、消息收发
1. Hello World
咦这个咋没有交换机这个其实是默认的交换机我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下 来看看代码实现
先来看看队列的定义
Configuration
public class HelloWorldConfig {public static final String HELLO_WORLD_QUEUE_NAME hello_world_queue;BeanQueue queue1() {return new Queue(HELLO_WORLD_QUEUE_NAME);}
}再来看看消息消费者的定义
Component
public class HelloWorldConsumer {RabbitListener(queues HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(String msg) {System.out.println(msg msg);}
}消息发送
SpringBootTest
class RabbitmqdemoApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid contextLoads() {rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, hello);}}这个时候使用的其实是默认的直连交换机DirectExchangeDirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上当一条消息到达 DirectExchange 时会被转发到与该条消息routing key相同的 Queue 上例如消息队列名为 “hello-queue”则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
2. Work queues
这种情况是这样的
一个生产者一个默认的交换机DirectExchange一个队列两个消费者如下图 一个队列对应了多个消费者默认情况下由队列对消息进行平均分配消息会被分到不同的消费者手中。消费者可以配置各自的并发能力进而提高消息的消费能力也可以配置手动 ack来决定是否要消费某一条消息。
先来看并发能力的配置如下
Component
public class HelloWorldConsumer {RabbitListener(queues HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(String msg) {System.out.println(receive msg);}RabbitListener(queues HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency 10)public void receive2(String msg) {System.out.println(receive2 msg-------Thread.currentThread().getName());}
}可以看到第二个消费者我配置了 concurrency 为 10此时对于第二个消费者将会同时存在 10 个子线程去消费消息。
启动项目在 RabbitMQ 后台也可以看到一共有 11 个消费者。 此时如果生产者发送 10 条消息就会一下都被消费掉。
消息发送方式如下
SpringBootTest
class RabbitmqdemoApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid contextLoads() {for (int i 0; i 10; i) {rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, hello);}}}消息消费日志如下 可以看到消息都被第一个消费者消费了。但是小伙伴们需要注意事情并不总是这样多试几次就可以看到差异消息也有可能被第一个消费者消费只是由于第二个消费者有十个线程一起开动所以第二个消费者消费的消息占比更大。
当然消息消费者也可以开启手动 ack这样可以自行决定是否消费 RabbitMQ 发来的消息配置手动 ack 的方式如下
spring.rabbitmq.listener.simple.acknowledge-modemanual消费代码如下
Component
public class HelloWorldConsumer {RabbitListener(queues HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)public void receive(Message message,Channel channel) throws IOException {System.out.println(receivemessage.getPayload());channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}RabbitListener(queues HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency 10)public void receive2(Message message, Channel channel) throws IOException {System.out.println(receive2 message.getPayload() ------- Thread.currentThread().getName());channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);}
}此时第二个消费者拒绝了所有消息第一个消费者消费了所有消息。
这就是 Work queues 这种情况。
3. Publish/Subscrite
再来看发布订阅模式这种情况是这样
一个生产者多个消费者每一个消费者都有自己的一个队列生产者没有将消息直接发送到队列而是发送到了交换机每个队列绑定交换机生产者发送的消息经过交换机到达队列实现一个消息被多个消费者获取的目的。需要注意的是如果将消息发送到一个没有队列绑定的 Exchange上面那么该消息将会丢失这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力只有队列具备存储消息的能力如下图 这种情况下我们有四种交换机可供选择分别是
Direct Fanout Topic Header 我分别来给大家举一个简单例子看下。
3.1. Direct
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上例如消息队列名为 “hello-queue”则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下
Configuration
public class RabbitDirectConfig {public final static String DIRECTNAME javaboy-direct;BeanQueue queue() {return new Queue(hello-queue);}Bean
DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}BeanBinding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(direct);}
}首先提供一个消息队列Queue然后创建一个DirectExchange对象三个参数分别是名字重启后是否依然有效以及长期未用时是否删除。创建一个Binding对象将Exchange和Queue绑定在一起。DirectExchange和Binding两个Bean的配置可以省略掉即如果使用DirectExchange可以只配置一个Queue的实例即可。
再来看看消费者
Component
public class DirectReceiver {RabbitListener(queues hello-queue)public void handler1(String msg) {System.out.println(DirectReceiver: msg);}
}通过 RabbitListener 注解指定一个方法是一个消息消费方法方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送如下
RunWith(SpringRunner.class)
SpringBootTest
public class RabbitmqApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testpublic void directTest() {rabbitTemplate.convertAndSend(hello-queue, hello direct!);}
}最终执行结果如下
3.2. Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上在这种策略中routingkey 将不起任何作用FanoutExchange 配置方式如下
Configuration
public class RabbitFanoutConfig {public final static String FANOUTNAME sang-fanout;BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUTNAME, true, false);}BeanQueue queueOne() {return new Queue(queue-one);}BeanQueue queueTwo() {return new Queue(queue-two);}BeanBinding bindingOne() {return BindingBuilder.bind(queueOne()).to(fanoutExchange());}BeanBinding bindingTwo() {return BindingBuilder.bind(queueTwo()).to(fanoutExchange());}
}在这里首先创建 FanoutExchange参数含义与创建 DirectExchange 参数含义一致然后创建两个 Queue再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者如下
Component
public class FanoutReceiver {RabbitListener(queues queue-one)public void handler1(String message) {System.out.println(FanoutReceiver:handler1: message);}RabbitListener(queues queue-two)public void handler2(String message) {System.out.println(FanoutReceiver:handler2: message);}
}两个消费者分别消费两个消息队列中的消息然后在单元测试中发送消息如下
RunWith(SpringRunner.class)
SpringBootTest
public class RabbitmqApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testpublic void fanoutTest() {rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, hello fanout!);}
}注意这里发送消息时不需要 routingkey指定 exchange 即可routingkey 可以直接传一个 null。
最终执行日志如下
3.3. Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略在 TopicExchange 中Queue 通过 routingkey 绑定到 TopicExchange 上当消息到达 TopicExchange 后TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下
Configuration
public class RabbitTopicConfig {public final static String TOPICNAME sang-topic;BeanTopicExchange topicExchange() {return new TopicExchange(TOPICNAME, true, false);}BeanQueue xiaomi() {return new Queue(xiaomi);}BeanQueue huawei() {return new Queue(huawei);}BeanQueue phone() {return new Queue(phone);}BeanBinding xiaomiBinding() {return BindingBuilder.bind(xiaomi()).to(topicExchange()).with(xiaomi.#);}BeanBinding huaweiBinding() {return BindingBuilder.bind(huawei()).to(topicExchange()).with(huawei.#);}BeanBinding phoneBinding() {return BindingBuilder.bind(phone()).to(topicExchange()).with(#.phone.#);}
}首先创建 TopicExchange参数和前面的一致。然后创建三个 Queue第一个 Queue 用来存储和 “xiaomi” 有关的消息第二个 Queue 用来存储和 “huawei” 有关的消息第三个 Queue 用来存储和 “phone” 有关的消息。将三个 Queue 分别绑定到 TopicExchange 上第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的都将被路由到名称为 “xiaomi” 的 Queue 上第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的都将被路由到名称为 “huawei” 的 Queue 上第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的都将被路由到名称为 “phone” 的 Queue 上。
接下来针对三个 Queue 创建三个消费者如下
Component
public class TopicReceiver {RabbitListener(queues phone)public void handler1(String message) {System.out.println(PhoneReceiver: message);}RabbitListener(queues xiaomi)public void handler2(String message) {System.out.println(XiaoMiReceiver:message);}RabbitListener(queues huawei)public void handler3(String message) {System.out.println(HuaWeiReceiver:message);}
}然后在单元测试中进行消息的发送如下
RunWith(SpringRunner.class)
SpringBootTest
public class RabbitmqApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testpublic void topicTest() {rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
xiaomi.news,小米新闻..);rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
huawei.news,华为新闻..);rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
xiaomi.phone,小米手机..);rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
huawei.phone,华为手机..);rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
phone.news,手机新闻..);}
}根据 RabbitTopicConfig 中的配置第一条消息将被路由到名称为 “xiaomi” 的 Queue 上第二条消息将被路由到名为 “huawei” 的 Queue 上第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上最后一条消息则将被路由到名为 “phone” 的 Queue 上。
3.4. Header
HeadersExchange 是一种使用较少的路由策略HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上这种策略也和 routingkey无关配置如下
Configuration
public class RabbitHeaderConfig {public final static String HEADERNAME javaboy-header;BeanHeadersExchange headersExchange() {return new HeadersExchange(HEADERNAME, true, false);}BeanQueue queueName() {return new Queue(name-queue);}BeanQueue queueAge() {return new Queue(age-queue);}BeanBinding bindingName() {MapString, Object map new HashMap();map.put(name, sang);return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();}BeanBinding bindingAge() {return BindingBuilder.bind(queueAge()).to(headersExchange()).where(age).exists();}
}这里的配置大部分和前面介绍的一样差别主要体现的 Binding 的配置上第一个 bindingName 方法中whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value就把该消息路由到名为 “name-queue” 的 Queue 上这里也可以使用 whereAll 方法表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age不管 age 的值是多少都将消息路由到名为 “age-queue” 的 Queue 上。
接下来创建两个消息消费者
Component
public class HeaderReceiver {RabbitListener(queues name-queue)public void handler1(byte[] msg) {System.out.println(HeaderReceiver:name: new String(msg, 0, msg.length));}RabbitListener(queues age-queue)public void handler2(byte[] msg) {System.out.println(HeaderReceiver:age: new String(msg, 0, msg.length));}
}注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法这里消息的发送也和 routingkey 无关如下
RunWith(SpringRunner.class)
SpringBootTest
public class RabbitmqApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testpublic void headerTest() {Message nameMsg MessageBuilder.withBody(hello header! name-queue.getBytes()).setHeader(name, sang).build();Message ageMsg MessageBuilder.withBody(hello header! age-queue.getBytes()).setHeader(age, 99).build();rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);}
}这里创建两条消息两条消息具有不同的 header不同 header 的消息将被发到不同的 Queue 中去。
最终执行效果如下
4. Routing
这种情况是这样
一个生产者一个交换机两个队列两个消费者生产者在创建 Exchange 后根据 RoutingKey 去绑定相应的队列并且在发送消息时指定消息的具体 RoutingKey 即可。
如下图 这个就是按照 routing key 去路由消息我这里就不再举例子了大家可以参考 3.3.1 小结。
5. Topics
这种情况是这样
一个生产者一个交换机两个队列两个消费者生产者创建 Topic 的 Exchange 并且绑定到队列中这次绑定可以通过 * 和 # 关键字对指定 RoutingKey 内容编写时注意格式 xxx.xxx.xxx 去编写。
如下图 这个我也就不举例啦前面 3.3.3 小节已经举过例子了不再赘述。