连云港市网站建设,wordpress 获取页面列表,如何使用qq空间做推广网站,网站图片分辨率目录
1.MQ选型
2.RocketMQ基本架构
3.Springboot集成RocketMQ
4.顺序消息
5.延时消息
6.事务消息 1.MQ选型
目前市面上的MQ选型#xff1a;主要分为3个类型
Kafka#xff1a;吞吐量大#xff0c;且性能好#xff0c;集群高可用#xff1b;会丢失数据#xff0c;功…目录
1.MQ选型
2.RocketMQ基本架构
3.Springboot集成RocketMQ
4.顺序消息
5.延时消息
6.事务消息 1.MQ选型
目前市面上的MQ选型主要分为3个类型
Kafka吞吐量大且性能好集群高可用会丢失数据功能较为单一即场景单一适合于数据量大且频繁如日志分析等RabbitMQ消息可靠性高功能全面吞吐量较低并发性能不高消息积累会严重影响性能即消息消费需较快RocketMQ高吞吐、高性能、高可用官方文档及周边生态不成熟客户端只支持java。
简而言之Apache RocketMQ 自诞生以来因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。
2.RocketMQ基本架构
RocketMQ的基本架构如下图所示 Producer生产者消息的生产者一般为上游系统。
Topic主题消息传输和存储的顶层容器用于标识同一类业务逻辑的消息。 主题的作用主要如下1定义数据的分类隔离将不同业务类型的数据拆分到不同的主题中管理通过主题实现存储的隔离性和订阅隔离性。2定义数据的身份和权限由于消息本身是匿名无身份的同一分类的消息使用相同的主题来做身份识别和权限管理。
Queue队列队列是 RocketMQ 中消息存储和传输的实际容器也是 RocketMQ 消息的最小存储单元。 RocketMQ 的所有主题都是由多个队列组成以此实现队列数量的水平拆分和队列内部的流式存储。
Subscription订阅关系订阅关系是 RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。通过配置订阅关系可控制如下传输行为1消息过滤规则用于控制消费者在消费消息时选择主题内的哪些消息进行消费。2消费状态RocketMQ 服务端默认提供订阅关系持久化的能力即消费者分组在服务端注册订阅关系后当消费者离线并再次上线后可以获取离线前的消费进度并继续消费。
ConsumerGroup消费者分组消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。
Comsumer消费者消息的消费者即对消息进行接收和处理的相关下游系统。
一般来说在RocketMQ中生产者生产出消息后指定对应的Topic、订阅关系Tags参数、队列hashkey参数后将消息发送至RocketMQ客户端消费者对RocketMQ客户端进行监听当监听到有自己订阅的Topic下的消息时进行接收并进行消费。
3.Springboot集成RocketMQ
首先引入相关依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.0/version/dependency
其次对RocketMQ进行配置
server:port: 8080spring:application:name: cloud-rocket-mqrocketmq:name-server: 127.0.0.1:9876producer:group: test-group #生产者组名规定在一个应用里面必须唯一send-message-timeout: 5000 #消息发送的超时时间单位msretry-times-when-send-async-failed: 5 #异步消息发送失败重试的次数
RocketMQ支持我们异步发送普通消息。
普通消息是指上游系统生产者将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的且不需要产生关联。
1生产者代码编写
Slf4j
RestController
public class SendMessageController {Resourceprivate RocketMQTemplate rocketMQTemplate;PostMapping(/send)public void send(RequestParam(message) String message) throws InterruptedException {//发送异步消息参数topic、消息rocketMQTemplate.convertAndSend(topic_test:tagA,messagetagA);rocketMQTemplate.convertAndSend(topic_test:tagB,messagetagB);log.info(已发送异步消息);}
}
2消费者代码编写
Service
Slf4j
RocketMQMessageListener(topic topic_test, consumerGroup consumer_topic_test,selectorExpression tagA || tagC)
public class MessageConsumer implements RocketMQListenerString {Overridepublic void onMessage(String s) {log.info(收到消息s);}
}
3代码逻辑
在生产者端我们发送了一个消息到 topic_test 这一Topic下并指定tagA订阅规则下的消费者组可以进行消费。
在消费者端我们定义其消费者组名称订阅关系为订阅 topic_test 下的 tagA 或者 tagB消息并进行消费。 可以看到消费者成功监听到 topic_test:tagA 下的消息。
4.顺序消息
RocketMQ中可以发送顺序消息即支持消费者按照发送消息的先后顺序获取消息从而实现业务场景中的顺序处理。 相比其他类型消息顺序消息在发送、存储和投递的处理过程中更多强调多条消息间的先后顺序关系。 如上图所示在分布式系统中我们有多个生产者执行同一套代码顺序消息可以保证系统按照多个生产者发出消息的前后顺序进行顺序消费如以证券、股票交易撮合场景为例对于出价相同的交易单坚持按照先出价先交易的原则下游处理订单的系统需要严格按照出价顺序来处理订单。 代码
//发送顺序消息参数topic消息hashkey相同hashkey发送至同一个队列
rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 1).build(),queue);
rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 2).build(),queue);
5.延时消息
即消息被发送至服务端后在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。以电商交易场景为例订单下单后暂未支付此时不可以直接关闭订单而是需要等待一段时间后才能关闭订单。使用 RocketMQ 定时消息可以实现超时任务的检查触发。
代码
//发送延时消息
rocketMQTemplate.syncSend(topic_test:tagA, MessageBuilder.withPayload(message).build(), 3000, 2);
其中第四个参数为延时级别分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
6.事务消息 分布式系统调用的特点为一个核心业务逻辑的执行同时需要调用多个下游业务进行处理。因此如何保证核心业务和多个下游业务的执行结果完全一致是分布式事务需要解决的主要问题。
事务消息就是在普通消息基础上支持二阶段的提交能力。将二阶段提交和本地事务绑定实现全局提交结果的一致性。
简单来说就是保证本地事务执行成功消费者才会接受消息进行消费。
执行过程 代码
1生产者 RequestMapping(/send/transaction)public void sendTransactionMessage(RequestParam(msg) String msg){//发送事务消息:采用的是sendMessageInTransaction方法返回结果为TransactionSendResult对象该对象中包含了事务发送的状态、本地事务执行的状态等//参数一topic;参数二消息TransactionSendResult result rocketMQTemplate.sendMessageInTransaction(topic_test:tagA, MessageBuilder.withPayload(msg).build(),null);//发送状态String sendStatus result.getSendStatus().name();//本地事务执行状态String localState result.getLocalTransactionState().name();log.info(发送状态:sendStatus;本地事务执行状态localState);}
2消费者端代码和上文相同保持不变。
3本地事务
/*** 生产者消息监听器* 用于监听本地事务执行的状态和检查本地事务状态。* author qzz*/
RocketMQTransactionListener
Slf4j
public class TransactionMsgConfig implements RocketMQLocalTransactionListener {/*** 执行本地事务在发送消息成功时执行* param message* param o* return commit or rollback or unknown*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//处理业务String jsonStr new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);log.info(执行本地业务消息为jsonStr);//模拟网络波动//Thread.sleep(35000);//被除数为0模拟业务出错//int a 10/0;}catch (Exception e){log.error(事务执行出错e.getMessage());//返回ROLLBACK状态进行回滚return RocketMQLocalTransactionState.ROLLBACK;}log.info(事务提交消息正常处理);//返回COMMIT状态的消息会立即被消费者消费到return RocketMQLocalTransactionState.COMMIT;}/*** 检查本地事务的状态* param message* return*/Override//超时、事务状态unknown等会调用该方法public RocketMQLocalTransactionState checkLocalTransaction(Message message) {log.info(消息回查);return RocketMQLocalTransactionState.ROLLBACK;}
}
我们需要编写一个本地事务执行类继承 RocketMQLocalTransactionListener 类。
在该类中我们对本地事务的异常进行捕捉如果出现异常则返回 ROLLBACK执行状态顺利执行则最终返回 COMMIT状态。
如果出现超时等网络波动或是UNKNOWN状态等情况该类则会调用 checkLocalTransaction方法返回方法中定义的事务状态。
4执行
1.顺利执行消费者成功消费 可以看到消息成功发送消费者成功消费。
2.本地事务出现异常 可以看到本地事务抛出了异常事务进行了回滚消费者没有进行消费。
3.模拟超时 可以看到当事务在一段时间内未返回对应事务状态 则会调用对应回查方法直至事务成功返回事务执行状态。