物流网络,拼多多关键词怎么优化,腾讯云网站免费建设,app拉新怎么做文章目录 前言问题分析消息队列特性分析kafkarocketmqrabbitmq 发布订阅公共抽象发送端订阅端 前言
消息队列是一种用于在应用程序的不同组件或系统之间传递消息的通信机制。它通过将消息存储在一个队列中#xff0c;确保消息能够可靠地从发送方传递到接收方#xff0c;即使… 文章目录 前言问题分析消息队列特性分析kafkarocketmqrabbitmq 发布订阅公共抽象发送端订阅端 前言
消息队列是一种用于在应用程序的不同组件或系统之间传递消息的通信机制。它通过将消息存储在一个队列中确保消息能够可靠地从发送方传递到接收方即使发送方和接收方不同时在线或处理能力不同。消息队列在现代分布式系统、微服务架构以及异步处理场景中扮演着至关重要的角色所以在处理灰度发布的实现避免不了对其改造。
问题分析
消息的灰度路由不像应用服务之间可以简单的改写路由规则网上看到很多关于消息灰度的方案都不是很理想大多涉及到需要配合修改业务代码;现实中在打算实施灰度发布方案时往往已存在大量的旧服务在上线运行着改造推动难信不可想象方案是否能落地得大打折扣所以如果涉及到需要改业务代码的方案不太能行得通现在唯一到想到的是通过agent的方案可以实现无需改动业务代码并且接入升级较为方便。
消息队列特性分析
本文重点是关注消息队列可能作灰度路由特性的分析个人认为消息灰度路由作为整个灰度发布最为麻烦的点。
kafka
kafka 有个消费者组这样的特性: 同一个topic某条消息同一消费者的只有其中一个消费者能收到该消息利用这一特性可以把某一topic的消费者组分为:正常的消费者组和灰度消息者组;正常的消费者收到topic消息后判断如果是正常的消息则接受,如果是携带灰度标签的消息则丢弃同量灰度消费者收到topic消息后如果正常消息去弃掉如果是携带灰度标签的消息则接受。对生产端如接收到的请求是灰度请求则发送携带灰度标签的消息否则发送不携带灰度标签的消息。
rocketmq
rocketmq 跟kafka有些类似处理方式可以同步
rabbitmq
rabbitmq 的发布、订阅与kafka或rocketmq 有些不同rabbitmq 中的exchange和routingKey组成有些相当于kafka中的topic服务订阅下要创建queue及绑定相应的exchange和routingKey没有消费者组的概念但同一条exchange和routingKey可以绑定多个队列同时它用virtualhost的概念,vhost 是 RabbitMQ 中的一个逻辑隔离单元类似于操作系统中的“虚拟目录”或 Web 服务器中的“虚拟主机”。每个 vhost 都有自己的独立命名空间包含自己的交换机、队列、绑定、用户权限等;那么可以利可以用绑定多个queue或virtualhost的特性实现消息灰度发布订阅考虑管理实现复杂难易度本方采用利用virtualhost的特性实现消息的灰度发布订阅:
在rabbitmq原有的vitualhost 配置基本上,利用程序创建一个对应的灰度virtualhost,并完全copy原有的配置服务在启动时据自身所处的状态mq客户端连接相应用virtualhost:如处理灰度状态连接灰度的virtualhost,如果处理正常状态则连接原有的virtualhost;实例状态在线切换时销毁原有的连接切换到对应的virtualhost连接对生消息发生端则同时连接正常virtualhost和灰度的vitualhost,如果发送正常消息则使用正常的连接如果发送灰度消息则使用灰度virtualhost连接但是存在一个问题当某个服务只有灰度或正常实例时与其状态相反的消息没法被消费除非做了双订阅同时订阅正常消息和灰度消息如果是kafka和rocketmq则可以很好处理该问题。
发布订阅公共抽象
据以上分析抽象出消息队列了的发送端和订况端
发送端
public abstract class AbstractPublishInterceptorM{/*** 续传灰度变量* param allArguments* param routingEnv* return*/protected abstract ListM setOutContext(Object[] allArguments, String routingEnv);protected abstract ComponentType getType();
}
订阅端
public interface InInterceptor {default void setContext(String routingEnv) {ServerContextHolder.setData(X_ENV, routingEnv);}default void removeContext() {ServerContextHolder.remove(X_ENV);}default String getContext(Object... args) {if (args null) {return (String) ServerContextHolder.getData(X_ENV);}return null;}
}public abstract class AbstractConsumeInterceptor implements InInterceptor, InstanceMethodsAroundInterceptor {protected static final ILog logger LogManager.getLogger(AbstractConsumeInterceptor.class);protected boolean discard(Object xEnv) {ServerInstance instance ServerInstance.getInstance();if (instance.getEnvStatus() 0 xEnv null) {if (instance.getNormalInstances() 1 instance.getMessageConsumeMode() 1) {//没有正常实例,让灰度实例消息费(一般不会出现该情况)return false;}//灰度实列,丢弃正常消息logger.debug(灰度实列丢掉正常消息);return true;}if (instance.getEnvStatus() 1 xEnv ! null) {if (instance.getGrayInstances() 1 instance.getMessageConsumeMode() 1) {//没有灰度实例,让正常实例消息费如果灰度再起来可能会出现重复消费return false;}//正常实例收到灰度消息,丢弃logger.info(正常实列丢掉灰度消息);return true;}return false;}protected abstract ComponentType getType();
}