2022年免费网站软件下载,4399在线观看免费韩国,专业网站设计公司有哪些,上海的外贸网站建设公司价格文章目录 RocketMQ 介绍为什么要使用 MQ #xff1f;RocketMQ 与其他产品对比vs Kafkavs RabbitMQvs ActiveMQ RocketMQ 重要概念部署 Namesrver、Broker、Dashboard快速入门消息生产者消息消费者 消费模式简单消息1#xff09;同步发送2#xff09;异步发送3#xff09;单… 文章目录 RocketMQ 介绍为什么要使用 MQ RocketMQ 与其他产品对比vs Kafkavs RabbitMQvs ActiveMQ RocketMQ 重要概念部署 Namesrver、Broker、Dashboard快速入门消息生产者消息消费者 消费模式简单消息1同步发送2异步发送3单向发送 顺序消息顺序消息生产者顺序消息消费者 广播消息广播消息消费模式 延迟消息预定日程生产者预定日程消费者指定时间生产者指定时间消费者 RocketMQ 如何保证消息可靠性RocketMQ 如何解决消息积压问题 RocketMQ 介绍
RocketMQ是一款由阿里巴巴开源的分布式消息中间件。它具有低延迟、高吞吐量、高可用性和高可靠性等特点适用于构建具有海量消息堆积和异步解耦功能的应用系统。
为什么要使用 MQ
作用描述异步系统耦合度降低没有强依赖关系削峰不需要同步执行的远程调用可以有效提高响应时间解耦请求达到峰值后后端或者数据库还可以保持固定消费速率消费不会被压垮
RocketMQ 与其他产品对比
vs Kafka
数据可靠性RocketMQ 提供多种可靠性保证Kafka 在极端情况下可能丢失数据。实时性RocketMQ 在消息实时性上表现更佳。队列数与性能RocketMQ 支持更多队列Kafka 在高分区下性能下降。消息顺序性RocketMQ 支持严格顺序Kafka 可能产生乱序。生态Kafka 生态更丰富RocketMQ 与阿里技术栈集成好。
vs RabbitMQ
性能RocketMQ 在高并发和海量消息处理上表现更优。消息模型RabbitMQ 模型灵活RocketMQ 注重顺序和事务。适用场景RabbitMQ 适用于可靠消息传递RocketMQ 适用于高性能场景。
vs ActiveMQ
跨平台与持久化ActiveMQ 支持多种协议和数据库持久化RocketMQ 持久化机制高效。灵活性ActiveMQ 协议广泛RocketMQ 多语言SDK集成友好。社区与文档ActiveMQ 社区活跃度较低RocketMQ 开发活跃但社区成熟度不及 RabbitMQ。
总结RocketMQ 在数据可靠性、实时性、队列数与性能上具有优势适合高性能和顺序消息场景。
RocketMQ 重要概念
Producer消息的发送者生产者 发件人
Consumer消息接收者消费者 取件人
Broker暂时和传输消息的通道 快递
NameServer管理Broker的负责消息的存储和转发接收生产者产生的消息并持久化消息当用户发送的消息被发送到Broker时Broker会将消息转发到与之关联的Topic中以便让更多的接收者进行处理各个快递公司的管理机构相当于Broker的注册中心保留了broker的信息 监测快递是否健康
Queue队列消息存放的位置一个Broker中可以有多个队列 驿站
Topic主题消息的分类用于标识同一类业务逻辑的消息 取件码
ConsumerGroup消费者组RocketMQ 中承载多个消费行为一致的消费者负载均衡分组。和消费者不同消费者组是一个逻辑概念。
部署 Namesrver、Broker、Dashboard
Docker部署RocketMQ5.x (单机部署配置参数详解不使用docker-compose直接部署)_rocketmq不推荐用docker部署-CSDN博客
快速入门
1创建一个基于 Maven 的 SpringBoot 项目并添加以下依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion5.1.0/version
/dependency消息生产者
创建消息生产者producer并指定生产者组名指定Nameserver地址启动producer创建消息对象指定主题Topic、Tag和消息体发送消息关闭生产者
public class SyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(SyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Simple, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。Simple-Sync.getBytes(StandardCharsets.UTF_8) //消息体。);SendResult send producer.send(msg);System.out.printf(i .发送消息成功%s%n, send);}producer.shutdown();}
}消息消费者
创建消费者comsumer、指定消费者组名指定Nameserver地址创建监听订阅主题Topic和Tag等处理消息启动消费者comsumer
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(SimplePushConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Simple,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(收到消息: %s%n , n);});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Consumer Started.%n);}
}消费模式
MQ 的消费模式可以大致分为两种一种是推 Push一种是拉 Pull。
Push 是服务端主动推送消息给客户端优点是及时性较好但如果客户端没有做好流控旦服务端推送大量消息到客户端时就会导致客户端消息堆积甚至崩溃。
Pull 是客户端需要主动到服务端取数据优点是客户端可以依据自己的消费能力进行消费但拉取的频率也需要用户自己控制拉取频繁容易造成服务端和客户端的压力拉取间隔长又容易造成消费不及时。
Push 模式也是基于 pull 模式的只能客户端内部封装了 api一般场景下上游消息生产量小或者均速的时候选择 push 模式。在特殊场景下例如电商大促抢优惠券等场景可以选择 pull 模式
简单消息
1同步发送
可靠性要求高、数据量级少、实时响应具体实现参考上面的入门代码
2异步发送
不等待消息返回直接进入后续流程。
public class AsyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(AsyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();CountDownLatch countDownLatch new CountDownLatch(100);//计数for (int i 0; i 100; i) {Message message new Message(Simple, TagA, Simple-Async.getBytes(StandardCharsets.UTF_8));final int index i;producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%d 消息发送成功%s%n, index, sendResult);}Overridepublic void onException(Throwable throwable) {countDownLatch.countDown();System.out.printf(%d 消息失败%s%n, index, throwable);throwable.printStackTrace();}});}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}3单向发送
只负责发送不管消息是否发送成功。
public class OnewayProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(AsyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 10; i) {Message message new Message(Simple,TagA, Simple-Oneway.getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);System.out.printf(%d 消息发送完成 %n , i);}Thread.sleep(5000);producer.shutdown();}
}顺序消息
顺序消息指生产者局部有序发送到一个queue但多个queue之间是全局无序的。
顺序消息生产者样例通过MessageQueueSelector将消息有序发送到同一个queue中。顺序消息消费者样例通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取通过加锁的方式实现。
顺序消息生产者
public class OrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(OrderProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int j 0; j 5; j) {for (int i 0; i 10; i) {Message message new Message(OrderTopic,TagA,(order_ j _step_ i).getBytes(StandardCharsets.UTF_8));SendResult sendResult producer.send(message, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {Integer id (Integer) o;int index id % list.size();return list.get(index);}}, j);System.out.printf(%s%n, sendResult);}}producer.shutdown();}
}顺序消息消费者
public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(OrderConsumer);consumer.setNamesrvAddr(10.226.8.14:9876);consumer.subscribe(OrderTopic,*);consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {list.forEach(n-{System.out.println(QueueId:n.getQueueId() 收到消息内容 new String(n.getBody()));});return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}广播消息
广播消息并没有特定的消息消费者样例这是因为这涉及到消费者的集群消费模式。
MessageModel.BROADCASTING广播消息。一条消息会发给所有订阅了对应主题的消费者不管消费者是不是同一个消费者组。MessageModel.CLUSTERING集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
广播消息消费模式
public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(BroadCastConsumer);consumer.setNamesrvAddr(10.226.8.14:9876);consumer.subscribe(simple,*);consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach(n-{System.out.println(QueueId:n.getQueueId() 收到消息内容 new String(n.getBody()));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Broadcast Consumer Started.%n);}
}延迟消息
延迟消息实现的效果就是在调用producer.send方法后消息并不会立即发送出去而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。
message.setDelayTimeLevel(3)预定日常定时发送。1到18分别对应messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h可以在dashboard中broker配置查看。msg.setDelayTimeMs(10L)指定时间定时发送。默认支持最大延迟时间为3天可以根据broker配置timerMaxDelaySec修改。
预定日程生产者
public class ScheduleProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ScheduleProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Schedule, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。ScheduleProducer.getBytes(StandardCharsets.UTF_8) //消息体。);//1到18分别对应messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);producer.send(msg);System.out.printf(i .发送消息成功%s%n, LocalTime.now());}producer.shutdown();}
}预定日程消费者
public class ScheduleConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(SimplePushConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Schedule,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(接收时间%s %n, LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Simple Consumer Started.%n);}
}指定时间生产者
public class TimeProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(TimeProducer);producer.setNamesrvAddr(192.168.43.137:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Schedule, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。TimeProducer.getBytes(StandardCharsets.UTF_8) //消息体。);// 相对时间延时消息。此消息将在 10 秒后传递给消费者。msg.setDelayTimeMs(10000L);// 绝对时间定时消息。设置一个具体的时间然后在这个时间之后多久在进行发送消息// msg.setDeliverTimeMs(System.currentTimeMillis() 10000L);producer.send(msg);System.out.printf(i .发送消息成功%s%n, LocalTime.now());}producer.shutdown();}
}指定时间消费者
public class TimeConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(TimeConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Schedule,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(接收时间%s %n, LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Simple Consumer Started.%n);}
}RocketMQ 如何保证消息可靠性
我们将消息流程分为三大部分每一部分都有可能会丢失数据。
生产阶段Producer 通过网络将消息发送给 Broker这个发送可能会发生丢失。比如网络延迟不可达等。存储阶段Broker 肯定是先把消息放到内存的然后根据刷盘策略持久化到硬盘中。刚收到 Producer 的消息放入内存但是异常宕机了导致消息丢失。消费阶段消费失败。比如先提交ack再消费处理过程中出现异常该消息就出现了丢失。
解决方案
生产阶段使用同步发送失败重试机制异步发送重写回调方法检查发送结果ACK 确认机制。存储阶段同步刷盘机制默认情况下是异步刷盘集群模式采用同步复制。消费阶段正常消费处理完成才提交ACK手动ACK如果处理异常返回重试标识。
RocketMQ 如何解决消息积压问题 增加消费者数量: 增加消费者实例的数量以提高消息的消费速度。 确保消费者实例数量与消息队列数量匹配以便每个队列都有专门的消费者处理。 优化消费者逻辑: 优化消费者的处理逻辑提高单个消费者的处理效率。 使用批量消费的方式来减少每次消费的开销。 扩展消息队列容量: 增加消息队列的数量以分散消息负载。 动态调整队列数量增加处理能力实现更高的并行处理。 设置消息消费失败处理机制: 实施重试机制确保消费失败的消息能够被重新处理。 设置死信队列DLQ来处理多次消费失败的消息。 快速失败丢弃消息: 如果某些消息可以丢弃考虑在高峰期快速丢弃这些消息以减轻负担。 提升系统性能: 优化服务器性能增加硬件资源如CPU、内存和网络带宽。 确保网络连接的稳定性和速度以减少延迟。