当前位置: 首页 > news >正文

网站建设基础实训报告企业官网招聘信息

网站建设基础实训报告,企业官网招聘信息,学校网站建设的应用意义案例,wordpress查询次数代码托管于gitee#xff1a;easy-rocketmq 文章目录一、前置工作二、消费者三、生产者1. 普通消息2. 过滤消息3. 同步消息4. 延时消息5. 批量消息6. 异步消息7. 单向消息8. 顺序消息9. 事务消息概要Demo源码解读一、前置工作 1、导入依赖 dependencygroupId…代码托管于giteeeasy-rocketmq 文章目录一、前置工作二、消费者三、生产者1. 普通消息2. 过滤消息3. 同步消息4. 延时消息5. 批量消息6. 异步消息7. 单向消息8. 顺序消息9. 事务消息概要Demo源码解读一、前置工作 1、导入依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.1/version /dependency2、编写配置 rocketmq:name-server: 47.96.232.192:9876producer:# 生产组名group: demoGroup# 消息发送超时时间send-message-timeout: 3000# 消息体阈值,4k以上会压缩compress-message-body-threshold: 4096# 在同步模式下发送失败之前在内部执行的最大重试次数。retry-times-when-send-failed: 3# 在异步模式下发送失败之前在内部执行的最大重试次数。retry-times-when-send-async-failed: 3# 消息阈值最大4MB在 4KB 之内性能最佳max-message-size: 4096二、消费者 实现RocketMQListener接口使用 RocketMQMessageListener 注册监听需指定消费者组和Topic。 Component RocketMQMessageListener(consumerGroup demo-consumer-group, // consumerGroup消费者组名topic Demo, // topic订阅的主题selectorExpression *, // selectorExpression控制可以选择的消息可以使用SelectorType.SQL92语法。设置为 * 时表示全部。messageModel MessageModel.CLUSTERING // messageModel: 控制消息模式。MessageModel.CLUSTERING负载均衡MessageModel.BROADCASTING广播模式 ) public class MqConsumerListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(消费消息- message);}}测试一下消费情况 三、生产者 1. 普通消息 普通消息无返回值只负责发送消息⽽不等待服务器回应且没有回调函数触发。 Override public void convertAndSend(D destination, Object payload) throws MessagingException {convertAndSend(destination, payload, (MapString, Object) null); }发送一个普通消息吧 SpringBootTest class RocketmqTemplateDemoApplicationTests {Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 普通消息无返回值只负责发送消息⽽不等待服务器回应且没有回调函数触发* - 参数一topicName:tags主题:标签可单Topic不指定Tag* - 参数二消息体*/Testpublic void sendBaseMsg() {rocketMQTemplate.convertAndSend(Demo:base,普通消息测试);} }源码 接下来我们一步一步点进去 此时我们发现其调用的最终调用了 doSend 方法在其实现类RocketMQTemplate中调用了syncSend()方法该方法以同步的方式发送消息并且返回值SendResult。但 doSend 方法的返回值类型是void并不返回SendResult。 我们发现该syncSend()底层是调用了DefaultMQProducer的send方法这里关于DefaultMQProducer就不做过多讲解了 2. 过滤消息 convertAndSend还有另外一个可携带属性的重载方法可以通过给消息携带属性的方式消费者利用sql92的方式实现消息过滤 Override public void convertAndSend(D destination, Object payload, Nullable MapString, Object headers)throws MessagingException {convertAndSend(destination, payload, headers, null); }发送携带属性的消息吧 发送若干条消息携带属性a属性值分别为0~9。消费者消息过滤仅消费携带属性a且属性值在[6,9]范围内的包含 /*** 过滤消息*/ GetMapping(/filter) public void sendFilterMsg() {for (int i 0; i 10; i) {HashMapString, Object harder new HashMap(1);harder.put(a, String.valueOf(i));rocketMQTemplate.convertAndSend(Demo:filter,过滤消息测试 i, harder);} }通过sql92过滤消息 1、修改选择消息模式为 SelectorType.SQL92默认模式是SelectorType.TAG。 selectorType SelectorType.SQL922、编写过滤sql selectorExpression a BETWEEN 6 AND 9Component RocketMQMessageListener(consumerGroup demo-consumer-group, // consumerGroup消费者组名topic Demo, // topic订阅的主题selectorType SelectorType.SQL92, // selectorType那种模式选择消息selectorExpression a BETWEEN 6 AND 9, // selectorExpression控制可以选择的消息可以使用SelectorType.SQL92语法。设置为 * 时表示全部。messageModel MessageModel.CLUSTERING // messageModel: 控制消息模式。MessageModel.CLUSTERING负载均衡MessageModel.BROADCASTING广播模式 ) public class MqConsumerListener implements RocketMQListenerObject {Overridepublic void onMessage(Object message) {System.out.println(消费消息- message);}}biu发送消息 源码 通过观看源码我们发现在封装RocketMQ消息时会将属性塞进消息头中然后发送。在消费时监听器会从消息头中获取该属性过滤消息。 3. 同步消息 同步消息有返回值SendResult等到消息发送成功后才算结束。 /*** 同步消息有返回值SendResult等到消息发送成功后才算结束。*/Testpublic void sendSyncMsg() {SendResult result rocketMQTemplate.syncSend(Demo:sync, 同步消息测试);System.out.println(JSONObject.toJSONString(result));}源码 public SendResult syncSend(String destination, Object payload) {return syncSend(destination, payload, producer.getSendMsgTimeout()); }在第一节中我们已经看过其源码底层是调用了DefaultMQProducer的send方法 4. 延时消息 /*** 延时消息*/ Test public void sendDelayMsg() {MessageString msg MessageBuilder.withPayload(延时消息测试).build();rocketMQTemplate.syncSend(Demo:delay, msg, 100, 3); }在RocketMQ中并不支持任意时间的延迟需要设置几个固定的延时等级从1s到2h分别对应着等级1到18: // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;源码 destinationtopicName:tags主题:标签可单Topic不指定Tagmessageorg.springframework.messaging.Message具有标题和正文的通用消息实体timeout消息发送时间域delayLevel延迟消息的级别 5. 批量消息 批量发送消息能显著提高传递小消息的性能。但需要注意这些批量消息应该有相同的topic相同的waitStoreMsgOK而且不能是延时消息。此外消息的大小也有限制 消息体大小最大为 4MB, 一般建议发送的消息体在 4kb 之内 ( 性能最佳 )。 消息属性最大为 32kb一般建议发送的消息属性在 1kb 之内 ( 性能最佳 )。 4MB 这个上限值不能修改这个会影响全局性能。如果消息体的确很大建 议侧优化消息体的内容避免发送大消息或者带有链接地址的消息或者可 以缩短或者分两条发送。 public T extends Message SendResult syncSend(String destination, CollectionT messages) {return syncSend(destination, messages, producer.getSendMsgTimeout()); }Demo /*** 批量消息*/ Test public void sendOneMsg() {ArrayListMessageString messages new ArrayList();for (int i 0; i 10; i) {messages.add(MessageBuilder.withPayload(批量消息(i1)).build());}rocketMQTemplate.syncSend(Demo:batch, messages); }瞅瞅源码 6. 异步消息 异步消息无返回值需要传入回调类。无需等待消息是否发送成功。 public void asyncSend(String destination, Object payload, SendCallback sendCallback) {asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout()); }Demo Test public void sendAsyncMsg() throws InterruptedException {rocketMQTemplate.asyncSend(Demo:async, 异步消息测试, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(异步消息发送成功);}Overridepublic void onException(Throwable throwable) {System.out.println(异步消息发送失败);}});Thread.sleep(1000); }源码 同理最终调用的也是producer的send方法。 7. 单向消息 单向消息与UDP类似此方法在返回前不会等待代理的确认。显然它具有最大的吞吐量但也有消息丢失的可能性。 /*** 单向消息与UDP类似此方法在返回前不会等待代理的确认。显然它具有最大的吞吐量但也有消息丢失的可能性。*/ Test public void sendOneMsg() {rocketMQTemplate.sendOneWay(Demo:one, 单向消息测试); }源码 /*** Same to {link #sendOneWay(String, Message)}** param destination formats: topicName:tags* param payload the Object to use as payload*/ public void sendOneWay(String destination, Object payload) {Message? message MessageBuilder.withPayload(payload).build();sendOneWay(destination, message); }8. 顺序消息 大家有没有发现上面批量发送的消息被消费的时候顺序错乱了有时只有保证消息的顺序消费才有意义。比如网购下单到付款时分别会发送生成订单、锁定库存、成功下单三个消息消费时要按照这个顺序依次消费才有意义。 顺序错乱的原因 首先我们先来分析一下顺序错乱的原因其实RocketMQ里的分区队列MessageQueue本身是能保证FIFO的正常情况下不能顺序消费消息主要有两个原因 Producer发送消息到MessageQueue时是轮询发送的消息被发送到不同的分区队列就不能保证FIFO了。Consumer默认是多线程并发消费同一个MessageQueue的即使消息是顺序到达的也不能保证消息顺序消费。 综上所述RocketMQ要想实现顺序消息核心就是Producer同步发送确保一组顺序消息被发送到同一个分区队列然后Consumer确保同一个队列只被一个线程消费。 RocketMQ的特性 顺序消息是RocketMQ的特性之一它可以让Consumer消费消息的顺序严格按照消息的发送顺序来进行。顺序消息可以分为全局有序和分区有序绝大部分场景下分区有序就已经能够满足需求了。 全局有序某个Topic下所有的消息都是有序的所有消息按照严格的先进先出的顺序进行生产和消费要求Topic下只能有一个分区队列且Consumer只能有一个线程消费适用对性能要求不高的场景。分区有序某个Topic下所有消息根据ShardingKey进行分区相同ShardingKey的消息必须被发送到同一个分区队列因为队列本身是可以保证先进先出的此时只要保证Consumer同一个队列单线程消费即可。 RocketMQTemplate提供三种发送顺序消息的方式 syncSendOrderly()同步发送顺序消息asyncSendOrderly()异步发送顺序消息sendOneWayOrderly()单向发送顺序消息 源码解析 先了解一下相关的组建类 1、MessageQueueSelector 分区队列选择器它是一个接口只有一个select方法根据ShardingKey从Topic下所有的分区队列中选择一个目标队列进行消息发送必须确保相同ShardingKey选择的是同一个分区队列常见作法是对队列数取模。默认选择队列方式是计算 hashKey 的 hashCode值来选择队列。 Demo 首先消费者指定最大线程数为一这样才能确保每个队列只能被一个线程消费 当我们在发消息时可以通过指定相同的ShardingKey达到多条消息按照发送顺序发送到相同的队列。 /*** 顺序消息*/ Test public void sendOrderMsg() {for (int i 0; i 10; i) {rocketMQTemplate.syncSendOrderly(Demo:order, 同步发送顺序消息i, 0);}for (int i 0; i 10; i) {rocketMQTemplate.asyncSendOrderly(Demo:order, 异步发送顺序消息 i, 0 , new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {return;}Overridepublic void onException(Throwable e) {return;}});}for (int i 0; i 10; i) {rocketMQTemplate.sendOneWayOrderly(Demo:order, 单向发送顺序消息 i, 0 );}}或者我们也可以重写MessageQueueSelector类的select()方法自定义选择队列的规则 9. 事务消息 分布式消息选型的时候是否支持事务消息是一个很重要的考量点而目前只有RocketMQ对事务消息支持的最好。 Apache RocketMQ在4.3.0版中已经支持分布式事务消息这里RocketMQ采用了2PC的思想来实现了提交事务消息同时增加一个补偿逻辑来处理二阶段超时或者失败的消息如下图所示。 概要 RocketMQ实现事务消息主要分为两个阶段正常事务的发送及提交、事务信息的补偿流程 整体流程为 正常事务发送与提交阶段 生产者发送一个半消息给MQServer半消息是指消费者暂时不能消费的消息服务端响应消息写入结果半消息发送成功开始执行本地事务根据本地事务的执行状态执行Commit或者Rollback操作 事务信息的补偿流程补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。 如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求生产者收到确认回查请求后检查本地事务的执行状态根据检查后的结果执行Commit或者Rollback操作 RocketMQ是如何实现事务消息提交前消费者不可见呢 事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC 这样由于消费者没有订阅这个主题所以不会被消费。 如何处理第二阶段的失败消息 在本地事务执行完成后会向MQServer发送Commit或Rollback操作此时如果在发送消息的时候生产者出故障了那么要保证这条消息最终被消费MQServer会像服务端发送回查请求确认本地事务的执行状态。 当然了rocketmq并不会无休止的的信息事务状态回查默认回查15次如果15次回查还是无法得知事务状态RocketMQ默认回滚该消息。 事务消息状态有哪几种 TransactionStatus.CommitTransaction提交事务消息消费者可以消费此消息 TransactionStatus.RollbackTransaction回滚事务它代表该消息将被删除不允许被消费。 TransactionStatus.Unknown 未知状态它代表需要检查消息队列来确定状态。 Demo 首先编写一个Controller模拟发送10调事务消息并且依次指定key为110。 /*** 发送事务消息测试*/ GetMapping(/tx) public void sendTransactionMsg() {for (int i 1; i 10; i) {// 发送指定事务id的消息Message msg MessageBuilder.withPayload(事务消息 i).setHeader(RocketMQHeaders.KEYS, i).build();TransactionSendResult result rocketMQTemplate.sendMessageInTransaction(Demo:tx, msg, null);} }其次编写一个事务监听器类实现RocketMQLocalTransactionListener的 “执行本地事务” 方法 和 “检查事务状态” 方法。当执行完本地事务方法后消息的状态为UNKNOWN则会在指定次数下调用 “检查事务状态” 方法。 Slf4j Component RequiredArgsConstructor RocketMQTransactionListener public class TransactionListenerImpl implements RocketMQLocalTransactionListener {/*** 执行本地事务逻辑** param msg* param arg* return*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 模拟当key为偶数时提交事务为基数UNKNOWN在指定次数下检查当前事务状态MessageHeaders headers msg.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.PREFIX RocketMQHeaders.TRANSACTION_ID);String key (String) headers.get(RocketMQHeaders.PREFIX RocketMQHeaders.KEYS);log.info(执行本地事务,事务id:{},key:{}, transactionId, key);if (Integer.parseInt(key) % 2 0) {//执行成功可以提交事务return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.UNKNOWN;}}/*** 检查事务状态** param msg* return*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 模拟检查事务状态时回滚MessageHeaders headers msg.getHeaders();String transactionId (String) headers.get(RocketMQHeaders.PREFIX RocketMQHeaders.TRANSACTION_ID);String key (String) headers.get(RocketMQHeaders.PREFIX RocketMQHeaders.KEYS);log.info(检查本地事务,事务id:{},key:{}, transactionId, key);return RocketMQLocalTransactionState.ROLLBACK;} }当我们调用发送事务消息接口时发现每条消息都会执行本地事务。可以根据消费消息状况分析出事务消息根据我们所设定的key是偶数的情况下发送消息。 那么对于执行完本地事务后消息状态为 UNKNOWN未知的消息则会在指定次数以及指定时间间隔下检查事务状态。 源码解读 通过源码分析可知首先是向MqServe发送了事务消息并返回消息的状态。 若消息发送成功为其生成事务id并执行本地事务方法并返回事务消息状态。若消息发送失败则默认事务消息状态为回滚。 发送单向消息返回事务状态MqServe根据消息的事务状态执行相关的事务操作。 如果是 commit 会将消息投递到真实的 topic 中然后再投递一个表示删除的消息到 RMS_SYS_TRANS_HALF_TOPIC 中表示当前事务完成如果是 rollback则只需投递表示删除的消息即可如果是 TRANSACTION_NOT_TYPE则一段时间后再次及检查当检查的次数超过上限默认15次则丢弃消息
http://www.w-s-a.com/news/147/

相关文章:

  • 免费快速网站十八个免费的舆情网站