南京企业制作网站,wordpress默认头像不显示,iis8.5 wordpress,百度做网站骗人到哪里去投诉视频链接#xff1a;【黑马程序员RabbitMQ入门到实战教程】 文章目录 1.初识MQ1.1.同步调用1.2.异步调用1.3.技术选型 2.RabbitMQ2.1.安装2.1.1 Docker2.1.1 Linux2.1.1 Windows 2.2.收发消息2.2.1.交换机2.2.2.队列2.2.3.绑定关系2.2.4.发送消息 2.3.数据隔离2.3.1.用户管理2… 视频链接【黑马程序员RabbitMQ入门到实战教程】 文章目录 1.初识MQ1.1.同步调用1.2.异步调用1.3.技术选型 2.RabbitMQ2.1.安装2.1.1 Docker2.1.1 Linux2.1.1 Windows 2.2.收发消息2.2.1.交换机2.2.2.队列2.2.3.绑定关系2.2.4.发送消息 2.3.数据隔离2.3.1.用户管理2.3.2.virtual host 3.SpringAMQP3.1.导入Demo工程3.2.快速入门3.1.1.消息发送3.1.2.消息接收3.1.3.测试 3.3.WorkQueues模型3.3.1.消息发送3.3.2.消息接收3.3.3.测试3.3.4.能者多劳3.3.5.总结 3.4.交换机类型3.5.Fanout交换机3.5.1.声明队列和交换机3.5.2.消息发送3.5.3.消息接收3.5.4.总结 3.6.Direct交换机3.6.1.声明队列和交换机3.6.2.消息接收3.6.3.消息发送3.6.4.总结 3.7.Topic交换机3.7.1.说明3.7.2.消息发送3.7.3.消息接收3.7.4.总结 3.8.声明队列和交换机3.8.1.基本API3.8.2.fanout示例3.8.2.direct示例3.8.4.基于注解声明 3.9.消息转换器3.9.1.测试默认转换器3.9.2.配置JSON转换器3.9.3.消费者接收Object 4.业务改造4.1.配置MQ4.1.接收消息4.2.发送消息 5.练习5.1.抽取共享的MQ配置5.2.改造下单功能5.3.登录信息传递优化5.4.改造项目一 微服务一旦拆分必然涉及到服务之间的相互调用目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中调用者发起请求后需要
等待服务提供者执行业务返回结果后才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态因此我们成这种调用方式为
同步调用也可以叫
同步通讯。但在很多场景下我们可能需要采用
异步通讯的方式为什么呢 我们先来看看什么是同步通讯和异步通讯。如图 解读
同步通讯就如同打视频电话双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。异步通讯就如同发微信聊天双方的交互不是实时的你不需要立刻给对方回应。因此你可以多线操作同时跟多人聊天。
两种方式各有优劣打电话可以立即得到响应但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信但是往往响应会有延迟。
所以如果我们的业务需要实时得到服务提供方的响应则应该选择同步通讯同步调用。而如果我们追求更高的效率并且不需要实时响应则应该选择异步通讯异步调用。
同步调用的方式我们已经学过了之前的OpenFeign调用就是。但是
异步调用又该如何实现哪些业务适合用异步调用来实现呢
通过今天的学习你就能明白这些问题了。
1.初识MQ
1.1.同步调用
之前说过我们现在基于OpenFeign的调用都属于是同步调用那么这种方式存在哪些问题呢 举个例子我们以昨天留给大家作为作业的余额支付功能为例来分析首先看下整个流程 目前我们采用的是基于OpenFeign的同步调用也就是说业务执行流程是这样的
支付服务需要先调用用户服务完成余额扣减然后支付服务自己要更新支付流水单的状态然后支付服务调用交易服务更新业务订单状态为已支付
三个步骤依次执行。 这其中就存在3个问题 第一拓展性差 我们目前的业务相对简单但是随着业务规模扩大产品的功能也在不断完善。 在大多数电商业务中用户支付成功后都会以短信或者其它方式通知用户告知支付成功。假如后期产品经理提出这样新的需求你怎么办是不是要在上述业务中再加入通知用户的业务 某些电商项目中还会有积分或金币的概念。假如产品经理提出需求用户支付成功后给用户以积分奖励或者返还金币你怎么办是不是要在上述业务中再加入积分业务、返还金币业务 。。。 最终你的支付业务会越来越臃肿 也就是说每次有新的需求现有支付逻辑都要跟着变化代码经常变动不符合开闭原则拓展性不好。
第二性能下降 由于我们采用了同步调用调用者需要等待服务提供者执行完返回结果后才能继续向下执行也就是说每次远程调用调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和 假如每个微服务的执行时长都是50ms则最终整个业务的耗时可能高达300ms性能太差了。
第三级联失败 由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时整个事务都会回滚交易失败。 这其实就是同步调用的级联失败问题。
但是大家思考一下我们假设用户余额充足扣款已经成功此时我们应该确保支付流水单更新为已支付确保交易成功。毕竟收到手里的钱没道理再退回去吧。
因此这里不能因为短信通知、更新订单状态失败而回滚整个事务。
综上同步调用的方式存在下列问题
拓展性差性能下降级联失败
而要解决这些问题我们就必须用异步调用的方式来代替同步调用。
1.2.异步调用
异步调用方式其实就是基于消息通知的方式一般包含三个角色
消息发送者投递消息的人就是原来的调用方消息Broker管理、暂存、转发消息你可以把它理解成微信服务器消息接收者接收和处理消息的人就是原来的服务提供方 在异步调用中发送者不再直接同步调用接收者的业务接口而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后接受者都能获取消息并处理。 这样发送消息的人和接收消息的人就完全解耦了。
还是以余额支付业务为例 除了扣减余额、更新支付流水单状态以外其它调用逻辑全部取消。而是改为发送一条消息到Broker。而相关的微服务都可以订阅消息通知一旦消息到达Broker则会分发给每一个订阅了的微服务处理各自的业务。
假如产品经理提出了新的需求比如要在支付成功后更新用户积分。支付代码完全不用变更而仅仅是让积分服务也订阅消息即可 不管后期增加了多少消息订阅者作为支付服务来讲执行问扣减余额、更新支付流水状态后发送消息即可。业务耗时仅仅是这三部分业务耗时仅仅100ms大大提高了业务性能。
另外不管是交易服务、通知服务还是积分服务他们的业务与支付关联度低。现在采用了异步调用解除了耦合他们即便执行过程中出现了故障也不会影响到支付服务。
综上异步调用的优势包括
耦合度更低性能更好业务拓展性强故障隔离避免级联失败
当然异步通信也并非完美无缺它存在下列缺点
完全依赖于Broker的可靠性、安全性和性能架构复杂后期维护和调试麻烦
1.3.技术选型
消息Broker目前常见的实现方案就是消息队列MessageQueue简称为MQ. 目比较常见的MQ实现
ActiveMQRabbitMQRocketMQKafka
几种常见MQ的对比
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
追求可用性Kafka、 RocketMQ 、RabbitMQ 追求可靠性RabbitMQ、RocketMQ 追求吞吐能力RocketMQ、Kafka 追求消息低延迟RabbitMQ、Kafka
据统计目前国内消息队列使用最多的还是RabbitMQ再加上其各方面都比较均衡稳定性也好因此我们课堂上选择RabbitMQ来学习。
2.RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件官网地址 Messaging that just works — RabbitMQ 接下来我们就学习它的基本概念和基础用法。
2.1.安装
2.1.1 Docker
我们同样基于Docker来安装RabbitMQ使用下面的命令即可
docker run \-e RABBITMQ_DEFAULT_USERitheima \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management如果拉取镜像困难的话可以使用课前资料给大家准备的镜像利用docker load命令加载
可以看到在安装命令中有两个映射的端口
15672RabbitMQ提供的管理控制台的端口5672RabbitMQ的消息发送处理接口
2.1.1 Linux
2.1.1 Windows
安装完成后我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面
RabbitMQ对应的架构如图 其中包含几个概念
**publisher**生产者也就是发送消息的一方**consumer**消费者也就是消费消息的一方**queue**队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理**exchange**交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**virtual host**虚拟主机起到数据隔离的作用。每个虚拟主机相互独立有各自的exchange、queue
上述这些东西都可以在RabbitMQ的管理控制台来管理下一节我们就一起来学习控制台的使用。
2.2.收发消息
2.2.1.交换机
我们打开Exchanges选项卡可以看到已经存在很多交换机 我们点击任意交换机即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息 这里是由控制台模拟了生产者发送的消息。由于没有消费者存在最终消息丢失了这样说明交换机没有存储消息的能力。
2.2.2.队列
我们打开Queues选项卡新建一个队列 命名为hello.queue1 再以相同的方式创建一个队列密码为hello.queue2最终队列列表如下 此时我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列 怎么回事呢 发送到交换机的消息只会路由到与其绑定的队列因此仅仅创建队列是不够的我们还需要将其与交换机绑定。
2.2.3.绑定关系
点击Exchanges选项卡点击amq.fanout交换机进入交换机详情页然后点击Bindings菜单在表单中填写要绑定的队列名称 相同的方式将hello.queue2也绑定到改交换机。 最终绑定结果如下
2.2.4.发送消息
再次回到exchange页面找到刚刚绑定的amq.fanout点击进入详情页再次发送一条消息 回到Queues页面可以发现hello.queue中已经有一条消息了 点击队列名称进入详情页查看队列详情这次我们点击get message 可以看到消息到达队列了 这个时候如果有消费者监听了MQ的hello.queue1或hello.queue2队列自然就能接收到消息了。
2.3.数据隔离
2.3.1.用户管理
点击Admin选项卡首先会看到RabbitMQ控制台的用户管理界面 这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段如下
Nameitheima也就是用户名Tagsadministrator说明itheima用户是超级管理员拥有所有权限Can access virtual host /可以访问的virtual host这里的/是默认的virtual host
对于小型企业而言出于成本考虑我们通常只会搭建一套MQ集群公司内的多个不同项目同时使用。这个时候为了避免互相干扰 我们会利用virtual host的隔离特性将不同项目隔离。一般会做两件事情
给每个项目创建独立的运维账号将管理权限分离。给每个项目创建不同的virtual host将每个项目的数据隔离。
比如我们给黑马商城创建一个新的用户命名为hmall 你会发现此时hmall用户没有任何virtual host的访问权限 别急接下来我们就来授权。
2.3.2.virtual host
我们先退出登录 切换到刚刚创建的hmall用户登录然后点击Virtual Hosts菜单进入virtual host管理页 可以看到目前只有一个默认的virtual host名字为 /。 我们可以给黑马商城项目创建一个单独的virtual host而不是使用默认的/。 创建完成后如图 由于我们是登录hmall账户后创建的virtual host因此回到users菜单你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了
此时点击页面右上角的virtual host下拉菜单切换virtual host为 /hmall 然后再次查看queues选项卡会发现之前的队列已经看不到了 这就是基于virtual host 的隔离效果。
3.SpringAMQP
将来我们开发业务功能的时候肯定不会在控制台收发消息而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配使用起来非常方便。
SpringAmqp的官方地址 Spring AMQP SpringAMQP提供了三个功能
自动声明队列、交换机及其绑定关系基于注解的监听器模式异步接收消息封装了RabbitTemplate工具用于发送消息
这一章我们就一起学习一下如何利用SpringAMQP实现对RabbitMQ的消息收发。
3.1.导入Demo工程
在课前资料给大家提供了一个Demo工程方便我们学习SpringAMQP的使用 将其复制到你的工作空间然后用Idea打开项目结构如图 包括三部分
mq-demo父工程管理项目依赖publisher消息的发送者consumer消息的消费者
在mq-demo这个父工程中已经配置好了SpringAMQP相关的依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.itcast.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.12/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies
/project因此子工程中就可以直接使用SpringAMQP了。
3.2.快速入门
在之前的案例中我们都是经过交换机发送消息到队列不过有时候为了测试方便我们也可以直接向队列发送消息跳过交换机。 在入门案例中我们就演示这样的简单模型如图 也就是
publisher直接发送消息到队列消费者监听并处理队列中的消息
:::warning 注意这种模式一般测试使用很少在生产中使用。 :::
为了方便测试我们现在控制台新建一个队列simple.queue 添加成功 接下来我们就可以利用Java代码收发消息了。
3.1.1.消息发送
首先配置MQ地址在publisher服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码然后在publisher服务中编写测试类SpringAmqpTest并利用RabbitTemplate实现消息发送
package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue() {// 队列名称String queueName simple.queue;// 消息String message hello, spring amqp!;// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}打开控制台可以看到消息已经发送到队列中 接下来我们再来实现消息接收。
3.1.2.消息接收
首先配置MQ地址在consumer服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener代码如下
package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息就会推送给当前服务调用当前方法处理消息。// 可以看到方法体中接收的就是消息体的内容RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println(spring 消费者接收到消息【 msg 】);}
}3.1.3.测试
启动consumer服务然后在publisher服务中运行测试代码发送MQ消息。最终consumer收到消息
3.3.WorkQueues模型
Work queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息。
当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。
接下来我们就来模拟这样的场景。 首先我们在控制台创建一个新的队列命名为work.queue
3.3.1.消息发送
这次我们循环发送模拟大量消息堆积现象。 在publisher服务中的SpringAmqpTest类中添加一个测试方法
/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}3.3.2.消息接收
要模拟多个消费者绑定同一个队列我们在consumer服务的SpringRabbitListener中添加2个新的方法
RabbitListener(queues work.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues work.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}注意到这两消费者都设置了Thead.sleep模拟任务耗时
消费者1 sleep了20毫秒相当于每秒钟处理50个消息消费者2 sleep了200毫秒相当于每秒处理5个消息
3.3.3.测试
启动ConsumerApplication后在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。 最终结果如下
消费者1接收到消息【hello, message_0】21:06:00.869555300
消费者2........接收到消息【hello, message_1】21:06:00.884518
消费者1接收到消息【hello, message_2】21:06:00.907454400
消费者1接收到消息【hello, message_4】21:06:00.953332100
消费者1接收到消息【hello, message_6】21:06:00.997867300
消费者1接收到消息【hello, message_8】21:06:01.042178700
消费者2........接收到消息【hello, message_3】21:06:01.086478800
消费者1接收到消息【hello, message_10】21:06:01.087476600
消费者1接收到消息【hello, message_12】21:06:01.132578300
消费者1接收到消息【hello, message_14】21:06:01.175851200
消费者1接收到消息【hello, message_16】21:06:01.218533400
消费者1接收到消息【hello, message_18】21:06:01.261322900
消费者2........接收到消息【hello, message_5】21:06:01.287003700
消费者1接收到消息【hello, message_20】21:06:01.304412400
消费者1接收到消息【hello, message_22】21:06:01.349950100
消费者1接收到消息【hello, message_24】21:06:01.394533900
消费者1接收到消息【hello, message_26】21:06:01.439876500
消费者1接收到消息【hello, message_28】21:06:01.482937800
消费者2........接收到消息【hello, message_7】21:06:01.488977100
消费者1接收到消息【hello, message_30】21:06:01.526409300
消费者1接收到消息【hello, message_32】21:06:01.572148
消费者1接收到消息【hello, message_34】21:06:01.618264800
消费者1接收到消息【hello, message_36】21:06:01.660780600
消费者2........接收到消息【hello, message_9】21:06:01.689189300
消费者1接收到消息【hello, message_38】21:06:01.705261
消费者1接收到消息【hello, message_40】21:06:01.746927300
消费者1接收到消息【hello, message_42】21:06:01.789835
消费者1接收到消息【hello, message_44】21:06:01.834393100
消费者1接收到消息【hello, message_46】21:06:01.875312100
消费者2........接收到消息【hello, message_11】21:06:01.889969500
消费者1接收到消息【hello, message_48】21:06:01.920702500
消费者2........接收到消息【hello, message_13】21:06:02.090725900
消费者2........接收到消息【hello, message_15】21:06:02.293060600
消费者2........接收到消息【hello, message_17】21:06:02.493748
消费者2........接收到消息【hello, message_19】21:06:02.696635100
消费者2........接收到消息【hello, message_21】21:06:02.896809700
消费者2........接收到消息【hello, message_23】21:06:03.099533400
消费者2........接收到消息【hello, message_25】21:06:03.301446400
消费者2........接收到消息【hello, message_27】21:06:03.504999100
消费者2........接收到消息【hello, message_29】21:06:03.705702500
消费者2........接收到消息【hello, message_31】21:06:03.906601200
消费者2........接收到消息【hello, message_33】21:06:04.108118500
消费者2........接收到消息【hello, message_35】21:06:04.308945400
消费者2........接收到消息【hello, message_37】21:06:04.511547700
消费者2........接收到消息【hello, message_39】21:06:04.714038400
消费者2........接收到消息【hello, message_41】21:06:04.916192700
消费者2........接收到消息【hello, message_43】21:06:05.116286400
消费者2........接收到消息【hello, message_45】21:06:05.318055100
消费者2........接收到消息【hello, message_47】21:06:05.520656400
消费者2........接收到消息【hello, message_49】21:06:05.723106700
可以看到消费者1和消费者2竟然每人消费了25条消息
消费者1很快完成了自己的25条消息消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者并没有考虑到消费者的处理能力。导致1个消费者空闲另一个消费者忙的不可开交。没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
3.3.4.能者多劳
在spring中有一个简单的配置可以解决这个问题。我们修改consumer服务的application.yml文件添加配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息再次测试发现结果如下
消费者1接收到消息【hello, message_0】21:12:51.659664200
消费者2........接收到消息【hello, message_1】21:12:51.680610
消费者1接收到消息【hello, message_2】21:12:51.703625
消费者1接收到消息【hello, message_3】21:12:51.724330100
消费者1接收到消息【hello, message_4】21:12:51.746651100
消费者1接收到消息【hello, message_5】21:12:51.768401400
消费者1接收到消息【hello, message_6】21:12:51.790511400
消费者1接收到消息【hello, message_7】21:12:51.812559800
消费者1接收到消息【hello, message_8】21:12:51.834500600
消费者1接收到消息【hello, message_9】21:12:51.857438800
消费者1接收到消息【hello, message_10】21:12:51.880379600
消费者2........接收到消息【hello, message_11】21:12:51.899327100
消费者1接收到消息【hello, message_12】21:12:51.922828400
消费者1接收到消息【hello, message_13】21:12:51.945617400
消费者1接收到消息【hello, message_14】21:12:51.968942500
消费者1接收到消息【hello, message_15】21:12:51.992215400
消费者1接收到消息【hello, message_16】21:12:52.013325600
消费者1接收到消息【hello, message_17】21:12:52.035687100
消费者1接收到消息【hello, message_18】21:12:52.058188
消费者1接收到消息【hello, message_19】21:12:52.081208400
消费者2........接收到消息【hello, message_20】21:12:52.103406200
消费者1接收到消息【hello, message_21】21:12:52.123827300
消费者1接收到消息【hello, message_22】21:12:52.146165100
消费者1接收到消息【hello, message_23】21:12:52.168828300
消费者1接收到消息【hello, message_24】21:12:52.191769500
消费者1接收到消息【hello, message_25】21:12:52.214839100
消费者1接收到消息【hello, message_26】21:12:52.238998700
消费者1接收到消息【hello, message_27】21:12:52.259772600
消费者1接收到消息【hello, message_28】21:12:52.284131800
消费者2........接收到消息【hello, message_29】21:12:52.306190600
消费者1接收到消息【hello, message_30】21:12:52.325315800
消费者1接收到消息【hello, message_31】21:12:52.347012500
消费者1接收到消息【hello, message_32】21:12:52.368508600
消费者1接收到消息【hello, message_33】21:12:52.391785100
消费者1接收到消息【hello, message_34】21:12:52.416383800
消费者1接收到消息【hello, message_35】21:12:52.439019
消费者1接收到消息【hello, message_36】21:12:52.461733900
消费者1接收到消息【hello, message_37】21:12:52.485990
消费者1接收到消息【hello, message_38】21:12:52.509219900
消费者2........接收到消息【hello, message_39】21:12:52.523683400
消费者1接收到消息【hello, message_40】21:12:52.547412100
消费者1接收到消息【hello, message_41】21:12:52.571191800
消费者1接收到消息【hello, message_42】21:12:52.593024600
消费者1接收到消息【hello, message_43】21:12:52.616731800
消费者1接收到消息【hello, message_44】21:12:52.640317
消费者1接收到消息【hello, message_45】21:12:52.663111100
消费者1接收到消息【hello, message_46】21:12:52.686727
消费者1接收到消息【hello, message_47】21:12:52.709266500
消费者2........接收到消息【hello, message_48】21:12:52.725884900
消费者1接收到消息【hello, message_49】21:12:52.746299900
可以发现由于消费者1处理速度较快所以处理了更多的消息消费者2处理速度较慢只处理了6条消息。而最终总的执行耗时也在1秒左右大大提升。 正所谓能者多劳这样充分利用了每一个消费者的处理能力可以有效避免消息积压问题。
3.3.5.总结
Work模型的使用
多个消费者绑定到一个队列同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量
3.4.交换机类型
在之前的两个测试案例中都没有交换机生产者直接发送消息到队列。而一旦引入交换机消息发送的模式会有很大变化 可以看到在订阅模型中多了一个exchange角色而且过程略有变化
Publisher生产者不再发送消息到队列中而是发给交换机Exchange交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Queue消息队列也与以前一样接收消息、缓存消息。不过队列一定要与交换机绑定。Consumer消费者与以前一样订阅队列没有变化
Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
交换机的类型有四种
Fanout广播将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机Direct订阅基于RoutingKey路由key发送给订阅了消息的队列Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符Headers头匹配基于MQ的消息头匹配用的较少。
课堂中我们讲解前面的三种交换机模式。
3.5.Fanout交换机
Fanout英文翻译是扇出我觉得在MQ中叫广播更合适。 在广播模式下消息发送流程是这样的
1 可以有多个队列2 每个队列都要绑定到Exchange交换机3 生产者发送的消息只能发送到交换机4 交换机把消息发送给绑定过的所有队列5 订阅队列的消费者都能拿到消息
我们的计划是这样的
创建一个名为 hmall.fanout的交换机类型是Fanout创建两个队列fanout.queue1和fanout.queue2绑定到交换机hmall.fanout
3.5.1.声明队列和交换机
在控制台创建队列fanout.queue1: 在创建一个队列fanout.queue2 然后再创建一个交换机 然后绑定两个队列到交换机
3.5.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
Test
public void testFanoutExchange() {// 交换机名称String exchangeName hmall.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}3.5.3.消息接收
在consumer服务的SpringRabbitListener中添加两个方法作为消费者
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}3.5.4.总结
交换机的作用是什么
接收publisher发送的消息将消息按照规则路由到与之绑定的队列不能缓存消息路由失败消息丢失FanoutExchange的会将消息路由到每个绑定的队列
3.6.Direct交换机
在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
案例需求如图
声明一个名为hmall.direct的交换机声明队列direct.queue1绑定hmall.directbindingKey为blud和red声明队列direct.queue2绑定hmall.directbindingKey为yellow和red在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2在publisher中编写测试方法向hmall.direct发送消息
3.6.1.声明队列和交换机
首先在控制台声明两个队列direct.queue1和direct.queue2这里不再展示过程 然后声明一个direct类型的交换机命名为hmall.direct: 然后使用red和blue作为key绑定direct.queue1到hmall.direct
同理使用red和yellow作为key绑定direct.queue2到hmall.direct步骤略最终结果
3.6.2.消息接收
在consumer服务的SpringRabbitListener中添加方法
RabbitListener(queues direct.queue1)
public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(queues direct.queue2)
public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}3.6.3.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
Test
public void testSendDirectExchange() {// 交换机名称String exchangeName hmall.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);
}由于使用的red这个key所以两个消费者都收到了消息 我们再切换为blue这个key
Test
public void testSendDirectExchange() {// 交换机名称String exchangeName hmall.direct;// 消息String message 最新报道哥斯拉是居民自治巨型气球虚惊一场;// 发送消息rabbitTemplate.convertAndSend(exchangeName, blue, message);
}你会发现只有消费者1收到了消息
3.6.4.总结
描述下Direct交换机与Fanout交换机的差异
Fanout交换机将消息路由给每一个与之绑定的队列Direct交换机根据RoutingKey判断路由给哪个队列如果多个队列具有相同的RoutingKey则与Fanout功能类似
3.7.Topic交换机
3.7.1.说明
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符
BindingKey 一般都是有一个或多个单词组成多个单词之间以.分割例如 item.insert
通配符规则
#匹配一个或多个词*匹配不多不少恰好1个词
举例
item.#能够匹配item.spu.insert 或者 item.spuitem.*只能匹配item.spu
图示 假如此时publisher发送的消息使用的RoutingKey共有四种
china.news 代表有中国的新闻消息china.weather 代表中国的天气消息japan.news 则代表日本新闻japan.weather 代表日本的天气消息
解释
topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括 china.newschina.weather topic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括: china.newsjapan.news
接下来我们就按照上图所示来演示一下Topic交换机的用法。 首先在控制台按照图示例子创建队列、交换机并利用通配符绑定队列和交换机。此处步骤略。最终结果如下
3.7.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName hmall.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);
}3.7.3.消息接收
在consumer服务的SpringRabbitListener中添加方法
RabbitListener(queues topic.queue1)
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(queues topic.queue2)
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}3.7.4.总结
描述下Direct交换机与Topic交换机的差异
Topic交换机接收的消息RoutingKey必须是多个单词以 **.** 分割Topic交换机与队列绑定时的bindingKey可以指定通配符#代表0个或多个词*代表1个词
3.8.声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时队列和交换机是程序员定义的将来项目上线又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来交给运维。在这个过程中是很容易出现错误的。 因此推荐的做法是由程序启动时检查队列和交换机是否存在如果不存在自动创建。
3.8.1.基本API
SpringAMQP提供了一个Queue类用来创建队列
SpringAMQP还提供了一个Exchange接口来表示所有不同类型的交换机 我们可以自己创建队列和交换机不过SpringAMQP还提供了ExchangeBuilder来简化这个过程 而在绑定队列和交换机时则需要使用BindingBuilder来创建Binding对象
3.8.2.fanout示例
在consumer中创建一个类声明队列和交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(hmall.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}3.8.2.direct示例
direct模式由于要绑定多个KEY会非常麻烦每一个Key都要编写一个binding
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DirectConfig {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(hmall.direct).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}
3.8.4.基于注解声明
基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。
例如我们同样声明Direct模式的交换机和队列
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, blue}
))
public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}是不是简单多了。 再试试Topic模式
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}3.9.消息转换器
Spring的消息发送代码接收的消息体是一个Object 而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题
数据体积过大有安全漏洞可读性差
我们来测试一下。
3.9.1.测试默认转换器
1创建测试队列 首先我们在consumer服务中声明一个新的配置类 利用Bean的方式创建一个队列具体代码
package com.itheima.consumer.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MessageConfig {Beanpublic Queue objectQueue() {return new Queue(object.queue);}
}注意这里我们先不要给这个队列添加消费者我们要查看消息体的格式。
重启consumer服务以后该队列就会被自动创建出来了
2发送消息 我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码发送一个Map对象
Test
public void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, 柳岩);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(object.queue, msg);
}发送消息后查看控制台 可以看到消息格式非常不友好。
3.9.2.配置JSON转换器
显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。
配置消息转换器在publisher和consumer两个服务的启动类中添加一个Bean即可
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}消息转换器中添加的messageId可以便于我们将来做幂等性判断。
此时我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码到MQ的控制台查看消息结构
3.9.3.消费者接收Object
我们在consumer服务中定义一个新的消费者publisher是用Map发送那么消费者也一定要用Map接收格式如下
RabbitListener(queues object.queue)
public void listenSimpleQueueMessage(MapString, Object msg) throws InterruptedException {System.out.println(消费者接收到object.queue消息【 msg 】);
}4.业务改造
案例需求改造余额支付功能将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用改为基于RabbitMQ的异步通知。 如图 说明我们只关注交易服务步骤如下
定义topic类型交换机命名为pay.topic定义消息队列命名为mark.order.pay.queue将mark.order.pay.queue与pay.topic绑定BindingKey为pay.success支付成功时不再调用交易服务更新订单状态的接口而是发送一条消息到pay.topic发送消息的RoutingKey 为pay.success消息内容是订单id交易服务监听mark.order.pay.queue队列接收到消息后更新订单状态为已支付
4.1.配置MQ
不管是生产者还是消费者都需要配置MQ的基本信息。分为两步 1添加依赖 !--消息发送--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2配置MQ地址
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码4.1.接收消息
在trade-service服务中定义一个消息监听类 其代码如下
package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;RabbitListener(bindings QueueBinding(value Queue(name mark.order.pay.queue, durable true),exchange Exchange(name pay.topic, type ExchangeTypes.TOPIC),key pay.success))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}
}4.2.发送消息
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法
private final RabbitTemplate rabbitTemplate;Override
Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付状态异常throw new BizIllegalException(交易已支付或关闭);}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException(交易已支付或关闭);}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend(pay.topic, pay.success, po.getBizOrderNo());} catch (Exception e) {log.error(支付成功的消息发送失败支付单id{} 交易单id{}, po.getId(), po.getBizOrderNo(), e);}
}5.练习
5.1.抽取共享的MQ配置
将MQ配置抽取到Nacos中管理微服务中直接使用共享配置。
5.2.改造下单功能
改造下单功能将基于OpenFeign的清理购物车同步调用改为基于RabbitMQ的异步通知
定义topic类型交换机命名为trade.topic定义消息队列命名为cart.clear.queue将cart.clear.queue与trade.topic绑定BindingKey为order.create下单成功时不再调用清理购物车接口而是发送一条消息到trade.topic发送消息的RoutingKey 为order.create消息内容是下单的具体商品、当前登录用户信息购物车服务监听cart.clear.queue队列接收到消息后清理指定用户的购物车中的指定商品
5.3.登录信息传递优化
某些业务中需要根据登录用户信息处理业务而基于MQ的异步调用并不会传递登录用户信息。前面我们的做法比较麻烦至少要做两件事
消息发送者在消息体中传递登录用户消费者获取消息体中的登录用户处理业务
这样做不仅麻烦而且编程体验也不统一毕竟我们之前都是使用UserContext来获取用户。
大家思考一下有没有更优雅的办法传输登录用户信息让使用MQ的人无感知依然采用UserContext来随时获取用户。
参考资料 Spring AMQP
5.4.改造项目一
思考一下项目一中的哪些业务可以由同步方式改为异步方式调用试着改造一下。 举例短信发送