郑州网站建设网站,有没有做网站,店铺网页设计尺寸,优质网站建设服务目录 11.4 SpringAMQP
11.4.2 Work Queue工作队列
11.4.3 发布订阅模型
11.4.4 FanoutExchange(广播交换机)
11.4.5 DirectExchange(路由模式交换机)
11.4.6 TopicExchange
11.5 消息转换器 11.4 SpringAMQP 父工程引入AMQP依赖 !--AMQP依赖#xff0c;包含RabbitMQ…目录 11.4 SpringAMQP
11.4.2 Work Queue工作队列
11.4.3 发布订阅模型
11.4.4 FanoutExchange(广播交换机)
11.4.5 DirectExchange(路由模式交换机)
11.4.6 TopicExchange
11.5 消息转换器 11.4 SpringAMQP 父工程引入AMQP依赖 !--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency 编写测试方法
yml配置文件中编写配置 spring:rabbitmq:host: 192.168.142.130 # rabbitmq的ip地址port: 5672 # 端口username: xxxxxpassword: xxxxxxxvirtual-host: / 发消息测试 SpringBootTestpublic class AMQPTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage2SimpleQueue(){String queueName simple.queue;String message hello,spring amqp;rabbitTemplate.convertAndSend(queueName,message);}} 在consumer中编写消费逻辑监听simple.queue
配置文件配置 : spring:rabbitmq:host: 192.168.142.129 # rabbitmq的ip地址port: 5672 # 端口username: xxxxxpassword: xxxxxvirtual-host: /
编写监听类 Componentpublic class SpringRabbitListener {RabbitListener(queues simple.queue)public void ListenSimpleQueue(String msg){System.out.println(消费者接收到simple.queue的消息 : msg);}}
启动主启动类控制台可看到输出的监听到的消息
消息一旦被消费就会从队列中删除没有回收机制 11.4.2 Work Queue工作队列 publisher代码 Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName simple.queue;String message hello,spring amqp__;for(int i 1 ; i 50 ; i ){rabbitTemplate.convertAndSend(queueName,message i); Thread.sleep(20);}} consumer接收消息 // 消费者1RabbitListener(queues simple.queue)public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println(消费者1接收到simple.queue的消息 : msg LocalTime.now());Thread.sleep(20);}// 消费者2RabbitListener(queues simple.queue)public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println(消费者2接收到simple.queue的消息 : msg LocalTime.now());Thread.sleep(200);} 消息预取机制使得两者平均分配消息 不符预期
配置文件中 :
处理预取值 spring:rabbitmq:host: 192.168.142.129 # rabbitmq的ip地址port: 5672 # 端口username: xxxxxxpassword: xxxxxxxvirtual-host: /listener:simple:prefetch: 1 # 每次只能获取一条消息 处理完成才能获取下一个信息 11.4.3 发布订阅模型 11.4.4 FanoutExchange(广播交换机) 步骤一 : 声明交换机队列 并绑定队列和交换机
在consumer中编写配置类 Configurationpublic class FanoutConfig {// 声明交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(xinbo.fanout);}// 声明队列1Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}// 绑定队列1到交换机Beanpublic Binding fanoutBindind(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明队列2Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}// 绑定队列2到交换机Beanpublic Binding fanoutBindind2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}} 消息监听 : Componentpublic class SpringRabbitListener {// 消费者1RabbitListener(queues fanout.queue1)public void ListenWork1Queue(String msg) throws InterruptedException {System.out.println(消费者1接收到fanout.queue1的消息 : msg LocalTime.now());Thread.sleep(20);}// 消费者2RabbitListener(queues fanout.queue2)public void ListenWork2Queue(String msg) throws InterruptedException {System.err.println(消费者2接收到fanout.queue2的消息 : msg LocalTime.now());Thread.sleep(200);}} 消息发送 : Testpublic void testSendFanoutExchange(){String exchangeName xinbo.fanout; // 交换机名称String message hello,everyone;rabbitTemplate.convertAndSend(exchangeName,null,message);} 11.4.5 DirectExchange(路由模式交换机) 利用RabbitListener声明Exchange Queue RoutingKey
SpirngRabbitListener中 Componentpublic class SpringRabbitListener {RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name xinbo.direct,type ExchangeTypes.DIRECT),key {red,blue}))public void ListenDirectQueue1(String msg) throws InterruptedException {System.out.println(消费者接收到direct.queue1的消息 : msg LocalTime.now());Thread.sleep(20);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name xinbo.direct,type ExchangeTypes.DIRECT),key {red,yellow}))public void ListenDirectQueue2(String msg) throws InterruptedException {System.out.println(消费者接收到direct.queue2的消息 : msg LocalTime.now());Thread.sleep(20);}}
发送消息测试 : Testpublic void testSendDirectExchange(){// 交换机名称String exchangeName xinbo.direct;String message hello,blue;rabbitTemplate.convertAndSend(exchangeName,blue,message);} 11.4.6 TopicExchange 绑定队列和交换机的关系 : Componentpublic class SpringRabbitListener {RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(namexinbo.topic,type ExchangeTypes.TOPIC),key china.#))public void ListenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息 : msg LocalTime.now());}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(namexinbo.topic,type ExchangeTypes.TOPIC),key #.news))public void ListenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息 : msg LocalTime.now());}}
发送消息 : Testpublic void testSendTopicExchange(){// 交换机名称String exchangeName xinbo.topic;String message 中国发生了xxxxx;rabbitTemplate.convertAndSend(exchangeName,china.news,message);} 11.5 消息转换器
发送和接受json类型的消息
添加依赖 : dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId/dependency
在配置类中 Beanpublic MessageConverter messageCondition(){return new Jackson2JsonMessageConverter();} 接收消息 :
引依赖 :同上
在Listener中 : RabbitListener(queues object.queue)public void ListenObjectQueue(MapString,Object msg){System.out.println(msg);}