云南省住房和城乡建设厅官方网站,工业企业在线平台,浙江网站建设和制作,做期货在哪个网站看消息目录 1#xff1a;生产者#xff08;同步、异步、单向#xff09;
1.1#xff1a;同步发送消息#xff08;每发送一条等待mq返回值#xff09;
1.2#xff1a;异步发送消息
1.3#xff1a;单向发送消息#xff08;不管成功失败#xff0c;只管发送消息#xff09…目录 1生产者同步、异步、单向
1.1同步发送消息每发送一条等待mq返回值
1.2异步发送消息
1.3单向发送消息不管成功失败只管发送消息
1.4顺序发送消息
1.5批量发送消息
1.6定时发送消息
2消费者
2.1push消费
2.2pull消费 1生产者同步、异步、单向 在了解生产者之前首先再次查看这个图片。生产者发送消息围绕生产者的概念和怎么发送消息来解析MQ。生产者有重要的的message元素 Message包含以下属性 字段名默认值必要性说明Topicnull必填消息所属 topic 的名称Bodynull必填消息体Tagsnull选填消息标签方便服务器过滤使用。目前只支持每个消息设置一个Keysnull选填代表这条消息的业务关键词唯一IDFlag0选填完全由应用来设置RocketMQ 不做干预DelayTimeLevel0选填消息延时级别0 表示不延时大于 0 会延时特定的时间才会被消费WaitStoreMsgOKtrue选填表示消息是否在服务器落盘后才返回应答。mq接受到消息存入磁盘然后返回成功或者失败 队列为了支持高并发和水平扩展需要对 Topic 进行分区在 RocketMQ 中这被称为队列一个 Topic 可能有多个队列并且可能分布在不同的 Broker 上。保证消息的发送和消费的并发速度。在生产者将消息发送到MQ的broker的时候这个时候broker的内部维护了队列保证先进先出。默认一个topic里边有4个读4个写的队列 我们怎么发送消息生产者发送消息包含同步异步单向这三个方面。当然按照功能又扩展出来顺序消息、批量消息、定时消息、事务消息等模式。 1.1同步发送消息每发送一条等待mq返回值
同步发送每次发送一条等待mq的返回值成功然后发送下一条适合可靠的消息传递适用范围最广泛。如重要的通知消息、短消息通知等 代码如下
public class 普通消息发送_同步 {/*** 同步消息发送发送之后等待服务端返回结果* 消息发送到broker 等待响应成功后接着发送下一条数据* 保证了消息发送的可靠性** 使用场景大部分可靠性要求高的场景*/public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//1初始化一个生产者并且设置group是商品组System.out.println(同步发送消息模式);String producerGroupshop_Group;DefaultMQProducer producernew DefaultMQProducer(producerGroup);//2设置nameServerproducer.setNamesrvAddr(localhost:9876);//3启动producerproducer.start();//4发送100条消息for (int i 0; i 10; i) {//5:定义消息体Message msgnew Message();//设置消息主题 必填String topichuyijuTopic;msg.setTopic(topic);//设置消息体 必填String body同步:i;msg.setBody(body.getBytes(StandardCharsets.UTF_8));//设置落盘策略 默认落盘成功返回true 选填msg.setWaitStoreMsgOK(true);//设置消息keys 消息唯一标识 选填msg.setKeys(shopi);//设置消息标签 选填msg.setTags(同步);//6发送数据SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}//7发送完消息关闭生产者producer.shutdown();}}1.2异步发送消息
异步发送消息发送完后不用等待响应就可以发送第二条消息通过回调接口来接受响应成功或者失败。异步发送一般用于链路耗时较长对响应时间较为敏感的业务场景。例如视频上传后通知启动转码服务转码完成后通知推送转码结果等。 代码如下
public class 普通消息发送_异步 {/*** 异步消息发送模式 发送数据之后不等响应接着发送需要回调接口回调接口告知失败 或者成功** 适用场景适用于发送文件视频等大的文件 节省时间**/public static void main(String[] args) throws Exception {System.out.println(异步发送普通消息);// 初始化一个producer并设置Producer group nameDefaultMQProducer producer new DefaultMQProducer(shop_Group);// 设置NameServer地址producer.setNamesrvAddr(localhost:9876);// 启动producerproducer.setRetryTimesWhenSendAsyncFailed(0);//重试次数producer.start();for (int i 0; i 5; i) {// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤String topichuyijuTopic;Message msg new Message(topic,异步,(异步i).getBytes(StandardCharsets.UTF_8));// 异步发送消息, 发送结果通过callback返回给客户端producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(数据发送成功sendResult);}Overridepublic void onException(Throwable e) {System.out.println(数据发送失败e);e.printStackTrace();}});}// 一旦producer不再使用关闭producerproducer.shutdown();}}1.3单向发送消息不管成功失败只管发送消息
单向发送生产者向mq发送消息不等待mq的返回值是否消息接收成功或者失败。生产者只管发送适用于日志等可靠性不高的场景。发送速度很快微秒级的速度 public class 普通消息发送_单向 {/*** 单向发送消息模式* 服务方只发送消息 不等服务端响应 也不管回调 发送速度很快 就是只管发送消息 不管成功失败** 适用场景适用于发送日志对于数据可靠性要求不高*/public static void main(String[] args) throws Exception {System.out.println(单向发送普通消息);// 初始化一个producer并设置Producer group nameDefaultMQProducer producer new DefaultMQProducer(shop_Group);// 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9876);// 启动producerproducer.start();for (int i 0; i 10; i) {final int index i;// 创建一条消息并指定topic、tag、body等信息tag可以理解成标签对消息进行再归类RocketMQ可以在消费端对tag进行过滤String body单向i;Message msg new Message(huyijuTopic,单向,body.getBytes(StandardCharsets.UTF_8));// 不管不顾直接发送没有返回值producer.sendOneway(msg);}// 一旦producer不再使用关闭producerproducer.shutdown();}}1.4顺序发送消息
我们知道我们发送的消息存储到了mq的topic的队列里边默认的topic是4个队列
根据消息的key将消息轮训的插入队列中队列的消息能保证FIFO但是我们并不知道实际具体那条消息在那个队列无法保证比如订单号是01的所有操作在同一个队列。如下图 消费者再消费的时候无法保证业务的一致性。所有才有了顺序发送我们传入指定的订单号只要订单号一直就一定会存到相同的队列。 代码如下
/*** 顺序发送SendResult send(Message msg, MessageQueueSelector selector, Object arg)* 根据同一个arg的值存入相同的队列 队列一共四个很多数据的时候根据arg%4 存入指定的队列先进先出** 适用场景下单支付物流等场景我们使用orderId作为分区id 会发送到同一个队列 保证顺序** 注意事项只能有一个生产者因为分布式环境多个生产者发送相同的同一个orderId,无法判定先后顺序* 必须是单一的生产者** 如果一个Broker掉线那么此时队列总数是否会发化** 如果发生变化那么同一个 ShardingKey 的消息就会发送到不同的队列上造成乱序。* 如果不发生变化那消息将会发送到掉线Broker的队列上必然是失败的。因此 Apache RocketMQ 提供了两种模式* 如果要保证严格顺序而不是可用性创建 Topic 是要指定 -o 参数--order为true表示顺序消息:** sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876*/
public class 顺序消息发送 {public static void main(String[] args) {System.out.println(顺序消息发送);DefaultMQProducer producer new DefaultMQProducer(shop_Group);try {// 设置NameServer地址producer.setNamesrvAddr(127.0.0.1:9876);int aproducer.getDefaultTopicQueueNums();System.out.println(默认的队列大小a);producer.start();for (int i 0; i 5; i) {
// String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};
// Message msg1
// new Message(TopicTest, tags[i % tags.length], KEY i,
// (Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));Message msgnew Message();//设置主题String topichuyijuTopic;msg.setTopic(topic);//设置内容String body顺序发送i;msg.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));//设置keysmsg.setKeys(keyi);//设置tagsmsg.setTags(顺序发送);//订单id 根据不同的id将消息发送到不同的队列(队列总共4个 取模放入队列) 遵循FIFO
// int orderId i%a;
// System.out.println(订单id:orderId);SendResult sendResult producer.send(msg, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object arg) {
// System.out.println(参数arg:arg);//这里参数的send方法的参数一致
// System.out.println(参数list:list.size());//默认队列是4
// System.out.println(message);//根据订单id取模存入指定的队列 然后返回该队列Integer id (Integer) arg;int index id % list.size();MessageQueue messageQueue list.get(index);return messageQueue;}}, 5);//这里的5就是实际上我们的订单号根据这个参数将消息存到相同的队列System.out.printf(%s%n, sendResult);}} catch (MQClientException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}finally {producer.shutdown();}}
}1.5批量发送消息
批量发送消息将消息批量发送到mq来节省时间
代码如下
/*** 批量投送消息增加吞吐率 减少网络调用次数** 需要注意的是批量消息的大小不能超过 1MB**/
public class 批量消息发送 {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {System.out.println(批量消息投送);DefaultMQProducer producer new DefaultMQProducer(shop_Group);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();String body1批量任务1;String body2批量任务2;String body3批量任务3;Message message1new Message(huyijuTopic,批量任务,body1.getBytes(StandardCharsets.UTF_8));Message message2new Message(huyijuTopic,批量任务,body2.getBytes(StandardCharsets.UTF_8));Message message3new Message(huyijuTopic,批量任务,body3.getBytes(StandardCharsets.UTF_8));ListMessage listnew ArrayList();list.add(message1);list.add(message2);list.add(message3);SendResult send producer.send(list);System.out.println(批量消息投送结束send);producer.shutdown();}
}1.6定时发送消息
定时消息发送将消息发送到mqmq根据定时将消息发送给消费者。切记不要搞反了消息是发送到mq之后定时发送个消费者。
代码如下
/*** 延时消息发送数据发送到mq之后 指定的时间之后才能消费** 适用场景定时任务、超时精准投送** 缺点大量的定时任务 容易造成消息积压 时间一到 消费者亚历山大**/
public class 延时消息发送 {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(shop_Group);// 设置NameServer地址
// 1 1s 10 6min
// 2 5s 11 7min
// 3 10s 12 8min
// 4 30s 13 9min
// 5 1min 14 10minproducer.setNamesrvAddr(127.0.0.1:9876);producer.start();for (int i 0; i 10; i) {String body 定时任务i;Message messagenew Message(huyijuTopic,定时任务,body.getBytes(StandardCharsets.UTF_8));message.setDelayTimeLevel(3);//设置的定时任务级别SendResult send producer.send(message);System.out.println(定时消息返回值send);}producer.shutdown();}
}2消费者
消息的消费者很简单只两种模式
第一种推送模式订阅mq服务的topicmq收到消息把消息推送给消费者适用范围广
第二种拉取模式订阅mq服务的topicmq收到消息消费者定时去mq拉取消息 2.1push消费
普通消息的推送消费 //适用于普通的消息推送不适合用于顺序消息的消费
public class 消息接收_推送1 {public static void main(String[] args) throws MQClientException {String group Shop_Group_push;//1:初始化消息接收组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group);//2:设置NameServer地址consumer.setNamesrvAddr(localhost:9876);//3:订阅一个或多个topic并指定tag过滤条件这里指定*表示接收所有tag的消息String topichuyijuTopic;consumer.subscribe(topic,*);consumer.setMessageModel(MessageModel.CLUSTERING);//默认是集群模式//consumer.setMessageModel(MessageModel.BROADCASTING);//这里是广播模式//4.1:注册回调接口来处理从Broker中收到的消息 单个对列保证先进先出//但是多个队列 被消费者并发消费 不能保证消费的顺序性consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(list);for (int i 0; i list.size(); i) {byte[] body list.get(i).getBody();String resault null;try {resault new String(body,utf-8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println(接受huyijuTopic的第i条消息resault);}// 返回消息消费状态ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();
// consumer.shutdown();System.out.println(推送消息接收启动1);}
}顺序消息的推送消费
public class 消息接收_顺序消费1 {public static void main(String[] args) throws MQClientException {System.out.println(顺序消费1);String group Shop_Group_push;//1:初始化消息接收组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(group);//2:设置NameServer地址consumer.setNamesrvAddr(localhost:9876);//3:订阅一个或多个topic并指定tag过滤条件这里指定*表示接收所有tag的消息String topichuyijuTopic;consumer.subscribe(topic,*);consumer.setMessageModel(MessageModel.CLUSTERING);//默认是集群模式//consumer.setMessageModel(MessageModel.BROADCASTING);//这里是广播模式//4.2:注册回调接口来处理从Broker中收到的消息 单个对列保证先进先出//但是多个队列 被消费者并发消费,不能保证消费的顺序性 这里使用MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {//AtomicLong consumeTimes new AtomicLong(0);consumeTimes.incrementAndGet();for (int i 0; i list.size(); i) {byte[] body list.get(i).getBody();String resault null;try {resault new String(body,utf-8);} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println(接受huyijuTopic的第i条消息resault);}//返回消费状态return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();
// consumer.shutdown();System.out.println(推送消息接收启动1);}
}2.2pull消费
消费者一直循环去mq拉取消息 public class 消息接收_拉取消息2 {public static void main(String[] args) throws MQClientException {System.out.println(开始消息拉取);DefaultLitePullConsumer defaultLitePullConsumer new DefaultLitePullConsumer();defaultLitePullConsumer.setConsumerGroup(Shop_Pull_Group);//订阅主题 拉取消息String topichuyijuTopic;defaultLitePullConsumer.subscribe(topic, *);defaultLitePullConsumer.setPullBatchSize(1);defaultLitePullConsumer.setNamesrvAddr(localhost:9876);defaultLitePullConsumer.start();try {while (true) {ListMessageExt messageExts defaultLitePullConsumer.poll();System.out.printf(%s%n, messageExts);System.out.println(拉取数据长度messageExts.size());for (int i 0; i messageExts.size(); i) {byte[] body messageExts.get(i).getBody();String resault new String(body,utf-8);System.out.println(拉取消息resault);}}} catch (UnsupportedEncodingException e) {e.printStackTrace();} finally {defaultLitePullConsumer.shutdown();}}
}以上就是生产者和消费者的消息模型。