重庆山艺网站建设,塘厦理工学校,手机大全,虚拟体验网站RocketMQ 是一款开源的分布式消息系统#xff0c;基于高可用分布式集群技术#xff0c;提供低延时的、高可靠的消息发布与订阅服务。同时#xff0c;广泛应用于多个领域#xff0c;包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即…RocketMQ 是一款开源的分布式消息系统基于高可用分布式集群技术提供低延时的、高可靠的消息发布与订阅服务。同时广泛应用于多个领域包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
「RocketMQ本质上的设计思路和Kafka类似」但是和Kafka不同的是其使用Java进行开发由于在国内的Java受众群体远远多于Scala、Erlang所以RocketMQ是很多以Java语言为主的公司的首选。同样的RocketMQ和Kafka都是Apache基金会中的顶级项目他们社区的活跃度都非常高项目更新迭代也非常快。
RocketMQ是阿里review kafka的java版如果消息性能要求高用 RocketMQ 与 Kafka 可以更优 消息队列在实际应用中常用的使用场景包含「应用解耦、异步处理、流量削锋、消息通讯、日志处理」等。
RocketMQ运行原理 上面是RocketMQ运行的一个大致流程图。
在对Apache RocketMQ进行深入探索的过程中我们首先需要理解其核心组件的作用
NameServer充当注册中心的角色主要任务是管理Broker节点.
Broker它是RocketMQ系统的核心部分主要负责消息的存储。
Producer这是消息的生成者它创建消息并将其写入Broker。
Consumer作为消息的接收者负责从Broker读取消息并进行处理。
下面让我们一起深入了解RocketMQ的运行流程
1. 首先Broker在启动后会向根据其配置向NameServer注册。
2. NameServer作为注册中心管理着我们的Broker集群信息以及Topic路由信息。例如一个特定的Topic具有哪些Broker主机以及队列信息。
3. 接着由特定的业务系统中的生产者Producer生成消息并发送到Broker的主节点。
4. 在Broker节点中这些消息将被保存到本地磁盘的CommitLog中以确保消息不会丢失。
5. 接下来主节点Broker将这些消息同步到从节点Broker这样可以实现负载均衡并增强系统的鲁棒性。
6. 最后业务系统中的消费者Consumer会从Broker中取出消息并进行处理这就完成了数据的完整生命周期。
我们的Producer写入消息前需要先选择Broker那Producer是如何选择Broker的呢 上面提到NameServer 在 RocketMQ 架构中起到了注册中心的作用它负责管理所有的 Broker 节点。每当 Broker 启动后它就会自动的注册到 NameServer 中并且会每隔30秒向 NameServer 发送一次心跳以证明它依然在运行。NameServer 则会每隔10秒检查一次各个 Broker 节点是否还在线如果有 Broker 在120秒内未发送心跳那么 NameServer 就会判断该 Broker 已经宕机进而将其从注册列表中移除。
在业务系统中Producer 在发送消息之前会先从 NameServer 中拉取需要的 topic 路由信息这些信息将包含目标 topic 各个 queue 的详细信息以及各 queue 分别存储在哪个 Broker 节点上。Producer 会将这些信息缓存到本地并依此信息通过一种负载均衡算法选择从哪个 queue 中读取数据以及找到该 queue 对应的 Broker 节点。
那么如果某个 Broker 在 Producer 准备写入数据的时候突然宕机了又该如何处理呢
RocketMQ 设计了一套故障探测与处理机制。如果某个 Broker 宕机了那么 Producer 进行写入操作时将会失败此时它会发起重试操作并从可用的 Broker 列表中重新选择一个进行写入。并且为避免持续向故障节点写入数据Producer 会采取一种称为故障退避的策略即在一段时间内停止向该 Broker 发送数据。值得注意的是Broker 的故障并不会立即被 Producer 和 NameServer 感知这样做是为了降低 NameServer 处理逻辑的复杂性。当 Broker 宕机后由于本地的 topic 路由缓存并未更新Producer 仍可能尝试向故障的 Broker 发送数据然后备受失败并重试。只有当 NameServer 在检查心跳时发现该 Broker 已宕机并从注册列表中移除后Producer 在刷新本地缓存时才会真正地感知到该 Broker 的宕机。
当我们的Producer基于负载均衡选择了Broker节点它的消息是如何写入的呢 在深入理解 RocketMQ 的存储机制时我们需要知道Producer 在写入消息时默认会优先写到操作系统管理的 pageCache这个过程是异步的只要消息被写入 pageCache写入操作就被认为是成功的。这种异步的处理方式极大地提高了 RocketMQ 的写入效率。
当消息被写入 pageCache 后将有一个后台线程异步地将这些消息从 pageCache 刷入到磁盘文件 CommitLog 中CommitLog 是消息的实际存储位置。同时还会有一个专门的线程负责将 CommitLog 中的消息位置物理偏移量写入到 ConsumeQueue 中。
那么客户端如何读取存储在 CommitLog 中的消息呢
当 Consumer 端的消费者需要读取消息时它会先到ConsumeQueue然后根据在 ConsumeQueue 中存储的 offset 信息找到 CommitLog 中的实际数据进行读取。
这样的存储方法是否可以支持高并发模式的写入呢
当系统面临大量同时写入和读取的请求时可能会遇到一种情况即大量的读取请求通过 ConsumeQueue 去找 CommitLog 中的数据但是此时数据可能还在 pageCache 中并未完成异步写入。这时系统会通过 CommitLog 和 PageCache 的映射找到 pageCache 中的消息进行读取。也就是说大量的读取和写入请求都对 pageCache 进行操作。但是当并发量过高时可能会出现 Broker busy 的异常这是因为在极高的并发场景下持续大量的读写操作可能会对系统性能造成影响。
简而言之RocketMQ 的存储机制旨在为高并发高效的读写提供支持但是在一些极端情况下仍然需要额外的优化措施以提高稳定性和性能。
当并发量非常高时出现Broker busy异常了如何解决 RocketMQ 在面对高并发场景时为了改善 Broker busy 异常和提高吞吐量可以启用 transientStorePool 机制。这种机制的实现方式是Broker 在写入消息时将消息直接写入由 JVM 管理的 offheap 堆外内存这样的设计能有效提升并发性能。
那么为什么启用 transientStorePool 可以提高并发处理能力呢
当开启该机制后消息首先写入 JVM 的 offheap内存然后异步刷新到 pageCache最终由 pageCache 异步刷新到 CommitLog。大量的写请求将向 JVM 的 heap 内存进行而大量的读请求仍然从 pageCache 进行这种读写分离的机制极大地提高了 RocketMQ 的并发性能。
但是为什么 transientStorePool 机制不作为默认机制呢
虽然 transientStorePool 能显著提升并发性能但其也存在风险。当消息写入到 JVM 管理的 offheap 堆外内存后如果 JVM 进程重启或者宕机那些尚未被及时落盘的消息就会丢失。但如果采用默认的写入方法即先写入操作系统管理的 pageCache那么在 JVM 进程重启后那些保存在 pageCache 中的信息不会丢失只有当整个服务器宕机重启时pageCache 中的消息才有可能丢失。因此数据最安全的处理方式是将其直接写入到 CommitLog。
总结开启 transientStorePool 机制可以极大地提高 RocketMQ 的并发处理能力然而这可能会带来数据的丢失。因此它更适合那些并发处理能力要求高、且可以接受部分数据丢失的场景。
如果我们想要写入数据不丢失应该怎么处理 在设计与金融场景以及其他要求数据不能丢失的环境中我们会采用同步方式将数据写入CommitLog。成功执行写入操作后才返回确保了数据的完整性和安全性。只有在broker的物理存储设备出现故障的情况下才有可能导致数据丢失。为了提供进一步提高数据的安全性也可以通过多台服务器进行数据备份。
但值得注意的是, 尽管实现了对数据的安全性提升, 使用同步写入CommitLog方式会降低系统的性能到几个数量级。
Rocketmq如何支持分布式事务消息 场景
A存在DB操作、B存在DB操作两方需要保证分布式事务一致性通过引入中间层MQ
A和MQ保持事务一致性异常情况下通过MQ反查A接口实现check
B和MQ保证事务一致通过重试从而达到最终事务一致性。
「原理大事务 小事务 异步」
「1.MQ与DB一致性原理两方事务」
流程图 上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。
「MQ消息、DB操作一致性方案」
1发送消息到MQ服务器此时消息状态为SEND_OK。此消息为consumer不可见。
2执行DB操作DB执行成功Commit DB操作DB执行失败Rollback DB操作。
3如果DB执行成功回复MQ服务器将状态为COMMIT_MESSAGE如果DB执行失败回复MQ服务器将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。
4MQ内部提供一个名为“事务状态服务”的服务此服务会检查事务消息的状态如果发现消息未COMMIT则通过Producer启动时注册的TransactionCheckListener来回调业务系统业务系统在checkLocalTransactionState方法中检查DB事务状态如果成功则回复COMMIT_MESSAGE否则回复ROLLBACK_MESSAGE。
「说明」 上面以DB为例其实此处可以是任何业务或者数据源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE 均是client jar提供的状态在MQ服务器内部是一个数字。
TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调上图中灰色部分。这种消息丢失只存在于断网或者RocketMQ集群挂了的情况下。当RocketMQ集群挂了如果采用异步刷盘存在1s内数据丢失风险异步刷盘场景下保障事务没有意义。所以如果要核心业务用RocketMQ解决分布式事务问题建议选择同步刷盘模式。
「2.多系统之间数据一致性多方事务」 当需要保证多方超过2方的分布式一致性上面的两方事务一致性通过RocketMQ的事务性消息解决已经无法支持。这个时候需要引入TCC模式思想Try-Confirm-Cancel不清楚的自行百度。
「以上图交易系统为例」 1交易系统创建订单往DB插入一条记录同时发送订单创建消息。通过RocketMQ事务性消息保证一致性
2接着执行完成订单所需的同步核心RPC服务非核心的系统通过监听MQ消息自行处理处理结果不会影响交易状态。执行成功更改订单状态同时发送MQ消息。
3交易系统接受自己发送的订单创建消息通过定时调度系统创建延时回滚任务或者使用RocketMQ的重试功能设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下消息第一次到达延迟为1ms左右这时可能RPC还未执行完订单状态还未设置为完成第二次消费时间可以指定。延迟任务先通过查询订单状态判断订单是否完成完成则不创建回滚任务否则创建。PS多个RPC可以创建一个回滚任务通过一个消费组接受一次消息就可以也可以通过创建多个消费组一个消息消费多次每次消费创建一个RPC的回滚任务。回滚任务失败通过MQ的重发来重试。
以上是交易系统和其他系统之间保持最终一致性的解决方案。
「3.案例分析」
「1) 单机环境下的事务示意图」
如下为A给B转账的例子。如下为A给B转账的例子。 以上过程在代码层面甚至可以简化到在一个事物中执行两条sql语句。
「2) 分布式环境下事务」 和单机事务不同A、B账户可能不在同一个DB中此时无法像在单机情况下使用事务来实现。
此时可以通过以下方式实现将转账操作分成两个操作。
a) A账户 b) MQ消息
A账户数据发生变化时发送MQ消息MQ服务器将消息推送给转账系统转账系统来给B账号加钱。
c) B账户 顺序消息
RocketMq有3种消息类型 普通消费 顺序消费 事务消费
顺序消费场景在网购的时候我们需要下单那么下单需要假如有三个顺序
第一创建订单 第二订单付款 第三订单完成
也就是这三个环节要有顺序这个订单才有意义RocketMQ可以保证顺序消费。
「RocketMQ 实现顺序消费的原理」produce在发送消息的时候把消息发到同一个队列queue中消费者注册消息监听器为MessageListenerOrderly这样就可以保证消费端只有一个线程去消费消息
「注意是把把消息发到同一个队列queue不是同一个topic默认情况下一个topic包括4个queue」
「1. 顺序消息缺陷」
发送顺序消息无法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题个别队列由于哈希不均导致消息过多消费速度跟不上产生消息堆积问题遇到消息失败的消息无法跳过当前队列消费暂停。
「2. 原理」
produce在发送消息的时候把消息发到同一个队列queue中消费者注册消息监听器为MessageListenerOrderly这样就可以保证消费端只有一个线程去消费消息。
「3. 扩展」
可以通过实现发送消息的队列选择器方法实现部分顺序消息。
举例比如一个数据库通过MQ来同步只需要保证每个表的数据是同步的就可以。解析binlog将表名作为队列选择器的参数这样就可以保证每个表的数据到同一个对列里面从而保证表数据的顺序消费