上海电子商务网站制作,公司网站升级改版方案,移动端网站定制,苏州企业网站公司都有哪些MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…
MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注解声明队列有 哪些常用注消息转换器注意同步调用异步调用安装SpringAMQP特征概念
MQMessageQueue消息队列事件驱动架构中的Broker
channel操作MQ的工具exchange路由消息到队列queue缓存消息virtual host 虚拟主机是对queue、exchange等资源逻辑分组
常见消息模型 helloworld案例
角色
publisher消息发布者将消息发送到队列queuequeue消息队列负责接受并缓存消息consumer订阅队列处理队列中的消息
实现
实现spring AMQP发送消息
在父工程引入spring-amqp的依赖 !--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列 在publisher服务中编写application.yml添加mq连接信息 spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor在publisher服务中新建一个测试类编写测试方法 RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue(){String queueName simple.queue;String message hello,spring amqp!;rabbitTemplate.convertAndSend(queueName,message);}
}实现spring AMQP接收消息
在父工程引入spring-amqp的依赖 !--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency在consumer服务中编写消费逻辑监听simple.queue。 在consumer服务中编写application.yml添加mq连接信息 spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor在publisher服务中新建一个测试类编写测试方法 Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println(spring 消费者接收到消息【msg】);}
}
工作消息队列
作用 提高消息处理速度避免队列消息堆积。
实现
在publisher服务中定义测试方法每秒产生50条消息发送到simple.queue Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testWorkQueue() throws InterruptedException {String queueName simple.queue;String message hello,spring amqp!;for (int i 0; i 50; i) {rabbitTemplate.convertAndSend(queueName,messagei);Thread.sleep(20);}}在consumer服务中定义两个消息监听者都监听simple.queue队列 RabbitListener(queues simple.queue)public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println(spring 消费者1接收到消息【msg】);Thread.sleep(20);}RabbitListener(queues simple.queue)public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println(spring 消费者1接收到消息【msg】);//err输出为红色Thread.sleep(200);消费者1每秒处理50条消息消费者2每秒处理10条消息 修改application.yml文件设置preFetch这个值可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1发布订阅模型
概念 与之前模型区别是允许将同一消息发送给多个消费者。 实现方式 exchange交换机 exchange 负责消息路由不存储路由失败则消息丢失 常见exchange类型
Fanout广播Direct路由Topic话题 Fanout Exchange
Fanout Exchange将接收到的消息路由到每一个跟其绑定的queue
实现 在consumer服务中利用代码声明队列Queue、交换机Exchange并将两者绑定Binding SringAMQP提供了声明交换机、队列、绑定关系的API。 在consumer服务常见一个类添加configuration注解并声明FanoutExchange、Queue和绑定关系对象Binding。 Configuration
public class FanoutConfig {//声明FanoutExchange交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(root.fanout);}//声明第一个队列Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}//绑定队列1和交换机Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
//第二个同第一个
}在consumer服务中编写两个消费者方法分别监听fanout.queue1和fanout.queue2 RabbitListener(queues fanout.queue1)public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到消息【msg】);}RabbitListener(queues fanout.queue2)public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到消息【msg】);}在publisher中编写测试方法向root.fanout发送消息 Testpublic void testSendFanoutExchange(){String exchangeName root.fanout;String message hello,spring amqp!;rabbitTemplate.convertAndSend(exchangeName,message);}DirectExchange
DirectExchange: 将接收到的消息更具规则路由到指定的Queue因此称为路由模式routes
每一个Queue都与Exchange设置一个BindingKey发布者发送消息时指定消息的RoutingKeyExchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现
利用RabbitListener声明Exchange、Queue、RoutingKey在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2 RabbitListener(bindings QueueBinding(value Queue(namedirect.queue1),exchange Exchange(name root.direct,type ExchangeTypes.DIRECT),key {red,yellow}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到direct.queue1的消息【msg】);}RabbitListener(bindings QueueBinding(value Queue(namedirect.queue2),exchange Exchange(name root.direct,type ExchangeTypes.DIRECT),key {red,yellow}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到direct.queue2的消息【msg】);}在publisher中编写测试方法向root.direct发送消息 Testpublic void testSendDirectExchange(){String exchangeName root.direct;String message hello,red!;rabbitTemplate.convertAndSend(exchangeName,red,message);}TopicExchange
TopicExchange 与DirectExchange类似区别在于routineKey必须是多个单词的列表并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符 # 代指0个或多个单词 *代指一个单词
实现
利用RabbitListener声明Exchange、Queue、RoutingKey在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue2 RabbitListener(bindings QueueBinding(value Queue(nametopic.queue1),exchange Exchange(name root.topic,type ExchangeTypes.TOPIC),key china.#))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到topic.queue1的消息【msg】);}RabbitListener(bindings QueueBinding(value Queue(nametopic.queue2),exchange Exchange(name root.topic,type ExchangeTypes.TOPIC),key #.news))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println(spring 消费者接收到topic.queue2的消息【msg】);}在publisher中编写测试方法向root.topic发送消息 Testpublic void testSendTopicExchange(){String exchangeName root.topic;String message hello,china.news!;rabbitTemplate.convertAndSend(exchangeName,china.news,message);}DirectExchange 和FanoutExchange的差异
FanoutExchange将消息路由给每一个与之绑定的队列DirectExchange根据RoutingKey判断路由给哪个队列如果多个队列具有相同的RoutingKey则与Fanout功能类似
DirectExchange 和TopicExchange的差异
TopicExchange的routineKey必须使用多个单词以 . 分割TopicExchange可以使用通配符
基于RabbitListener注解声明队列有 哪些常用注
QueueExchange
消息转换器
设置JSON方式序列化 发送消息
在publisher服务引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}接收消息
在consumer服务引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}定义消费者 RabbitListener(queues object.queue)public void listenObjectQueueMessage(MapString,Object msg){System.err.println(spring 消费者1接收到消息【msg】);//err输出为红色}注意
MessageConverter默认是JDK序列化 接收方和发送方必须使用相同的MessageConverter
同步调用
优点 时效性高 问题
耦合度高性能下降资源浪费级联失败
异步调用
实现方式
事件驱动常用
优势
服务解耦性能提升吞吐量提高故障隔离。没有强依赖不担心级联失败问题流量削锋
缺点
依赖Broker的可靠性、安全性、吞吐能力架构复杂、业务没有明显的流程线不好追踪管理
安装
docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USERroot \-e RABBITMQ_DEFAULT_PASStoor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-managementSpringAMQP
AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准该协议与语言平台无关更符合微服务中独立性的要求
Spring AMQP 是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。包含两部分
spring-amqp基础抽象spring-rabbit底层默认实现
特征
监听器容器用于异步处理入站消息用于发送和接收消息的RabbitTemplateRabbitAdmin用于自动声明队列、交换和绑定