网站建设与管理卷子,北京建站优化公司,上海网站建设技术托管,Wordpress请求接口数据前言#xff1a; RocketMQ阿里开源的#xff0c;一款分布式的消息中间件#xff0c;它经过阿里的生产环境的高并发、高吞吐的考验#xff0c;同时#xff0c;还支持分布式事务等场景。RocketMQ使用Java语言进行开发#xff0c;方便Java开发者学习源码。但是#xff0c;R…前言 RocketMQ阿里开源的一款分布式的消息中间件它经过阿里的生产环境的高并发、高吞吐的考验同时还支持分布式事务等场景。RocketMQ使用Java语言进行开发方便Java开发者学习源码。但是RocketMQ设计相对复杂官方文档不是很完善不太适合中小公司引用。技多不压身作为一个好的Coder应该多学习一下优秀的框架。本篇主要介绍一下RocketMQ的基础用法
正文 这里我主要通过代码来介绍一下RocketMQ的使用首先介绍一下RocketMQ的原生写法然后介绍基于Springboot体系下RocketMQ生产者的消息发送、消费者的消息接受等写法。
一、Java原生的消息发送与接收写法
1. 生产者
/*** 〈一句话功能简述〉br* 〈〉** author hanxinghua* create 2022/9/30* since 1.0.0*/
Slf4j
public class Producer {public static void main(String[] args) throws Exception {// 实例化消息生产者DefaultMQProducer producer new DefaultMQProducer(RocketConstant.ORIGINAL_PRODUCER_GROUP);// 设置NameServer的地址producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 启动Producer实例producer.start();// 创建消息设置 Topic、Tag、keys、flag、消息体等Message message new Message(RocketConstant.ORIGINAL_TOPIC, (Hello RocketMQ).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息到一个BrokerSendResult sendResult producer.send(message);// 通过sendResult返回消息是否成功送达log.info(消息发送成功:{}, sendResult);// 如果不再发送消息关闭Producer实例。producer.shutdown();}}2. 消费者
/*** 〈一句话功能简述〉br* 〈〉** author hanxinghua* create 2022/9/30* since 1.0.0*/
Slf4j
public class Consumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(RocketConstant.ORIGINAL_CONSUMER_GROUP);// 设置NameServer的地址consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);// 订阅一个或多个Topic用Tag来过滤需要消费的消息这里指定*表示接收所有Tag的消息consumer.subscribe(RocketConstant.ORIGINAL_TOPIC, *);// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt messageExts, ConsumeConcurrentlyContext context) {log.info({}收到消息{}, this.getClass().getSimpleName(), messageExts);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();log.info(消费者启动成功!);}
}二、基于Springboot体系的消息发送写法
1.普通消息发送 具体有三种形式主要包括同步、异步、单向。 /*** 发送普通同步消息** return*/GetMapping(/sendMqBySync)ResponseBodypublic Object sendMqBySync() {RocketMessage message RocketMessage.builder().name(普通同步消息 LocalDateTime.now()).build();// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);return sendResult;}/*** 发送异步消息** return*/GetMapping(/sendMqByAsync)ResponseBodypublic Object sendMqByAsync() {RocketMessage message RocketMessage.builder().name(异步消息 LocalDateTime.now()).build();// asyncSendrocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {// 处理相应的业务log.info(发送成功:{}, JSON.toJSONString(sendResult));}Overridepublic void onException(Throwable throwable) {// 处理相应的业务log.info(发送异常:{}, throwable);}});return null;}/*** 发送单向消息* p* 这种方式主要用在不特别关心发送结果的场景例如日志发送** return*/GetMapping(/sendMqByOneWay)ResponseBodypublic Object sendMqByOneWay() {RocketMessage message RocketMessage.builder().name(单向消息 LocalDateTime.now()).build();// sendOneWayrocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);return null;}
2.顺序消息发送 具体有两种形式主要包括普通顺序、严格顺序。 /*** 发送普通顺序消息** return*/GetMapping(/sendMqByOrder)ResponseBodypublic Object sendMqByOrder() {ListSendResult results new ArrayList();for (int i 0; i 10; i) {RocketMessage message RocketMessage.builder().name(普通顺序消息 LocalDateTime.now() i).build();// syncSendOrderlySendResult sendResult rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, hashkey);results.add(sendResult);}return results;}/*** 发送严格顺序消息* p* 概念* 顺序消息是一种对消息发送和消费顺序有严格要求的消息* p* 生产顺序性* RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息并按序存储和持久化。如需保证消息生产的顺序性则必须满足以下条件* 单一生产者 消息生产的顺序性仅支持单一生产者不同生产者分布在不同的系统即使设置相同的分区键不同生产者之间产生的消息也无法判定其先后顺序。* 串行发送生产者客户端支持多线程安全访问但如果生产者使用多线程并行发送则不同线程间产生的消息将无法判定其先后顺序。** return*/GetMapping(/sendMqByStrictOrder)ResponseBodypublic Object sendMqByStrictOrder() {ListSendResult results new ArrayList();for (int i 0; i 10; i) {RocketMessage message RocketMessage.builder().name(严格顺序消息 LocalDateTime.now() i).build();// syncSendOrderlySendResult sendResult rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, hashkey);results.add(sendResult);}return results;}
3. 延迟消息发送 /*** 发送延时消息* p* 延时消息的使用限制* private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 现在RocketMq并不支持任意时间的延时需要设置几个固定的延时等级从1s到2h分别对应着等级1到18* 消息消费失败会进入延时消息队列** return*/GetMapping(/sendMqByDelay)ResponseBodypublic Object sendMqByDelay() {RocketMessage message RocketMessage.builder().name(延时消息 LocalDateTime.now()).build();// syncSend(... int delayLevel)SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);return sendResult;}4. 批量消息发送 /*** 批量发送消息* p* 在对吞吐率有一定要求的情况下RocketMQ可以将一些消息聚成一批以后进行发送* 可以增加吞吐率并减少API和网络调用次数** return*/GetMapping(/sendMqByBatch)ResponseBodypublic Object sendMqByBatch() {ListMessage messageList new ArrayList();for (int i 0; i 10; i) {RocketMessage message RocketMessage.builder().name(批量消息 LocalDateTime.now() i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);return sendResult;} 5. 事务消息发送 /*** 发送事务消息半消息* p* 仅仅只是保证本地事务和MQ消息发送形成整体的原子性而投递到MQ服务器后* 并无法保证消费者一定能消费成功* p** return*/GetMapping(/sendMqByTx)ResponseBodypublic Object sendMqByTx(Integer type, Integer msgKey) {String transactionId UUID.randomUUID().toString();No6LocalTransactionOriginalSyntaxListener transactionListener new No6LocalTransactionOriginalSyntaxListener();TransactionMQProducer producer new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);try {producer.setTransactionListener(transactionListener);producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);producer.start();log.info(transactionId is {}, transactionId);org.apache.rocketmq.common.message.Message msg new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,(事务消息 LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);msg.getProperties().put(type, String.valueOf(type));msg.getProperties().put(msgKey, String.valueOf(msgKey));SendResult sendResult producer.sendMessageInTransaction(msg, null);return sendResult;} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}return null;}
6. 带Tag消息发送 /*** 发送带Tag消息* p* Tag标签可以看作子主题它是消息的第二级类型* 通过 RocketMQTemplate发送带Tag的消息只需要将topic和tag中间通过【:】冒号连接即可** return*/GetMapping(/sendMqWithTag)ResponseBodypublic Object sendMqWithTag() {RocketMessage message RocketMessage.builder().name(tag消息 LocalDateTime.now()).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC : RocketConstant.TAG_EXPRESSION, message);return sendResult;}
三、基于Springboot体系的消息接收写法
1. MessageModel(消息模型) 消息模型有两种主要包括集群消费与广播消费 /*** 发送集群或广播消息* p* 广播消费模式下相同Consumer Group的每个Consumer实例都接收全量的消息。** param MessageModel 0:CLUSTERING 1:BROADCASTING* return*/GetMapping(/sendMqByMessageModel)ResponseBodypublic Object sendMqByMessageModel(RequestParam Integer MessageModel) {ListSendResult results new ArrayList();for (int i 0; i 10; i) {if (MessageModel 0) {RocketMessage message RocketMessage.builder().name(消息模型-集群消费 LocalDateTime.now() i).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message RocketMessage.builder().name(消息模型-广播消费 LocalDateTime.now() i).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);results.add(sendResult);}}return results;}
2. ConsumeMode(消费模型) 消费模型有两种主要包括并发消费与顺序消费 /*** 并发消费与顺序消费** param consumeMode 0:CONCURRENTLY 1:ORDERLY* return*/GetMapping(/sendMqByConsumeMode)ResponseBodypublic Object sendMqByConsumeMode(Integer consumeMode) {ListSendResult results new ArrayList();for (int i 0; i 10; i) {if (consumeMode 0) {RocketMessage message RocketMessage.builder().name(消费模型-并发消费 LocalDateTime.now() i).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);results.add(sendResult);} else {RocketMessage message RocketMessage.builder().name(消费模型-顺序消费 LocalDateTime.now() i).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);results.add(sendResult);}}return results;}
3. 消息过滤 消息过滤有两种方式主要包括Tag过滤与SQL92过滤 /*** Tag过滤与SQL92过滤* p* Tag过滤* 消费者订阅的Tag和发送者设置的消息Tag相互匹配则消息被投递给消费端进行消费。* SQL92过滤* 发送者设置Tag或自定义消息属性消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。* 开启对SQL语法的支持broker.conf* enablePropertyFilter true** param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性* return*/GetMapping(/sendMqByFilterMode)ResponseBodypublic Object sendMqByFilterMode(Integer filterMode) {if (filterMode 0) {RocketMessage message RocketMessage.builder().name(消费过滤-Tag过滤 LocalDateTime.now()).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC : RocketConstant.TAG_EXPRESSION, message);return sendResult;} else if (filterMode 1) {RocketMessage message RocketMessage.builder().name(消费过滤-SQL92过滤Tag LocalDateTime.now()).build();SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC : RocketConstant.SQL92_TAG_EXPRESSION, message);return sendResult;} else {// Message msg new Message(topic, tagA, Hello MQ.getBytes())// 设置自定义属性A属性值为1。- msg.putUserProperties(a, 1)// RocketMQTemplate 目前好像不支持这种写法RocketMessage message RocketMessage.builder().name(消费过滤-SQL92过滤自定义消息属性 LocalDateTime.now()).build();MapString, Object map new HashMap();map.put(a, 1);MessageHeaders messageHeaders new MessageHeaders(map);SendResult sendResult rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));return sendResult;}}
4. 消息重试与死信队列 /*** 消息重试与死信队列* p* 1. 消息重试只针对集群消费模式生效广播消费模式不提供失败重试特性即消费失败后* 失败消息不再重试继续消费新的消息。* 2. 一条消息初次消费失败后会自动进行消息重试达到最大重试次数后将其发送到该消费者对应的死信队列* 这类消息称为死信消息Dead-Letter Message。死信队列是死信Topic下分区数唯一的单独队列。* 3. 如果产生了死信消息那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName* 死信队列的消息将不会再被消费。* 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。** return*/GetMapping(/sendMqByRetry)ResponseBodypublic Object sendMqByRetry() {RocketMessage message RocketMessage.builder().name(消息重试 LocalDateTime.now()).build();// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);return sendResult;}
5. 消息应答 /*** 消息应答** return*/GetMapping(/sendByReply)ResponseBodypublic Object sendByReply() {RocketMessage message RocketMessage.builder().name(消息应答 LocalDateTime.now()).build();RocketMessage receiveMessage rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);return receiveMessage;}
6. Pull消费 Pull消费包括两种方式主要包括原始Pull Consumer与Lite Pull Consumer /*** 原始Pull Consumer的消息发送** return*/GetMapping(/sendByOriginalPull)ResponseBodypublic Object sendByOriginalPull() {ListMessage messageList new ArrayList();for (int i 0; i 100; i) {RocketMessage message RocketMessage.builder().name(原始pull消息 LocalDateTime.now() i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);return sendResult;}GetMapping(/pullByOriginal)ResponseBodypublic void pullByOriginal() {pullMgsOriginal.pull(0, 2);}/*** 使用rocketMQTemplate拉取消息** return*/GetMapping(/sendByTemplatePull)ResponseBodypublic Object sendByTemplatePull() {ListMessage messageList new ArrayList();for (int i 0; i 100; i) {RocketMessage message RocketMessage.builder().name(template pull消息 LocalDateTime.now() i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);return sendResult;}/*** LitePullSubscribe** return*/GetMapping(/sendBySubscribePull)ResponseBodypublic Object sendBySubscribePull() {ListMessage messageList new ArrayList();for (int i 0; i 100; i) {RocketMessage message RocketMessage.builder().name(subscribe pull消息 LocalDateTime.now() i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);return sendResult;}GetMapping(/pullBySubscribe)ResponseBodypublic void pullBySubscribe() {litePullSubscribeMsg.pull(20);}/*** LitePullAssign** return*/GetMapping(/sendByAssignPull)ResponseBodypublic Object sendByAssignPull() {ListMessage messageList new ArrayList();for (int i 0; i 100; i) {RocketMessage message RocketMessage.builder().name(assign pull消息 LocalDateTime.now() i).build();messageList.add(MessageBuilder.withPayload(message).build());}// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);return sendResult;}GetMapping(/pullByAssign)ResponseBodypublic void pullByAssign() {litePullAssignMsg.pull();}7. 设置消费点位 /*** 消费点类型设置成* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP** return*/GetMapping(/sendMqByConsumePoint)ResponseBodypublic Object sendMqByConsumePoint() {RocketMessage message RocketMessage.builder().name(设置消费点位 LocalDateTime.now()).build();// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);return sendResult;}
8. 消费者手动应答 /*** param mgs 0:会抛空指针重试三次。* return*/GetMapping(/sendMqByManualConfirm)ResponseBodypublic Object sendMqByManualConfirm(Integer mgs) {RocketMessage message RocketMessage.builder().name(mgs null ? 消费者手动应答 LocalDateTime.now() : String.valueOf(mgs)).build();// syncSendSendResult sendResult rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);return sendResult;}