当前位置: 首页 > news >正文

淮阴区住房和城乡建设局网站中国核工业第五建设有限公司招聘

淮阴区住房和城乡建设局网站,中国核工业第五建设有限公司招聘,企业工商信息查询网官网,搜索引擎优化基本目录 1.SpringAMQP 1.1.导入demo工程 1.2.快速入门 1.2.1.消息发送 1.2.2.消息接收 1.2.3.测试 1.3.WorkQueues模型 1.4.交换机类型 1.4.1.Fanout交换机 1.4.2.Direct交换机 1.4.3.Topic交换机 1.5.声明队列和交换机 1.5.1.基于注解声明 1.6.消息转换器 1.6.1.测…目录 1.SpringAMQP 1.1.导入demo工程 1.2.快速入门 1.2.1.消息发送 1.2.2.消息接收 1.2.3.测试 1.3.WorkQueues模型 1.4.交换机类型 1.4.1.Fanout交换机 1.4.2.Direct交换机 1.4.3.Topic交换机 1.5.声明队列和交换机 1.5.1.基于注解声明 1.6.消息转换器 1.6.1.测试默认转换器 1.6.2.配置JSON转换器 1.6.3.消费者接收Object 1.SpringAMQP 将来我们开发业务功能的时候肯定不会在控制台收发消息而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配使用起来非常方便。 SpringAMQP官方网址Spring AMQP SpringAMQP提供了三个功能- 自动声明队列、交换机及其绑定关系 - 基于注解的监听器模式异步接收消息 - 封装了RabbitTemplate工具用于发送消息 1.1.导入demo工程 为方便我们学习SpringAMQP的使用导入demo工程 然后用Idea打开项目结构如图 包括三部分 - mq-demo父工程管理项目依赖 - publisher消息的发送者 - consumer消息的消费者 在该工程中已经配置好了SpringAMQP相关的依赖主要是AMQP的依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.itcast.demo/groupIdartifactIdmq-demo/artifactIdversion1.0-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulespackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.12/versionrelativePath//parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--单元测试--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies /project 现在可以直接使用SpringAMQP了。 1.2.快速入门 在之前的案例中我们都是经过交换机发送消息到队列不过有时候为了测试方便我们也可以直接向队列发送消息跳过交换机。 在入门案例中我们就演示这样的简单模型如图 也就是 - publisher直接发送消息到队列 - 消费者监听并处理队列中的消息 注意这种模式一般测试使用很少在生产中使用。 为了方便测试我们现在控制台新建一个队列simple.queue 添加成功 接下来我们就可以利用Java代码收发消息了。   1.2.1.消息发送 首先配置MQ地址在publisher服务的application.yml中添加配置 spring:rabbitmq:host: 192.168.52.135 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机 之前已经创建的username: hmall # 用户名password: 123 # 密码 然后在publisher服务中编写测试类SpringAmqpTest并利用RabbitTemplate实现消息发送核心代码 打开控制台可以看到消息已经发送到队列中 接下来我们再来实现消息接收。   1.2.2.消息接收 首先配置MQ地址在consumer服务的application.yml中添加配置 spring:rabbitmq:host: 192.168.52.135 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码 然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener核心代码如下 Component public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息就会推送给当前服务调用当前方法处理消息。// 可以看到方法体中接收的就是消息体的内容RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println(spring 消费者接收到消息【 msg 】);} } 1.2.3.测试 启动consumer服务然后在publisher服务中运行测试代码发送MQ消息。最终consumer收到消息 1.3.WorkQueues模型 Work queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息。 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。 接下来我们就来模拟这样的场景。 首先我们在控制台创建一个新的队列命名为work.queue 1消息发送 这次我们循环发送模拟大量消息堆积现象。 在publisher服务中的SpringAmqpTest类中添加一个测试方法 2消息接收 要模拟多个消费者绑定同一个队列我们在consumer服务的SpringRabbitListener中添加2个新的方法 注意到这两消费者都设置了Thead.sleep模拟任务耗时 - 消费者1 sleep了20毫秒相当于每秒钟处理50个消息 - 消费者2 sleep了200毫秒相当于每秒处理5个消息 3测试 启动ConsumerApplication后在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。最终结果如下 可以看到消费者1和消费者2每人都消费了25条消息- 消费者1很快完成了自己的25条消息 - 消费者2却在缓慢的处理自己的25条消息。 也就是说消息是平均分配给每个消费者轮询并没有考虑到消费者的处理能力。导致1个消费者空闲另一个消费者忙的不可开交。没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 为了解决这样的问题有以下配置 4能者多劳 在spring中有一个简单的配置可以解决这个问题。我们修改consumer服务的application.yml文件添加配置 spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息 再次测试可以发现由于消费者1处理速度较快所以处理了更多的消息消费者2处理速度较慢只处理了6条消息。而最终总的执行耗时也在1秒左右大大提升。 正所谓能者多劳这样充分利用了每一个消费者的处理能力可以有效避免消息积压问题。 5总结 Work模型的使用 多个消费者绑定到一个队列加快处理速度同一条消息只会被一个消费者处理 通过设置prefetch来控制消费者预取的消息数量处理完一条再拿下一条能者多劳。 1.4.交换机类型 在之前的两个测试中都没有交换机生产者直接发送消息到队列。而一旦引入交换机消息发送的模式会有很大变化 可以看到在订阅模型中多了一个exchange角色而且过程略有变化- Publisher生产者不再发送消息到队列中而是发给交换机- Exchange交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。- Queue消息队列也与以前一样接收消息、缓存消息。不过队列一定要与交换机绑定。- Consumer消费者与以前一样订阅队列没有变化 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 交换机的类型有四种 Fanout广播将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机 Direct订阅基于RoutingKey路由key发送给订阅了消息的队列 Topic通配符订阅与Direct类似只不过RoutingKey可以使用通配符 Headers头匹配基于MQ的消息头匹配用的较少。 1.4.1.Fanout交换机 在广播模式下消息发送流程是这样的 - 1  可以有多个队列 - 2  每个队列都要绑定到Exchange交换机 - 3  生产者发送的消息只能发送到交换机 - 4  交换机把消息发送给绑定过的所有队列 - 5  订阅队列的消费者都能拿到消息 接下来通过一个案例入门 1声明队列和交换机 在控制台创建队列fanout.queue1再以同样的方法创建队列fanoutqueue2: 然后再创建一个交换机 然后绑定队列fanout.queue1到交换机再以同样的方式绑定fanout.queue2到交换机 2消息发送 在publisher服务的SpringAmqpTest类中添加测试方法 Test public void testFanoutExchange() {// 交换机名称String exchangeName hmall.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);//与之前的测试不同的是这个方法需要有三个参数。//三个参数中间的参数可以为空。 } 3消息接收 在consumer服务的SpringRabbitListener中添加两个方法作为消费者 RabbitListener(queues fanout.queue1) public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】); }RabbitListener(queues fanout.queue2) public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】); } 4总结 交换机的作用是什么 1接收publisher发送的消息 2将消息按照规则路由到与之绑定的队列 3不能缓存消息路由失败消息丢失 4FanoutExchange的会将消息路由到每个绑定的队列 1.4.2.Direct交换机 在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 基本流程如下 在Direct模型下- 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key - 消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。 - Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息 接下来通过一个案例入门 1声明队列和交换机 首先在控制台声明两个队列direct.queue1和direct.queue2这里不再展示过程 然后声明一个direct类型的交换机命名为hmall.direct: 然后使用red和blue作为key绑定direct.queue1到hmall.direct 同理使用red和yellow作为key绑定direct.queue2到hmall.direct步骤略最终结果 2消息接收 在consumer服务的SpringRabbitListener中添加方法 RabbitListener(queues direct.queue1) public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】); }RabbitListener(queues direct.queue2) public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】); } 3消息发送 在publisher服务的SpringAmqpTest类中添加测试方法 Test public void testSendDirectExchange() {// 交换机名称String exchangeName hmall.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message); } 由于使用的red这个key所以两个消费者都收到了消息 我们再切换为blue这个key Test public void testSendDirectExchange() {// 交换机名称String exchangeName hmall.direct;// 消息String message 最新报道哥斯拉是居民自治巨型气球虚惊一场;// 发送消息rabbitTemplate.convertAndSend(exchangeName, blue, message); } 你会发现只有消费者1收到了消息 4总结 描述下Direct交换机与Fanout交换机的差异 - Fanout交换机将消息路由给每一个与之绑定的队列 - Direct交换机根据RoutingKey判断路由给哪个队列 - 如果多个队列具有相同的RoutingKey则与Fanout功能类似 1.4.3.Topic交换机 Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符 BindingKey 一般都是有一个或多个单词组成多个单词之间以.分割例如 item.insert 通配符规则 - #匹配一个或多个词 - *匹配不多不少恰好1个词 举例 - item.#能够匹配item.spu.insert 或者 item.spu - item.*只能匹配item.spu 图示 假如此时publisher发送的消息使用的RoutingKey共有四种- china.news 代表有中国的新闻消息- china.weather 代表中国的天气消息- japan.news 则代表日本新闻- japan.weather 代表日本的天气消息 解释 - topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括   - china.news   - china.weather - topic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括:   - china.news   - japan.news 接下来我们就按照上图所示来演示一下Topic交换机的用法。 首先在控制台按照图示例子创建队列、交换机并利用通配符绑定队列和交换机。此处步骤略。最终结果如下 1消息发送 在publisher服务的SpringAmqpTest类中添加测试方法 /*** topicExchange*/ Test public void testSendTopicExchange() {// 交换机名称String exchangeName hmall.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message); } 2消息接收 在consumer服务的SpringRabbitListener中添加方法 RabbitListener(queues topic.queue1) public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】); }RabbitListener(queues topic.queue2) public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】); } 3总结 描述下Direct交换机与Topic交换机的差异 1Topic交换机接收的消息RoutingKey必须是多个单词以 . 分割 2Topic交换机与队列绑定时的bindingKey可以指定通配符 3#代表0个或多个词 4*代表1个词 1.5.声明队列和交换机 在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时队列和交换机是程序员定义的将来项目上线又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来交给运维。在这个过程中是很容易出现错误的。 因此推荐的做法是由程序启动时检查队列和交换机是否存在如果不存在自动创建。 1.5.1.基于注解声明 基于Bean的方式声明队列和交换机比较麻烦这里我们只讲解基于注解方式来声明。 注意声明的文件是Listener下的在监听者位置声明 我们同样声明Direct模式的交换机和队列 RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, blue} )) public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】); }RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, yellow} )) public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】); } 再试试Topic模式 RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key china.# )) public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】); }RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key #.news )) public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】); } 非常简便 1.6.消息转换器 Spring的消息发送代码接收的消息体是一个Object 而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题- 数据体积过大 - 有安全漏洞 - 可读性差 我们来测试一下。 1.6.1.测试默认转换器 1创建测试队列 首先我们在consumer服务中声明一个新的配置类 利用Bean的方式创建一个队列 具体代码 注意这里我们先不要给这个队列添加消费者我们要查看消息体的格式。 重启consumer服务以后该队列就会被自动创建出来了 2发送消息 我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码发送一个Map对象 发送消息后查看控制台 可以看到消息格式非常不友好。 1.6.2.配置JSON转换器 显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。 在publisher和consumer两个服务中都引入依赖 dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version /dependency 注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。 配置消息转换器在publisher和consumer两个服务的启动类中添加一个Bean即可 Bean public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter; } 消息转换器中添加的messageId可以便于我们将来做幂等性判断。 此时我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码到MQ的控制台查看消息结构 1.6.3.消费者接收Object 我们在consumer服务中定义一个新的消费者publisher是用Map发送那么消费者也一定要用Map接收格式如下 RabbitListener(queues object.queue) public void listenSimpleQueueMessage(MapString, Object msg) throws InterruptedException {System.out.println(消费者接收到object.queue消息【 msg 】); }
http://www.w-s-a.com/news/920088/

相关文章:

  • 怎么用自己电脑做服务器发布网站吗seo门户网价格是多少钱
  • 备案网站可以做影视站网站400
  • 四川住房与城乡建设部网站注册登记
  • 网站建设第三方沈阳工程最新动态
  • 兰州做网站客户上海企业在线登记
  • 新乡公司做网站wordpress被大量注册
  • 小语种服务网站公众号平台建设网站
  • 免费做mc皮肤网站企业网站建设合同模板
  • 做网站可以申请个体户么网站的定位分析
  • jsp做的零食网站下载wordpress侧边栏折叠
  • 帝国网站单页做301南京旅游网站建设公司
  • 网站sem优化怎么做网站建设推广安徽
  • 比较好的室内设计网站潍坊网络科技
  • 南宁网站建设公设计联盟网站
  • 多个图表统计的网站怎么做百度推广费2800元每年都有吗
  • 连江县住房和城乡建设局网站企业类网站模版
  • 临沂seo整站优化厂家网站建设 大公司排名
  • 网站开发有哪些方式百度导航怎么下载
  • 网站认证免费视频直播网站建设方案
  • 瀑布流分享网站源代码下载网站构建的一般流程是什么
  • wordpress 4.9 多站wordpress邮箱解析
  • 微信网站开发企业汽车网站设计模板
  • 如何提升网站转化率遵义市公共资源交易平台
  • 网站目录管理模板企业解决方案部
  • 建设网站上申请劳务资质吗珠海哪个公司建设网站好
  • c2c商城网站建设在微信怎么开发公众号
  • 美的公司网站建设的目的做个网站要钱吗
  • 和县建设局网站孟州网站建设
  • 网站与规划设计思路竞价培训课程
  • 网站建设设计视频专业设计企业网站