当前位置: 首页 > news >正文

郑州做网站九零后网络沧州做网站的专业公司

郑州做网站九零后网络,沧州做网站的专业公司,商城做网站哪家好,秦皇岛住房和城乡建设网官网背景 1、对接多个节点上的MQ#xff08;如master-MQ#xff0c;slave-MQ#xff09;#xff0c;若读者需要自己模拟出两个MQ#xff0c;可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客 2、队列名称不是固定的#xff0c;需要接受外部参数如master-MQslave-MQ若读者需要自己模拟出两个MQ可以部署多个VM然后参考 docker 安装rabbitmq_Steven-Russell的博客-CSDN博客 2、队列名称不是固定的需要接受外部参数并且通过模板进行格式化才能够得到队列名称 3、需要在master-MQ上延迟一段时间然后将消息再转发给slave-MQ 问题 1、采用springboot的自动注入bean需要事先知道队列的名称但是队列名称是动态的情况下无法实现自动注入 2、mq弱依赖在没有master-mq或者slave-mq时不能影响到现有能力 解决方案 1、由于mq的队列创建、exchange创建以及队列和exchange的绑定关系是可重入的所以采用connectFactory进行手动声明 2、增加自定义条件OnMqCondition防止不必要的bean创建 总体流程 实施过程 搭建springboot项目 参考 搭建最简单的SpringBoot项目_Steven-Russell的博客-CSDN博客 引入amqp依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency 引入后续会用到的工具类依赖 dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.11.0/version /dependency dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.28/versionscopeprovided/scope /dependency dependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion2.0.40/version /dependency 创建配置文件 在application.yml中增加如下配置 mq:master:addresses: 192.168.30.128:5672username: guestpassword: guestvhost: /slave:addresses: 192.168.30.131:5672username: guestpassword: guestvhost: /创建自定义Condition注解和注解实现 package com.wd.config.condition;import org.springframework.context.annotation.Conditional;import java.lang.annotation.*;Target({ElementType.TYPE, ElementType.METHOD}) Retention(RetentionPolicy.RUNTIME) Documented Conditional(OnMqCondition.class) public interface MqConditional {String[] keys();}package com.wd.config.condition;import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.lang.NonNull; import org.springframework.util.ObjectUtils;import java.util.Map;public class OnMqCondition implements Condition {Overridepublic boolean matches(NonNull ConditionContext context, NonNull AnnotatedTypeMetadata metadata) {MapString, Object annotationAttributes metadata.getAnnotationAttributes(MqConditional.class.getName());if (annotationAttributes null || annotationAttributes.isEmpty()) {// 为空则不进行校验了return true;}String[] keys (String[])annotationAttributes.get(keys);for (String key : keys) {String property context.getEnvironment().getProperty(key);if (ObjectUtils.isEmpty(property)) {return false;}}return true;} }创建多个链接工厂connectFactory package com.wd.config;import com.wd.config.condition.MqConditional; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary;Configuration public class MqConnectionFactory {Value(${mq.master.addresses})private String masterAddresses;Value(${mq.master.username})private String masterUsername;Value(${mq.master.password})private String masterPassword;Value(${mq.master.vhost})private String masterVhost;Value(${mq.slave.addresses})private String slaveAddresses;Value(${mq.slave.username})private String slaveUsername;Value(${mq.slave.password})private String slavePassword;Value(${mq.slave.vhost})private String slaveVhost;BeanPrimaryMqConditional(keys {mq.master.addresses, mq.master.vhost, mq.master.username, mq.master.password})public ConnectionFactory masterConnectionFactory() {return doCreateConnectionFactory(masterAddresses, masterUsername, masterPassword, masterVhost);}BeanMqConditional(keys {mq.slave.addresses, mq.slave.vhost, mq.slave.username, mq.slave.password})public ConnectionFactory slaveConnectionFactory() {return doCreateConnectionFactory(slaveAddresses, slaveUsername, slavePassword, slaveVhost);}private ConnectionFactory doCreateConnectionFactory(String addresses,String username,String password,String vhost) {CachingConnectionFactory cachingConnectionFactory new CachingConnectionFactory();cachingConnectionFactory.setAddresses(addresses);cachingConnectionFactory.setUsername(username);cachingConnectionFactory.setPassword(password);cachingConnectionFactory.setVirtualHost(vhost);return cachingConnectionFactory;}}创建交换机名称枚举 DeclareQueueExchange package com.wd.config;public enum DeclareQueueExchange {EXCHANGE(exchange),DEAD_EXCHANGE(deadExchange),DELAY_EXCHANGE(delayExchange);private final String exchangeName;DeclareQueueExchange(String exchangeName) {this.exchangeName exchangeName;}public String getExchangeName() {return exchangeName;} }创建消息队列模板枚举 DeclareQueueName package com.wd.config;public enum DeclareQueueName {DELAY_QUEUE_NAME_SUFFIX(_delay),DEAD_QUEUE_NAME_SUFFIX(_dead),QUEUE_NAME_TEMPLATE(wd.simple.queue.{0});private final String queueName;DeclareQueueName(String queueName) {this.queueName queueName;}public String getQueueName() {return queueName;} }创建消息VO和消息 package com.wd.controller.vo;import com.wd.pojo.Phone; import lombok.Data;Data public class DelayMsgVo {private String queueId;private Phone phone; }package com.wd.pojo;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable; import java.util.Date; import java.util.List;Data AllArgsConstructor NoArgsConstructor public class Phone implements Serializable {private static final long serialVersionUID -1L;private String id;private String name;private Date createTime;private ListUser userList;}package com.wd.pojo;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable; import java.util.Date;Data AllArgsConstructor NoArgsConstructor public class User implements Serializable {private static final long serialVersionUID -1L;private String username;private Date create; }定义队列id列表缓存用于替换三方缓存用于队列名称模板初始化 package com.wd.config;import java.util.ArrayList; import java.util.List;public interface QueueIdListConfig {/*** 先用本地缓存维护队列id*/ListInteger QUEUE_ID_LIST new ArrayListInteger() {{add(111);add(222);add(333);}}; }创建消息接受入口 controller 注意此处就以web用户输入为入口所以创建controller package com.wd.controller;import com.alibaba.fastjson2.JSONObject; import com.rabbitmq.client.*; import com.wd.config.DeclareQueueExchange; import com.wd.config.DeclareQueueName; import com.wd.controller.vo.DelayMsgVo; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.web.bind.annotation.*;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;RestController ConditionalOnBean(value ConnectionFactory.class, name masterConnectionFactory) public class DynamicCreateQueueController {private final ConnectionFactory masterConnectionFactory;public DynamicCreateQueueController(Qualifier(value masterConnectionFactory) ConnectionFactory masterConnectionFactory) {this.masterConnectionFactory masterConnectionFactory;}PostMapping(value sendDelayMsg)public String sendMsg2DelayQueue(RequestBody DelayMsgVo delayMsgVo) throws IOException, TimeoutException {doSendMsg2DelayQueue(delayMsgVo);return success;}private void doSendMsg2DelayQueue(DelayMsgVo delayMsgVo) throws IOException, TimeoutException {// 根据id 动态生成队列名称String queueNameTemplate DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();String queueName MessageFormat.format(queueNameTemplate, delayMsgVo.getQueueId());String delayQueueName queueName DeclareQueueName.DELAY_QUEUE_NAME_SUFFIX.getQueueName();String deadQueueName queueName DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();// 注意下述声明交换机和队列的操作是可以重入的MQ并不会报错try (Connection connection masterConnectionFactory.createConnection();Channel channel connection.createChannel(false)){// 声明死信交换机channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);// 声明死信队列AMQP.Queue.DeclareOk deadQueueDeclareOk channel.queueDeclare(deadQueueName,true, false, false, null);// 定时任务 绑定消费者避免出现多个消费者以及重启后无法消费存量消息的问题// 注意因为需要保证消费顺序所以此处仅声明一个消费者// 死信队列和交换机绑定channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);// 声明延迟队列MapString, Object args new HashMap();//设置延迟队列绑定的死信交换机args.put(x-dead-letter-exchange, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName());//设置延迟队列绑定的死信路由键args.put(x-dead-letter-routing-key, deadQueueName);//设置延迟队列的 TTL 消息存活时间args.put(x-message-ttl, 10 * 1000);channel.queueDeclare(delayQueueName, true, false, false, args);channel.exchangeDeclare(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);channel.queueBind(delayQueueName, DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName);// 发送消息到延迟队列channel.basicPublish(DeclareQueueExchange.DELAY_EXCHANGE.getExchangeName(), delayQueueName, null,JSONObject.toJSONString(delayMsgVo.getPhone()).getBytes(StandardCharsets.UTF_8));}}}创建master延迟消息消费者 package com.wd.mq.consumer;import com.rabbitmq.client.*; import com.wd.config.DeclareQueueExchange; import com.wd.config.DeclareQueueName; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** 死信消费者消费消息转发给targetConnectionFactory对应的目标MQ*/ public class MasterDeadQueueConsumer extends DefaultConsumer {private final ConnectionFactory targetConnectionFactory;public MasterDeadQueueConsumer(Channel channel, ConnectionFactory targetConnectionFactory) {super(channel);this.targetConnectionFactory targetConnectionFactory;}Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 从死信队列的名称中截取队列名称作为后续队列的名称String routingKey envelope.getRoutingKey();String targetQueueName routingKey.substring(0, routingKey.length() - DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName().length());try (Connection targetConnection targetConnectionFactory.createConnection();Channel targetChannel targetConnection.createChannel(false)){// 声明交换机和队列targetChannel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);targetChannel.queueDeclare(targetQueueName, true, false, false, null);targetChannel.queueBind(targetQueueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName);// 转发消息targetChannel.basicPublish(DeclareQueueExchange.EXCHANGE.getExchangeName(), targetQueueName, properties, body);} catch (TimeoutException e) {e.printStackTrace();// 注意此处获取的源队列的channelgetChannel().basicNack(envelope.getDeliveryTag(), false, true);}// 注意此处获取的源队列的channelgetChannel().basicAck(envelope.getDeliveryTag(), false);} }创建slave队列消息消费者 package com.wd.mq.consumer;import com.alibaba.fastjson2.JSONObject; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.wd.pojo.Phone;import java.io.IOException;public class SlaveQueueConsumer extends DefaultConsumer {public SlaveQueueConsumer(Channel channel) {super(channel);}Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {Phone phone JSONObject.parseObject(new String(body), Phone.class);System.out.println(SlaveQueueConsumer consume phone);getChannel().basicAck(envelope.getDeliveryTag(), false);} }创建定时任务消费延迟消息 注意因为采用的是死信队列的方式实现的延迟效果此处只需要消费对应的死信队列即可 package com.wd.mq.quartz;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.wd.config.DeclareQueueExchange; import com.wd.config.DeclareQueueName; import com.wd.config.QueueIdListConfig; import com.wd.mq.consumer.MasterDeadQueueConsumer; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled;import java.io.IOException; import java.text.MessageFormat; import java.util.concurrent.TimeoutException;Configuration ConditionalOnBean(value ConnectionFactory.class, name {slaveConnectionFactory, masterConnectionFactory}) public class MasterDeadQueueSubscribeProcessor {private final ConnectionFactory masterConnectionFactory;private final ConnectionFactory slaveConnectionFactory;public MasterDeadQueueSubscribeProcessor(Qualifier(value masterConnectionFactory) ConnectionFactory masterConnectionFactory,Qualifier(value slaveConnectionFactory) ConnectionFactory slaveConnectionFactory) {this.masterConnectionFactory masterConnectionFactory;this.slaveConnectionFactory slaveConnectionFactory;}/*** 消费死信队列信息并且转发到其他mq*/Scheduled(fixedDelay 10 * 1000)public void subscribeMasterDeadQueue() throws IOException, TimeoutException {// 根据id 动态生成队列名称// 此处的queueIdList可以从第三方缓存查询得到并且和sendDelayMsg接口保持同步刷新此处先用本地缓存代替id同步刷新机制不是重点此处暂不讨论for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {String queueNameTemplate DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();String deadQueueName MessageFormat.format(queueNameTemplate, id) DeclareQueueName.DEAD_QUEUE_NAME_SUFFIX.getQueueName();try (Connection connection masterConnectionFactory.createConnection();Channel channel connection.createChannel(false)){AMQP.Queue.DeclareOk queueDeclare channel.queueDeclare(deadQueueName, true, false, false, null);if (queueDeclare.getConsumerCount() 0) {channel.exchangeDeclare(DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);}channel.queueBind(deadQueueName, DeclareQueueExchange.DEAD_EXCHANGE.getExchangeName(), deadQueueName);channel.basicConsume(deadQueueName, false, new MasterDeadQueueConsumer(channel, slaveConnectionFactory));}}}}创建定时任务消费slave队列的消息 package com.wd.mq.quartz;import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.wd.config.DeclareQueueExchange; import com.wd.config.DeclareQueueName; import com.wd.config.QueueIdListConfig; import com.wd.mq.consumer.SlaveQueueConsumer; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled;import java.io.IOException; import java.text.MessageFormat; import java.util.concurrent.TimeoutException;Configuration ConditionalOnBean(value ConnectionFactory.class, name slaveConnectionFactory) public class SlaveQueueSubscribeProcessor {private final ConnectionFactory slaveConnectionFactory;public SlaveQueueSubscribeProcessor(Qualifier(value slaveConnectionFactory) ConnectionFactory slaveConnectionFactory) {this.slaveConnectionFactory slaveConnectionFactory;}/*** 消费队列信息*/Scheduled(fixedDelay 10 * 1000)public void subscribeSlaveDeadQueue() throws IOException, TimeoutException {// 根据id 动态生成队列名称// 此处的queueIdList可以从第三方缓存查询得到并且和sendDelayMsg接口保持同步刷新此处先用本地缓存代替for (Integer id : QueueIdListConfig.QUEUE_ID_LIST) {String queueNameTemplate DeclareQueueName.QUEUE_NAME_TEMPLATE.getQueueName();String queueName MessageFormat.format(queueNameTemplate, id);try (Connection connection slaveConnectionFactory.createConnection();Channel channel connection.createChannel(false)){AMQP.Queue.DeclareOk queueDeclare channel.queueDeclare(queueName, true, false, false, null);if (queueDeclare.getConsumerCount() 0) {channel.basicConsume(queueName, false, new SlaveQueueConsumer(channel));}channel.exchangeDeclare(DeclareQueueExchange.EXCHANGE.getExchangeName(), BuiltinExchangeType.DIRECT);channel.queueBind(queueName, DeclareQueueExchange.EXCHANGE.getExchangeName(), queueName);}}}}启动项目 请求接口发送消息 http://localhost:8080/sendDelayMsg 检查消息传递过程 先在master-mq延迟队列发现消息 再到master-mq死信队列中发现消息 再到slave-mq中发现消息 检查日志打印 发现SlaveQueueConsumer打印如下日志 结论 消息传递流程如下验证通过
http://www.w-s-a.com/news/979695/

相关文章:

  • 小游戏网站建设可以自己做图片的软件
  • 湖南地税局官网站水利建设基金app仿制
  • 苏州网站设计kgwl建设网站需要用到哪些技术人员
  • 万户网络做网站如何亚马逊网站建设
  • 门户网站制作费用暴雪公司最新消息
  • 深圳专业建网站公司济南公司做网站的价格
  • 怎么运行自己做的网站网上申请平台怎么申请
  • 旅游公司网站 优帮云新闻近期大事件
  • 电商网站后台报价营销软文小短文
  • 网站建设项目售后服务承诺公司名称邮箱大全
  • 湖南网站建设哪里好做ppt的网站叫什么名字
  • 容城县建设银行网站电子商务网站建设子项目
  • 网站管理助手3.0做淘宝网站用什么软件做
  • 贵阳做网站的公司wordpress趣味插件
  • 自己设置免费网站设计平台南京哪里有做公司网站的
  • 建设公司内网网站的意义自助建站网站的宣传手册
  • 手机建设中网站建立个人网站服务器
  • 网站开发工程师岗位概要网站怎么制作教程
  • 城乡建设主管部门官方网站公司简介模板ppt范文
  • 网站认证必须做么cc0图片素材网站
  • net域名 著名网站国外设计案例网站
  • 淘宝客网站哪里可以做app地推网
  • 宜昌建设厅网站中国最新时事新闻
  • 微网站怎么开发wordpress 发表评论
  • 山东网站建设是什么一页网站首页图如何做
  • 游戏开发与网站开发哪个难万网影
  • 做网站编程语言建筑施工特种证书查询
  • 找人做网站内容自己编辑吗修改wordpress登陆界面
  • 登陆建设银行wap网站湖南网站建设磐石网络答疑
  • 58网站怎么做浏览度才高论坛网站怎么做排名