当前位置: 首页 > news >正文

做企业网站的费用免费卖货平台

做企业网站的费用,免费卖货平台,室内设计平面图分析,简单aspx网站开发系统学习消息队列——RabbitMQ的发布确认高级篇 简介 ‌RabbitMQ是一个开源的消息代理软件#xff0c;实现了‌高级消息队列协议#xff08;AMQP#xff09;‌#xff0c;主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写#xff0c;具有高性能、健壮…系统学习消息队列——RabbitMQ的发布确认高级篇 简介 ‌RabbitMQ是一个开源的消息代理软件实现了‌高级消息队列协议AMQP‌主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写具有高性能、健壮性和可伸缩性适用于各种规模的企业应用‌。 基本概念和功能 RabbitMQ作为一个消息中间件主要功能包括接收和转发消息支持“生产者-消费者模型”。生产者不断向消息队列中写入消息而消费者则从队列中读取或订阅消息。RabbitMQ支持多种消息传递模式如普通模式、工作模式、发布/订阅模式等以满足不同的应用场景需求‌。 架构和关键组件 RabbitMQ的架构基于生产者-消费者模型通过队列实现消息的存储和转发。队列具有先进先出FIFO的特性并且可以设置持久化、独占、自动删除等属性。RabbitMQ还引入了交换机和路由键等概念以实现更灵活和复杂的消息路由和分发机制‌。 应用场景和优势 RabbitMQ广泛应用于需要高并发处理、流量削峰、系统解耦和提高可靠性的场景。其优势包括 ‌高性能‌RabbitMQ能够处理高并发请求确保系统的稳定运行。‌高可靠性‌通过消息的持久化存储和故障恢复机制确保消息不会丢失。‌灵活性‌支持多种消息传递模式和路由规则满足复杂的应用需求 1.消息发布确认的方案 2.消息的回退 3.备份交换机 1.消息发布确认的方案 在前面的文章中系统学习消息队列——RabbitMQ的消息发布确认我们一定程度上学习了消息的发布确认的基础但是在生产环境中由于RabbitMq的重启RabbitMQ在重启过程中投递失败导致消息丢失需要手动处理和恢复。那么我们该如何保证当RabbitMQ不可用的时候消息的稳定投递呢 我们采取下面的方案 我们将要发送消息做一个持久化发送消息的时候我们持久化一份到数据库或者缓存中当发送消息失败的时候我们进行一次重新发送。所以在发送消息的时候我们要进行代码业务逻辑的处理 yml: server: port:11000 spring: rabbitmq: host:127.0.0.1 port:5672 username:guest password:guest publisher-confirm-type:correlatedpublisher-confirm-type这个参数一共有三种配置方法 NONE: 禁用发布确认是默认值。 CORRELATED: 发布消息后交换机会触发回调方法。 SIMPLE: 有两种效果 1和CORRELATED一样会触发回调方法 2发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker。回调方法类 Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/ Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info(交换机已经收到 id 为:{}的消息, correlationData.getId());} else { log.info(交换机还未收到 id 为:{}消息,由于原因:{}, correlationData.getId(), cause);}}} 队列配置类: Configuration publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME  confirm.exchange; publicstatic final StringCONFIRM_QUEUE_NAME  confirm.queue;Autowired privateMyCallBack myCallBack; Autowired privateRabbitTemplate rabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象 PostConstruct publicvoidinit() {rabbitTemplate.setConfirmCallback(myCallBack);}//声明业务 Exchange Bean(confirmExchange) publicDirectExchangeconfirmExchange(){ returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列 Bean(confirmQueue) publicQueueconfirmQueue(){ returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系 Bean publicBindingqueueBinding(Qualifier(confirmQueue) Queue queue, Qualifier(confirmExchange) DirectExchange exchange){ returnBindingBuilder.bind(queue).to(exchange).with(key1);}}生产者 RestController RequestMapping(/confirm) Slf4j public class ProducerController {public static final String CONFIRM_EXCHANGE_NAME  confirm.exchange;Autowired private RabbitTemplate rabbitTemplate;GetMapping(sendMessage/{message})public void sendMessage(PathVariable String message) { //指定消息 id 为 1CorrelationData correlationData1  new CorrelationData(1); //这个key1是有交换机的key会发送成功String routingKey  key1;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message routingKey, correlationData1); //这个交换机不存在会发送失败CorrelationData correlationData2  new CorrelationData(2);rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME1, routingKey, message routingKey, correlationData2);CorrelationData correlationData3  new CorrelationData(3); //这个key2是没有交换机的key会发送失败routingKey  key2;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message routingKey, correlationData3);log.info(发送消息内容:{}, message);} } 消费者 Component Slf4j publicclassConfirmConsumer {publicstatic final StringCONFIRM_QUEUE_NAME  confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME) publicvoidreceiveMsg(Message message){ String msgnewString(message.getBody());log.info(接受到队列 confirm.queue 消息:{},msg);}} 我们发送信息 http://localhost:11000/confir...可以啊 我们发送三条消息 一条是有交换机有队列的消息 二条是没有交换机的消息 三条是有交换机没有队列的消息 结果如下 我们可以看出 第一条消息正常消费 第二条消息找不到交换机抛异常了 第三条消息绑定键找不到队列这条消息直接被抛弃了 2.消息的回退 我们发现第三条消息的反馈并不是很好在仅仅开启了生产者确认机制的情况下交换机收到消息后会直接给生产者发送确认消息如果该消息不可路由那么消息会直接被抛弃此时生产者是不知道这条消息被丢弃的。所以我们这里要引入消息的回退机制如果消息不能路由到队列就会有一个通知通过设置mandatory参数可以将不可抵达队列的消息返回给生产者。 回调处理逻辑 Component Slf4j publicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机是否收到消息的回调方法* CorrelationData 消息相关数据* ack 交换机是否收到消息* cause 交换机未收到消息的原因*/ Override publicvoidconfirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) {log.info(交换机已经收到 id 为:{}的消息, correlationData.getId());} else {log.info(交换机还未收到 id 为:{}消息,由于原因:{}, correlationData.getId(), cause);}}Override publicvoidreturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error( 消 息 {}, 被 交 换 机 {} 退 回 退 回 原 因 :{}, 路 由 key:{}, newString(message.getBody()),exchange,replyText,routingKey);}} 修改一下前面那个配置类的方法 //依赖注入 rabbitTemplate 之后再设置它的回调对象PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(myCallBack); /*** true* 交换机无法将消息进行路由时会将该消息返回给生产者* false* 如果发现消息无法进行路由则直接丢弃*/rabbitTemplate.setMandatory(true); //设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);} 继续发送消息http://localhost:11000/confir...可以啊 我们发现交换机路由不到的队列也会有反馈了 3.备份交换机 有了前面那个mandatory参数和回退消息我们对于无法投递到目的地的消息可以进行处理了。但是我们在处理这些日志的时候顶多就是打印了一下日志然后触发报警接着手动进行处理。通过日志收集这些无法到达路由的消息非常不优雅而且手动复制日志非常容易出错。而且mandatory参数设置还得增加配置类增加了复杂性。 如果我们不想丢失消息又不想增加配置类该怎么做呢在前面学习死信队列的时候系统学习消息队列——RabbitMQ的死信队列我们可以为队列设置死信交换机来处理那些失败的消息。 RabbitMQ中有备份交换机这种存在它就像死信交换机一样可以用来处理那些路由不到的消息当交换机接收到一份不可路由的消息的时候我们就会把这条消息转发到备份交换机中由备份交换机进行统一处理。 Configuration publicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME  confirm.exchange; publicstatic final StringCONFIRM_QUEUE_NAME  confirm.queue; publicstatic final StringBACKUP_EXCHANGE_NAME  backup.exchange; publicstatic final StringBACKUP_QUEUE_NAME  backup.queue; publicstatic final StringWARNING_QUEUE_NAME  warning.queue;// 声明确认队列 Bean(confirmQueue) publicQueueconfirmQueue(){ returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();} //声明确认队列绑定关系 Bean publicBindingqueueBinding(Qualifier(confirmQueue) Queue queue, Qualifier(confirmExchange) DirectExchange exchange){ returnBindingBuilder.bind(queue).to(exchange).with(key1);}//声明备份 Exchange Bean(backupExchange) publicFanoutExchangebackupExchange(){ returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机 Bean(confirmExchange) publicDirectExchangeconfirmExchange(){ //设置该交换机的备份交换机 ExchangeBuilder exchangeBuilder ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument(alternate-exchange, BACKUP_EXCHANGE_NAME);  return (DirectExchange)exchangeBuilder.build();} // 声明警告队列 Bean(warningQueue) publicQueuewarningQueue(){ returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();} // 声明报警队列绑定关系 Bean publicBindingwarningBinding(Qualifier(warningQueue) Queue queue, Qualifier(backupExchange) FanoutExchangebackupExchange){ returnBindingBuilder.bind(queue).to(backupExchange);} // 声明备份队列 Bean(backQueue) publicQueuebackQueue(){ returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();} // 声明备份队列绑定关系 Bean publicBindingbackupBinding(Qualifier(backQueue) Queue queue, Qualifier(backupExchange) FanoutExchange backupExchange){ returnBindingBuilder.bind(queue).to(backupExchange);}} 我们发现不可路由的消息被发现后就被送到了报警的备份队列里面。 而且这种配置的优先级比mandatory参数更高。
http://www.w-s-a.com/news/726278/

相关文章:

  • 和平网站建设公司做实验教学视频的网站
  • 音乐网站源码带手机版WordPress菜单调用不出
  • 昆明网站设计都需要设计什么网络推广岗位职责和任职要求
  • 国外公司网站模板网站建设公司选择意见书
  • 如何创建一个网站卖东西郑州 网站建设公司
  • 石景山郑州阳网站建设南京网站搜索引擎优化
  • 一个网站需要哪些备案书店网站建设策划书总结
  • 网站建设的重点是什么注册网站空间
  • 网站公司企业宗旨我的网站 dedecms
  • 沧州网站优化做详情图的网站
  • 中国建设银行公积金网站wordpress表单 post
  • 找权重高的网站方法wordpress视频网站上传视频
  • 营销型网站架构师迁移wordpress500错误
  • 做网站还是博客由()承担
  • wordpress 导购站模板中国最新军事新闻直播83军
  • 公众号h5网站开发wordpress文章主图
  • ps怎么艺术字字体设计网站我想自己做网站
  • 北京做机柜空调的网站模板网站和插件
  • 手机购物网站模板wordpress添加分类文档
  • 网站开发知识网上怎么申请个人营业执照
  • 音乐网站建设费用营销策略都有哪些4p
  • 深圳制作网站怎么样wordpress 学习视频
  • 新公司注册网站传奇手游大型网站
  • 无极网站网站涉案多少人被抓网站的按钮怎么做
  • ds216j做网站做购物网站那个好
  • 做淘宝门头的网站阿里巴巴官网app
  • 安踏网站建设策划方案如何通过域名访问网站
  • 建设网站破解版seo查询 站长之家
  • 太原模板建站平台旅游企业网站建设工作的通知
  • 网站国外建设超级简历模板官网