成都 网站建设公司,濮阳公司做网站,网站建设与设计饰品,汉中建设工程招标网官网springcloud RocketMQ #xff0c;一个mq消息发送后#xff0c;客户端是怎么一步步拿到消息去消费的#xff1f;我们要从代码层面探究这个问题。 找的流程图#xff0c;有待考究。
以下我们开始debug#xff1a; 拉取数据的线程#xff1a; PullMessageService.java 本… springcloud RocketMQ 一个mq消息发送后客户端是怎么一步步拿到消息去消费的我们要从代码层面探究这个问题。 找的流程图有待考究。
以下我们开始debug 拉取数据的线程 PullMessageService.java 本质是一个线程类
public class PullMessageService extends ServiceThread {private final LinkedBlockingQueuePullRequest pullRequestQueue new LinkedBlockingQueuePullRequest();// ..
}执行逻辑一直循环拿取阻塞队列的内容阻塞队列的内容由负载均衡服务提供。阻塞队列中保存了目前客户端占有的 brokder - queue 信息 然后进入 DefaultMQPushConsumerImpl.java 的 pullMessage关键 这里面有个关键的方法this.pullAPIWrapper.pullKernelImpl(…) 这里传入了成功回调 pullCallback。 一直执行到 pullMessageAsync 是异步拉取消息成功后会执行回调。 成功后的回调逻辑
ConsumeMessageConcurrentlyService.java 的 submitConsumeRequest 方法将任务下发给消费者线程池 consumeExecutor ThreadPoolExecutor 类型去执行。日志显示就是这里执行的消费业务 ~~ ok我们看看开启的这个线程做了什么。 首先单独一个线程是无法debug跨线程的所以我们继续在 ConsumeMessageConcurrentlyService.ConsumeRequest 消费者请求线程中debug run方法看看是怎么执行到我们的业务逻辑的。 发现是 监听器 listener 的消费逻辑 这个 listener 是一个接口而且这个接口没有找到代码impl也就是可能是匿名的视线 我们debug直接跳到了 RocketMQInboundChannelAdapter.java 的监听器当时就是从这里把监听器注册进来的。 匿名方法执行了 RocketMQInboundChannelAdapter.this.consumeMessage 执行了一段 retry 逻辑spring的重试框架里面执行了发送消息逻辑。 发现底层用的是 spring 的 integration 消息通信框架 debug进去send逻辑会发送到一个 channel 中去 channel 里就有我们的处理方法的代理对象是转发 dispatcher 的目标处理器 handlers 之一。 后面不出所料就是通过反射去执行这个方法。 然后就跑到了我们的逻辑
创作不易希望点赞、收藏、关注支持~