wdcp 快速迁移网站,网站开发应看什么书籍,文章资讯类网站模板,wordpress模板详解RocketMQ
一、MQ概述
Message Queue#xff0c;是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。
MQ的作用
限流削峰#xff1a;当用户发送超量请求时#xff0c;将请求暂存#xff0c;以便后期慢慢处理。如果不使用MQ暂存直接请求到…RocketMQ
一、MQ概述
Message Queue是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。
MQ的作用
限流削峰当用户发送超量请求时将请求暂存以便后期慢慢处理。如果不使用MQ暂存直接请求到业务系统中容易引起系统崩溃。异步解耦若上游系统和下游系统为同步调用会大大降低系统的吞吐量和并发量。MQ层实现两个系统之间的异步调用数据收集分布式系统会产生海量数据流如业务日志、监控数据、用户行为。针对这些数据流采集汇总进行大数据分析。
主流应用的MQ产品
KafkaScala/Java语言开发。特点是高吞吐量但会丢数据常用与大数据领域的实时计算、日志采集等场景。不遵循任何MQ协议使用自研协议。RocketMQJava语言开发。经过数年阿里双十一考验性能与稳定性非常高功能全面。不遵循任何MQ协议使用自研协议。开源版不如云上版阿里商业版
MQ常见协议 JMSJava Messaging Service。Java平台上有关MOMMessage Orientated Middleware的技术规范。他便于Java应用程序的消息交换提供标准的接口简化开发。ActiveMQ时典型实现 STOMPStreaming Text Orientated Message Protocol。是一种MOM的简单文本协议。STOMP提供一个可互操作的连接格式允许 客户端与任意STOMP消息代理进行交互。ActiveMQ时典型实现 AMQPAdvanced Message Queuing Protocol。一个提供统一消息服务的应用层标准是应用层协议的一个开放标准。RabbitMQ是典型实现 MQTTMessage Queueing Telemetry Transport。IBM开发的一个即时通讯协议二进制协议主要用于服务器和低功耗IoT设备之间的通信
二、基本概念
主题Topic表示一类消息的集合可以理解为消息的类型每个消息只能属于一个主题是RocketMQ进行消息订阅的基本单位。一个生产者可以同时发送多种Topic消息而一个消费者只能接收一种Topic消息
标签Tag用于快速过滤消息
三、Linux部署RocketMQ服务
1、在官网下载编译好的二进制压缩包版本5.0.0即可上传到Linux中
2、进行解压
3、配置环境变量ROCKETMQ_HOME和NAMESRV_ADDR 4、配置bin目录下的runserver.sh根据实际情况修改JVM的内存参数 5、配置bin目录下的runbroker.sh根据实际情况修改JVM的内存参数 6、执行nohup命令后台运行RocketMQ服务nameserver必须先启动broker需要再nameserver上注册
# 启动nameserver
nohup bin/mqnamesrv # 启动broker
nohup bin/mqbroker -c [confFile] # -c可指定加载的配置文件默认为conf/broker.conf# 查看日志rocketmq是否成功启动
tail nohup.out # 查看进程
jps # 停止broker
sh bin/mqshutdown broker# 停止namesrv
sh bin/mqshutdown namesrv7、执行命令测试rocketmq提供的测试样例生产者会发送一千条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer8、执行命令测试rocketmq提供的测试样例消费者会接受一千条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer四、RocketMQ API
生产者同步发送消息
public void test_SyncProducer() throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(producer_group_name);//设置注册服务的ip地址的端口producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);//启动生产者producer.start();for(int i0; i3; i){try {// 封装消息设置topictag用于消息快速过滤消息数据Message message new Message(TopicTest,TagA,ID04287777,(Hello, RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));//同步发送消息并获取发送结果producer从broker获取发送结果SendResult sendResult producer.send(message);System.out.println(sendResult);Thread.sleep(1500);} catch (Exception e) {throw new RuntimeException(e);}}producer.shutdown();
}生产者异步发送消息
public void test_AsyncProducer() throws Exception{DefaultMQProducer producer new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount 10;final CountDownLatch countDownLatch new CountDownLatch(messageCount);for(int i0; imessageCount; i){final int index i;// 封装消息设置topictag用于消息快速过滤消息数据Message message new Message(TopicTest,TagA,ID04287777,(Hello, RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息若broker有响应会调用SendCallback中的方法producer.send(message, new SendCallback() {public void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println( Send Message index OK: sendResult);}public void onException(Throwable throwable) {countDownLatch.countDown();System.out.println( Send Message index Exception: throwable);}});//单向发送producer.sendOneway(message);System.out.println(Message index send done);}//在100条消息发送完后关闭countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();
}生产者单向发送消息
public void test_OneWayProducer() throws Exception{DefaultMQProducer producer new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount 10;final CountDownLatch countDownLatch new CountDownLatch(messageCount);for(int i0; imessageCount; i){final int index i;// 封装消息设置topictag用于消息快速过滤消息数据Message message new Message(TopicTest,TagA,ID04287777,(Hello, RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));//单向发送producer.sendOneway(message);System.out.println(Message index send done);}//在100条消息发送完后关闭countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();
}消费者推模式
public static void test_PushConsumer() throws Exception{DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer_group_name);consumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//消费者订阅的消息topic和tagsubExpression*表示任意consumer.subscribe(TopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Receive New Message : list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println(Consumer Start...);
}消费者拉模式
不同于推模式消费者拉模式下需要手动管理消息队列MessageQueue和偏移量offset的映射关系。但是最新的LitePullConsumer底层源码已经实现对mq和offset的管理比较方便。
//拉模式消费者
public static void test_LitePullConsumer() throws Exception{DefaultLitePullConsumer litePullConsumer new DefaultLitePullConsumer(RocketMQConstant.CONSUMER_GROUP_NAME);litePullConsumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);litePullConsumer.subscribe(TopicTest, *);litePullConsumer.start();try {while(true){ListMessageExt messageExts litePullConsumer.poll();System.out.printf(%s%n, messageExts);}}finally {litePullConsumer.shutdown();}
}RocketMQ传递对象对象所属类需要实现序列化接口并且将对象转换为字节数组存入消息体中。
顺序消息
保证消息的局部有序其中几条消息的有序不一定是全部消息都要有序以防止受到网络传输的影响。
实现原理
生产者将一组有序的消息一次发到同一个MessageQueue中依靠队列的特点保证局部有序性。消费者消费完一个MessageQueue的消息后才会去消费下一个MessageQueue的消息。
public class OrderProducer {public static void main(String[] args) {DefaultMQProducer producer new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);try {producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for(int i0; i5; i){//用于指定顺序的idint orderId i;for(int j0; j5; j){Message message new Message(WanfengConstant.ORDER_TOPIC,order_orderId,KEYorderId,(order_orderId step j).getBytes(RemotingHelper.DEFAULT_CHARSET));//实现消息队列选择器对象使同一个orderId的消息发送到同一个消息队列SendResult sendResult producer.send(message,new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}},orderId);System.out.printf(%s%n, sendResult);}}}catch(Exception e){e.printStackTrace();producer.shutdown();}}
}public class OrderConsumer {public static void main(String[] args) {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);consumer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);try {consumer.subscribe(WanfengConstant.ORDER_TOPIC, *);//实现顺序消息监听者接口consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for(MessageExt messageExt : msgs){System.out.println(Receive Message: new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer Start...);} catch (Exception e) {e.printStackTrace();consumer.shutdown();}}
}广播消息
生产者发送的消息推送给所有group的消费者
实现原理将消费者设置MessageModel为广播模式。
public class BroadcastConsumer {public static void main(String[] args) {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设定消息模式为广播consumer.setMessageModel(MessageModel.BROADCASTING);try {consumer.subscribe(WanfengConstant.ARCHIVE_TOPIC, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt - {Archive archive (Archive) WanfengObjectUtil.bytesToObject(messageExt.getBody());System.out.println(Receive Message : archive.getId());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println(Broadcast Consumer Start...);}catch (Exception e){e.printStackTrace();consumer.shutdown();}}
}若指定MessageModel为CLUSTERING则生产者发送的消息会随机指定消费者消费。
延迟消息
顾名思义就是消息发送到broker时延迟指定的时间后再发送给消费者。常用于定时发送
过滤消息
过滤消息通过tag实现在消费者端指定过滤的tag即可。
//消费者订阅tag1或tag2的消息
consumer.subscribe(TopicTest, tag1 || tag2);在RocketMQ中消费者指定过滤条件后将其上推到Broker中在Broker中进行tag过滤以减少网络IO但同时也增加了Broker的繁忙。
事务消息 public class TransactionProducer {public static void main(String[] args) {TransactionMQProducer producer new TransactionMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);TransactionListener transactionListener new TransactionListener() {Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println([WANFENG-INFO] TransactionProducer.executeLocalTransaction(): 执行成功...);String tags msg.getTags();if (StringUtils.contains(tags, TagA)) {//消息提交发送出去return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.contains(tags, TagB)) {//消息回滚丢掉消息return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println([WANFENG-INFO] TransactionProducer.checkLocalTransaction(): 执行成功...);String tags msg.getTags();if (StringUtils.contains(tags, TagC)) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}};ExecutorService executorService new ThreadPoolExecutor(2,5,100, TimeUnit.SECONDS,new ArrayBlockingQueue(3));producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);try {producer.start();} catch (Exception e) {e.printStackTrace();}String[] tags new String[]{TagA, TagB, TagC};CountDownLatch countDownLatch new CountDownLatch(9);for (int i 0; i 9; i) {try {Message message new Message(TopicTest, tags[i % tags.length], Key i, (Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(message, null);System.out.println(sendResult);Thread.sleep(1000);countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {try {Thread.sleep(100000);} catch (InterruptedException e) {throw new RuntimeException(e);}producer.shutdown();}}
}ACL权限控制
ACL对用户对Topic资源的访问权限进行控制
在pom依赖中引入acl的依赖包
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-acl/artifactIdversion5.0.0/version
/dependency在服务端的conf/broker.conf文件添加配置开启acl
aclEnabletrue在服务端的conf/plain_acl.yml文件配置具体权限规则热加载不需要重启mq
accounts:- accessKey: RocketMQ #用户名secretKey: 12345678 #密码whiteRemoteAddress: #访问地址白名单admin: false #是否为管理员管理员可以访问所有TopicdefaultTopicPerm: DENY #默认Topic访问权限defaultGroupPerm: SUB #默认组权限topicPerms: #Topic对应的权限若这里找不到则采用defaultTopicPerm- topicADENY - topicBPUB|SUB- topicCSUBgroupPerms:# the group should convert to retry topic- groupADENY- groupBPUB|SUB- groupCSUB
在创建生产者对象时需加入RPCHookacl的用户信息
public class AclProducer {private static final String ACL_ACCESS_KEY RocketMQ;private static final String ACL_SECRET_KEY 12345678;/*** 通过用户名和密码获取RPCHook* return*/public static RPCHook getAclRPCHook(){return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));}public static void main(String[] args) throws MQClientException, InterruptedException {//创建生产者时加入用户信息即RPCHookDefaultMQProducer producer new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, getAclRPCHook());producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for (int i 0; i 20; i) {try {Message message new Message(TopicTest,WanfengConstant.TAGS_NAME,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/);SendResult sendResult producer.send(message);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}}
}消息轨迹
ProducerConsumerBroker处理消息的相关信息
消息轨迹的实现原理是MQ把消息轨迹都往RMQ_SYS_TRACE_TOPIC的Topic中放
在Broker端配置文件开启消息轨迹
traceTopicEnabletrue创建生产者时指定enableMsgTrace参数为true开启消息轨迹。也可以指定customizedTraceTopic参数来自定义消息轨迹的Topic。
public class TraceProducer {public static void main(String[] args) throws MQClientException {//指定enableMsgTrace参数为true开启消息轨迹DefaultMQProducer producer new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, true);producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);producer.start();for (int i 0; i 20; i) {try {Message message new Message(TopicTest,WanfengConstant.TAGS_NAME,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/);SendResult sendResult producer.send(message);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();}}}
}