广州那里有学做拼多多网站的,公司名字大全20000个免费,做的网站太大怎么办,网络举报网站目录
前言
准备
消息发送方式
深入源码
消息发送模式
选择发送方式
同步发送消息
校验消息体
获取Topic订阅信息
高级特性-消息重投
选择消息队列-负载均衡
装载消息体发送消息
压缩消息内容
构造发送message的请求的Header
更新broker故障信息
异步发送消息
…目录
前言
准备
消息发送方式
深入源码
消息发送模式
选择发送方式
同步发送消息
校验消息体
获取Topic订阅信息
高级特性-消息重投
选择消息队列-负载均衡
装载消息体发送消息
压缩消息内容
构造发送message的请求的Header
更新broker故障信息
异步发送消息
总结 前言
上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习
准备
如果没看前一篇的这里还是要强调本篇的rocketmq版本 首先我们从github上拉取rocketmqd的源码链接到本地使用idea打开。 源码地址https://github.com/apache/rocketmq 目前最新版本为5.2.0 那么我们在idea上切换分支为 release-5.2.0 注请保持和本篇的版本一直方便后面文章中给出的代码块定位 消息发送方式
在读源码之前我们先了解下mq支持的发送消息的类型。
消息的发送方式有三种但我们最常用的是同步的方式发送
sync 同步消息发送后必须等待消息的发送结果返回后才能发送下一条消息async 异步消息发送后不用等待返回结果直接发送下一条数据但会设置一个回调方法接收返回结果oneway 单向消息发送后不会返回结果也不会等待也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景
深入源码
首先进入外层的producer.send()方法中
//源码位置
//包名org.apache.rocketmq.example.simple
//文件名Producer
//行数42
SendResult sendResult producer.send(msg);
消息发送模式
//源码位置
//包名org.apache.rocketmq.client.producer
//文件名DefaultMQProducer
//行数431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量发送if (this.getAutoBatch() !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//单条发送return sendDirect(msg, null, null);}
}
自动批处理发送 -sendByAccumulator 该方法用于将消息累积到一个批处理容器中等待足够的消息数量或达到某个时间间隔后再进行批量发送。可以显著减少发送次数提高吞吐量。 2. 直接发送 -sendDirect 适用于即时发送或消息已经是批处理消息的情况 本章的重点就是直接发送消息这也是开发中使用最频发的方式
选择发送方式
//源码位置
//包名org.apache.rocketmq.client.producer
//文件名DefaultMQProducer
//行数720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback null) {if (mq null) {//同步不指定队列return this.defaultMQProducerImpl.send(msg);} else {//同步指定队列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq null) {//异步不指定队列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//异步指定队列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}
有上面代码可以知道方法中提供了三个参数设置
msg 消息体这个为必填项sendCallback 消息回调对象如果这个参数不为空则为异步发送为空则为同步发送mq 指定的队列指定与不指定的区别在于后续是否需要对队列负载均衡下面源码中会讲到
根据最开始生产者发送消息我们只传入了msg,所以本次重点看同步不指定队列代码实现
同步发送消息
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
跟踪代码我们可以看到方法中我们默认设置了CommunicationMode.SYNC 同步发送模式并且回调参数为空以及设置了默认超时时间3s
校验消息体
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数704
Validators.checkMessage(msg, this.defaultMQProducer);
该方法就是校验消息内容是否合规
校验消息内容是否不为空消息大小是否超过最大值maxMessageSize 1024 * 1024 * 4; // 4M校验消息发送的topic是否为不为空以及topic的长度是否超过默认最长值127
获取Topic订阅信息
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数709
TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());
该方法通过消息体中的topic名称获取topic的订阅信息该方法在我们上一篇生产者启动中已经出现过了深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据没有则从远程nameserver中拉取
高级特性-消息重投
这是rocketMq中一个重要的特性消息如果投递失败了会重新投递
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数715
int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
这段代码就是获取总过重投的次数
不难看出只有发送方式为同步发送时才为1 this.defaultMQProducer.getRetryTimesWhenSendFailed() 3次其余发送方式都只有一次机会。
只有同步发送消息才支持消息重投如果第一次投递失败了mq还回重试2次投递
找到上面源码位置往下看其实可以看到下面代码就是使用了一个for循环来进行重投
选择消息队列-负载均衡
通过上面我们知道最开始并没有指定队列所以需要程序来获取一个队列。
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数724
MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
因为自动创建的topic会被默认分配4个队列生产环境为手动创建topic以及设置队列数量所以我们必须使用负载均衡保证队列的合理分配到不同队列上减轻单个队列的压力
topicPublishInfo为消息发送到指定topic的订阅信息lastBrokerName 为上一次选择的broker名称如果在集群模式下topic也会存在于多个broker上因此记录上一次选择的broker名称可以避免连续选择同一个 Broker从而实现更好的负载均衡和容错处理 resetIndex 重置队列索引位置根据源码逻辑可知当消息进行重新投递时会重置topic订阅消息中队列的索引位置
深入上面源码会发现队列负载均衡的算法获取索引策略默认就是轮询
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名TopicPublishInfo
//行数101
int index Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); 负载均衡策略 轮询策略 (Round-Robin)随机策略 (Random)一致性哈希策略 (Consistent Hashing)权重随机策略 (Weighted Random)最少连接策略 (Least Connections) 装载消息体发送消息
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数740
sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
该方法就是发现消息的核心方法了不管是同步发送还是异步发送都会执行该方法
做一些发送消息前的准备接下深入该方法查看
压缩消息内容
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数898
if (this.tryToCompressMessage(msg)) {sysFlag | MessageSysFlag.COMPRESSED_FLAG;sysFlag | compressType.getCompressionFlag();msgBodyCompressed true;
}
首先判断消息是否大于4k( compressMsgBodyOverHowmuch 1024 * 4),大于则进行压缩小于则不处理
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数1070
byte[] data compressor.compress(body, compressLevel);
传入消息体以及压缩的等级这里大佬们提供了三种压缩实现分别基于三种不同的压缩框架 在我们日常工作中如果需要压缩内容也可以参考大佬们的实现学习源码不仅仅是了解框架的本身也要吸取优秀的地方合理运用 构造发送message的请求的Header
message是Producer发送给Broker的一个请求我们可以把内容抽象成两部分组成请求头、请求体
请求体就是消息本身数据请求头 SendMessageRequestHeader 则包含了各种必要的数据比如topic、messaeQueue等等更多可直接查看请求头对象源码
最后就是使用基于netty实现的远程调用发送消息到broker中
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数1016
sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);
更新broker故障信息
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
程序执行到这个位置说明前面消息发送的流程全部执行完成了那么我们也知道了消息发送的结果从而知道broker服务的状态情况我们需要把当前的broker故障情况更新到 faultItemTable 本地map中供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。
异步发送消息
从选择发送方式代码中当sendCallbacknull时则进入异步发送消息
跟踪源码我们可知异步发送其实就是创建了一个单独的线程使用Runnable对象实现因为会返回一个执行结果
//源码位置
//包名org.apache.rocketmq.client.impl.producer
//文件名DefaultMQProducerImpl
//行数550
Runnable runnable new Runnable() {Overridepublic void run() {long costTime System.currentTimeMillis() - beginStartTime;if (timeout costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException(DEFAULT ASYNC send call timeout));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
sendDefaultImpl() 该方法就是和同步发送调用的同一个了唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBackexecuteAsyncMessageSend() 执行异步消息发送
总结
本篇对生产者发送消息源码进行了跟踪学习你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读Broker负责接收和存储消息管理消息队列并将消息分发给消费者, 是担任连接生产者和消费者确保消息的高效传输和存储保证系统的可靠性和性能的重要角色。