当前位置: 首页 > news >正文

2022年免费网站软件下载4399在线观看免费韩国

2022年免费网站软件下载,4399在线观看免费韩国,专业网站设计公司有哪些,上海的外贸网站建设公司价格文章目录 RocketMQ 介绍为什么要使用 MQ #xff1f;RocketMQ 与其他产品对比vs Kafkavs RabbitMQvs ActiveMQ RocketMQ 重要概念部署 Namesrver、Broker、Dashboard快速入门消息生产者消息消费者 消费模式简单消息1#xff09;同步发送2#xff09;异步发送3#xff09;单… 文章目录 RocketMQ 介绍为什么要使用 MQ RocketMQ 与其他产品对比vs Kafkavs RabbitMQvs ActiveMQ RocketMQ 重要概念部署 Namesrver、Broker、Dashboard快速入门消息生产者消息消费者 消费模式简单消息1同步发送2异步发送3单向发送 顺序消息顺序消息生产者顺序消息消费者 广播消息广播消息消费模式 延迟消息预定日程生产者预定日程消费者指定时间生产者指定时间消费者 RocketMQ 如何保证消息可靠性RocketMQ 如何解决消息积压问题 RocketMQ 介绍 RocketMQ是一款由阿里巴巴开源的分布式消息中间件。它具有低延迟、高吞吐量、高可用性和高可靠性等特点适用于构建具有海量消息堆积和异步解耦功能的应用系统。 为什么要使用 MQ 作用描述异步系统耦合度降低没有强依赖关系削峰不需要同步执行的远程调用可以有效提高响应时间解耦请求达到峰值后后端或者数据库还可以保持固定消费速率消费不会被压垮 RocketMQ 与其他产品对比 vs Kafka 数据可靠性RocketMQ 提供多种可靠性保证Kafka 在极端情况下可能丢失数据。实时性RocketMQ 在消息实时性上表现更佳。队列数与性能RocketMQ 支持更多队列Kafka 在高分区下性能下降。消息顺序性RocketMQ 支持严格顺序Kafka 可能产生乱序。生态Kafka 生态更丰富RocketMQ 与阿里技术栈集成好。 vs RabbitMQ 性能RocketMQ 在高并发和海量消息处理上表现更优。消息模型RabbitMQ 模型灵活RocketMQ 注重顺序和事务。适用场景RabbitMQ 适用于可靠消息传递RocketMQ 适用于高性能场景。 vs ActiveMQ 跨平台与持久化ActiveMQ 支持多种协议和数据库持久化RocketMQ 持久化机制高效。灵活性ActiveMQ 协议广泛RocketMQ 多语言SDK集成友好。社区与文档ActiveMQ 社区活跃度较低RocketMQ 开发活跃但社区成熟度不及 RabbitMQ。 总结RocketMQ 在数据可靠性、实时性、队列数与性能上具有优势适合高性能和顺序消息场景。 RocketMQ 重要概念 Producer消息的发送者生产者 发件人 Consumer消息接收者消费者 取件人 Broker暂时和传输消息的通道 快递 NameServer管理Broker的负责消息的存储和转发接收生产者产生的消息并持久化消息当用户发送的消息被发送到Broker时Broker会将消息转发到与之关联的Topic中以便让更多的接收者进行处理各个快递公司的管理机构相当于Broker的注册中心保留了broker的信息 监测快递是否健康 Queue队列消息存放的位置一个Broker中可以有多个队列 驿站 Topic主题消息的分类用于标识同一类业务逻辑的消息 取件码 ConsumerGroup消费者组RocketMQ 中承载多个消费行为一致的消费者负载均衡分组。和消费者不同消费者组是一个逻辑概念。 部署 Namesrver、Broker、Dashboard Docker部署RocketMQ5.x (单机部署配置参数详解不使用docker-compose直接部署)_rocketmq不推荐用docker部署-CSDN博客 快速入门 1创建一个基于 Maven 的 SpringBoot 项目并添加以下依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion5.1.0/version /dependency消息生产者 创建消息生产者producer并指定生产者组名指定Nameserver地址启动producer创建消息对象指定主题Topic、Tag和消息体发送消息关闭生产者 public class SyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(SyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Simple, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。Simple-Sync.getBytes(StandardCharsets.UTF_8) //消息体。);SendResult send producer.send(msg);System.out.printf(i .发送消息成功%s%n, send);}producer.shutdown();} }消息消费者 创建消费者comsumer、指定消费者组名指定Nameserver地址创建监听订阅主题Topic和Tag等处理消息启动消费者comsumer public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(SimplePushConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Simple,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(收到消息: %s%n , n);});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Consumer Started.%n);} }消费模式 MQ 的消费模式可以大致分为两种一种是推 Push一种是拉 Pull。 Push 是服务端主动推送消息给客户端优点是及时性较好但如果客户端没有做好流控旦服务端推送大量消息到客户端时就会导致客户端消息堆积甚至崩溃。 Pull 是客户端需要主动到服务端取数据优点是客户端可以依据自己的消费能力进行消费但拉取的频率也需要用户自己控制拉取频繁容易造成服务端和客户端的压力拉取间隔长又容易造成消费不及时。 Push 模式也是基于 pull 模式的只能客户端内部封装了 api一般场景下上游消息生产量小或者均速的时候选择 push 模式。在特殊场景下例如电商大促抢优惠券等场景可以选择 pull 模式 简单消息 1同步发送 可靠性要求高、数据量级少、实时响应具体实现参考上面的入门代码 2异步发送 不等待消息返回直接进入后续流程。 public class AsyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(AsyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();CountDownLatch countDownLatch new CountDownLatch(100);//计数for (int i 0; i 100; i) {Message message new Message(Simple, TagA, Simple-Async.getBytes(StandardCharsets.UTF_8));final int index i;producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%d 消息发送成功%s%n, index, sendResult);}Overridepublic void onException(Throwable throwable) {countDownLatch.countDown();System.out.printf(%d 消息失败%s%n, index, throwable);throwable.printStackTrace();}});}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();} }3单向发送 只负责发送不管消息是否发送成功。 public class OnewayProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(AsyncProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 10; i) {Message message new Message(Simple,TagA, Simple-Oneway.getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);System.out.printf(%d 消息发送完成 %n , i);}Thread.sleep(5000);producer.shutdown();} }顺序消息 顺序消息指生产者局部有序发送到一个queue但多个queue之间是全局无序的。 顺序消息生产者样例通过MessageQueueSelector将消息有序发送到同一个queue中。顺序消息消费者样例通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取通过加锁的方式实现。 顺序消息生产者 public class OrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(OrderProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int j 0; j 5; j) {for (int i 0; i 10; i) {Message message new Message(OrderTopic,TagA,(order_ j _step_ i).getBytes(StandardCharsets.UTF_8));SendResult sendResult producer.send(message, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {Integer id (Integer) o;int index id % list.size();return list.get(index);}}, j);System.out.printf(%s%n, sendResult);}}producer.shutdown();} }顺序消息消费者 public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(OrderConsumer);consumer.setNamesrvAddr(10.226.8.14:9876);consumer.subscribe(OrderTopic,*);consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {list.forEach(n-{System.out.println(QueueId:n.getQueueId() 收到消息内容 new String(n.getBody()));});return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);} }广播消息 广播消息并没有特定的消息消费者样例这是因为这涉及到消费者的集群消费模式。 MessageModel.BROADCASTING广播消息。一条消息会发给所有订阅了对应主题的消费者不管消费者是不是同一个消费者组。MessageModel.CLUSTERING集群消息。每一条消息只会被同一个消费者组中的一个实例消费。 广播消息消费模式 public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(BroadCastConsumer);consumer.setNamesrvAddr(10.226.8.14:9876);consumer.subscribe(simple,*);consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach(n-{System.out.println(QueueId:n.getQueueId() 收到消息内容 new String(n.getBody()));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Broadcast Consumer Started.%n);} }延迟消息 延迟消息实现的效果就是在调用producer.send方法后消息并不会立即发送出去而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。 message.setDelayTimeLevel(3)预定日常定时发送。1到18分别对应messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h可以在dashboard中broker配置查看。msg.setDelayTimeMs(10L)指定时间定时发送。默认支持最大延迟时间为3天可以根据broker配置timerMaxDelaySec修改。 预定日程生产者 public class ScheduleProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ScheduleProducer);producer.setNamesrvAddr(10.226.8.14:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Schedule, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。ScheduleProducer.getBytes(StandardCharsets.UTF_8) //消息体。);//1到18分别对应messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);producer.send(msg);System.out.printf(i .发送消息成功%s%n, LocalTime.now());}producer.shutdown();} }预定日程消费者 public class ScheduleConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(SimplePushConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Schedule,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(接收时间%s %n, LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Simple Consumer Started.%n);} }指定时间生产者 public class TimeProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(TimeProducer);producer.setNamesrvAddr(192.168.43.137:9876);producer.start();for (int i 0; i 2; i) {Message msg new Message(Schedule, //主题TagA, //设置消息Tag用于消费端根据指定Tag过滤消息。TimeProducer.getBytes(StandardCharsets.UTF_8) //消息体。);// 相对时间延时消息。此消息将在 10 秒后传递给消费者。msg.setDelayTimeMs(10000L);// 绝对时间定时消息。设置一个具体的时间然后在这个时间之后多久在进行发送消息// msg.setDeliverTimeMs(System.currentTimeMillis() 10000L);producer.send(msg);System.out.printf(i .发送消息成功%s%n, LocalTime.now());}producer.shutdown();} }指定时间消费者 public class TimeConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer pushConsumer new DefaultMQPushConsumer(TimeConsumer);pushConsumer.setNamesrvAddr(10.226.8.14:9876);pushConsumer.subscribe(Schedule,*);pushConsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n-{System.out.printf(接收时间%s %n, LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf(Simple Consumer Started.%n);} }RocketMQ 如何保证消息可靠性 我们将消息流程分为三大部分每一部分都有可能会丢失数据。 生产阶段Producer 通过网络将消息发送给 Broker这个发送可能会发生丢失。比如网络延迟不可达等。存储阶段Broker 肯定是先把消息放到内存的然后根据刷盘策略持久化到硬盘中。刚收到 Producer 的消息放入内存但是异常宕机了导致消息丢失。消费阶段消费失败。比如先提交ack再消费处理过程中出现异常该消息就出现了丢失。 解决方案 生产阶段使用同步发送失败重试机制异步发送重写回调方法检查发送结果ACK 确认机制。存储阶段同步刷盘机制默认情况下是异步刷盘集群模式采用同步复制。消费阶段正常消费处理完成才提交ACK手动ACK如果处理异常返回重试标识。 RocketMQ 如何解决消息积压问题 增加消费者数量: 增加消费者实例的数量以提高消息的消费速度。 确保消费者实例数量与消息队列数量匹配以便每个队列都有专门的消费者处理。 优化消费者逻辑: 优化消费者的处理逻辑提高单个消费者的处理效率。 使用批量消费的方式来减少每次消费的开销。 扩展消息队列容量: 增加消息队列的数量以分散消息负载。 动态调整队列数量增加处理能力实现更高的并行处理。 设置消息消费失败处理机制: 实施重试机制确保消费失败的消息能够被重新处理。 设置死信队列DLQ来处理多次消费失败的消息。 快速失败丢弃消息: 如果某些消息可以丢弃考虑在高峰期快速丢弃这些消息以减轻负担。 提升系统性能: 优化服务器性能增加硬件资源如CPU、内存和网络带宽。 确保网络连接的稳定性和速度以减少延迟。
http://www.w-s-a.com/news/999201/

相关文章:

  • 网页设计网站作业外贸出口流程步骤
  • 成都网站推广公司排名网站建设首选 云端高科
  • 网站怎么增加流量找网络推广策畿
  • 网站描述范例做网站好看的旅行背景图片
  • 网上商城开发网站建设宣传网站设计
  • 免费的开源网站wordpress建站不好用
  • 陕西建设厅人才网站ai生成logo免费
  • 建设家居网站村建站什么部门
  • 网站建设+青海龙岗区网站建设
  • 精品课网站建设网络公司名字怎么取
  • 化工网站制作用户体验设计案例
  • 如何在微信公众平台上建立微网站垂直门户网站怎么做
  • 关于销售网站有哪些内容品牌网站建设小科6a蚪
  • 免费制作网站平台哪个好湖南企业建网站
  • 灞桥微网站建设株洲百姓网
  • 儿童网站建设互联网怎么学
  • 重庆建网站的公司集中在哪里中煤第五建设有限公司网站
  • 成都网站建设987net运维需要掌握哪些知识
  • 网站建设师个人简介怎么写WordPress头像美化插件
  • 网站优化知识销售管理系统c语言
  • 桂林市网站设计厦门自己建网站
  • 网站seo哪里做的好东莞做网站优化的公司
  • 休闲采摘园网站建设政务公开和网站建设工作的建议
  • 长沙网站建设哪个公司好PHP amp MySQL网站建设宝典
  • 代码编辑器做热点什么网站好湛江网站建设哪家好
  • php网站开发概念网站开发岗位职责任职责格
  • asp 网站源码 下载西安自适应网站建设
  • 白领兼职做网站贵阳网站设计哪家好
  • 热水器网站建设 中企动力企业网站开发需要多钱
  • 北京市建设工程信息网交易网站静态网页模板免费下载网站