做视频大赛推广的网站,wordpress 网站地址,自己做的网站如何发布,免备案域名是什么前言#xff1a;在介绍RabbitMQ之前#xff0c;我们先来看下面一个场景#xff1a;
1.1.1.1 异步处理 场景说明#xff1a; 用户注册后#xff0c;需要发注册邮件和注册短信#xff0c;传统的做法有两种
1.串行的方式 (1)串行方式#xff1a;将注册信息写入数据库后在介绍RabbitMQ之前我们先来看下面一个场景
1.1.1.1 异步处理 场景说明 用户注册后需要发注册邮件和注册短信传统的做法有两种
1.串行的方式 (1)串行方式将注册信息写入数据库后发送注册邮件再发送注册短信以上三个任务全部完成后才返回给客户端。这有一个问题是邮件短信并不是必须的它只是一个通知而这种做法让客户端等待没有必要等待的东西。
(2)并行方式将注册信息写入数据库后发送邮件的同时发送短信以上三个任务完成后返回给客户端并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms串行方式使用时间150ms并行使用时间100ms。虽然并性已经提高的处理时间但是前面说过邮件和短信对我正常的使用网站没有任何影响客户端没有必要等着其发送完成才显示注册成功应该是写入数据库后就返回。 (3)消息队列 引入消息队列后把发送邮件短信不是必须的业务逻辑异步处理。
由此可以看出引入消息队列后用户的响应时间就等于写入数据库的时间写入消息队列的时间(可以忽略不计)引入消息队列后处理后响应时间是串行的3倍是并行的2倍。
1.1.2 应用解耦 场景
双11是购物狂节用户下单后订单系统需要通知库存系统传统的做法就是订单系统调用库存系统的接口。
这种做法有一个缺点:
当库存系统出现故障时订单就会失败。订单系统和库存系统高耦合。
引入消息队列
订单系统用户下单后订单系统完成持久化处理将消息写入消息队列返回用户订单下单成功。
库存系统订阅下单的消息获取下单消息进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
1.1.3 流量削峰 流量削峰一般在秒杀活动中应用广泛
场景:
秒杀活动一般会因为流量过大导致应用挂掉为了解决这个问题一般在应用前端加入消息队列。
作用: 1、可以控制活动人数超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 2、可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1、用户的请求服务器收到之后首先写入消息队列加入消息队列长度超过最大值则直接抛弃用户请求或跳转到错误页面。
2、秒杀业务根据消息队列中的请求信息再做后续处理。
常见MQ产品 ActiveMQ基于JMS
RabbitMQ基于AMQP协议erlang语言开发稳定性好
RocketMQ基于JMS阿里巴巴产品目前交由Apache基金会
Kafka分布式消息系统高吞吐量
一、RabbitMQ快速入门
RabbitMQ是由erlang语言开发基于AMQPAdvanced Message Queue 高级消息队列协议协议实现的消息队列它是一种应用程序之间的通信方法消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址http://www.rabbitmq.com
二、使用步骤
1.pom.xml里导入相关的依赖 !--rabbitmq--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency2.application.yml配置文件 #rabitMq配置rabbitmq:host: 120.25.228.68port: 5672virtual-host: /connection-timeout: 10000#开启Confirm机制publisher-confirm-type: correlated#开启Return机制publisher-returns: true#开启ACKlistener:type: simplesimple:#每次能接收10个消息prefetch: 10acknowledge-mode: manualdirect:acknowledge-mode: manual3.rabbitMq交换机讲解
1、交换机介绍: 先附加下官网文档。RabbitMQ的交换机类型共有四种是根据其路由过程的不同而划分成的
2、交换机模式 一、Direct Exchange直连交换机 直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机并且具备了选择性接收消息的能力。
这种配置下我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色orange绑定键第二个有两个绑定键其中一个绑定键是黑色black另一个绑定键是绿色green。在此设置中发布到交换机的带有橘色orange路由键的消息会被路由给队列Q1。带有黑色black或绿色green路由键的消息会被路由给Q2。其他的消息则会被丢弃。
二、 Fanout Exchange扇型交换机 Fanout Exchange扇型交换机当一个Msg发送到扇形交换机X上时则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列也就是说路由键在扇形交换机里没有作用故消息队列绑定扇形交换机时路由键可为空。这个模式类似于广播。
三、Topic Exchange主题交换机 1路由键和绑定键命名
消息路由键—发送到主题交换机的消息所携带的路由键routing_key不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节词的长度由你来定。 绑定键binding key也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键routing key的消息会被投送给所有绑定键binding key与之相匹配的队列。尽管如此仍然有两条与绑定键相关的特殊情况 (星号) 能够替代一个单词。 #(井号) 能够替代零个或多个单词。
2示例解析如上图 我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词两个点号的路由键routing key。路由键中第一个单词描述速度第二个单词是颜色第三个是品种 “速度.颜色.品种”。我们创建三个绑定Q1通过.orange.“绑定键进行绑定Q2使用”…rabbit 和 “lazy.#”。
队列绑定键解释
Q1针对所有的橘色orange动物。 Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。 消息路由键解释
一个带有quick.orange.rabbit路由键的消息会给两个队列都进行投送。消息lazy.orange.elephant也会投送给这两个队列。 另外一方面“quick.orange.fox” 只会给第一个队列。lazy.pink.rabbit虽然与两个绑定键都匹配但只会给第二个队列投送一遍。“quick.brown.fox” 没有匹配到任何绑定因此会被丢弃掉。 3异常情况 如果我们破坏规则发送的消息只带有一个或者四个单词例如 “orange” 或者 quick.orange.male.rabbit会发生什么呢结果是这些消息不会匹配到任何绑定将会被丢弃。另一方面“lazy.orange.male.rabbit”即使有四个单词也会与最后一个绑定匹配并 被投送到第二个队列。
4注意事项 主题交换机非常强大并且可以表现的跟其他交换机相似。
当一个队列使用#井号绑定键进行绑定。它会表现的像扇形交换机一样不理会路由键接收所有消息。 当绑定当中不包含任何一个 “*” (星号) 和 “#” (井号)特殊字符的时候主题交换机会表现的跟直连交换机一毛一样。
4.在rabbitMq创建好交换机 这里创建了三种模式的交换机分为直连模式(direct)广播模式(fanout)主题模式(topic) 5、队列绑定交换机配置类
/*** author Mr.ZJW* date Created 2023/8/30 16:08* description直连模式交换机配置*/
Configuration
public class DirectRabbitConfig {/*** 队列*/public static final String DIRECT_QUERY directQuery;/*** 交换机*/public static final String DIRECT_EXCHANGE directExchange;/*** 配置队列*/Beanpublic Queue directQuery() {return new Queue(DIRECT_QUERY, true);}/*** 配置交换机*/Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}/*** 绑定交换机*/Beanpublic Binding bindingPushPassengerInfoToDriverExchange(Qualifier(DIRECT_QUERY) Queue queue,Qualifier(DIRECT_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DIRECT_QUERY).noargs();}/*** 乱码配置*/Beanpublic MessageConverter getMessageConverter() {return new Jackson2JsonMessageConverter();}
}
6、消息生产者配置类
/*** Author: Mr.ZJW* Date: 2022-07-12 9:30* Description: MQ消息发送*/
Configuration
public class MqProduct {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息 直连模式** param msg 消息* param routingKey 消息队列Key* return true or false*/public boolean mqDirectSend(String msg, String routingKey) {rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_QUERY, msg, new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});return true;}}
7、消息消费者配置类
/*** Author: Mr.ZJW* Date: 2022-07-08 16:29* Description: MQ消费者*/
Slf4j
Component
public class ProductConsumer {/*** 监听司机信息给乘客** param msg* param message* param channel* throws IOException*/RabbitListener(queues directQuery)public void product(String msg, Message message, Channel channel) throws IOException {try {log.info(receiver success ProductConsumer:{}, msg);log.info(唯一标识:{}, message.getMessageProperties().getCorrelationId());//手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}}}
这里配置类不完整可根据自行调整交换机以及队列可动态配置配置到配置类等。