四川省查询建设人员注册证书网站,网站建设的发展,网上做网站接活怎么样,备案的时候网站名称在使用rocketmq过程中总能看见一下异常
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控#xff0c;Consu…在使用rocketmq过程中总能看见一下异常
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控Consumer流控
1.Broker流控
Rocketmq默认采取的是异步刷盘方式Producer把消息发送到broker后Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中 1.1 broker busy
Broker是开启快速失败的处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求每 10 ms 执行一次代码如下
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS);
} 1.1.1 Page Cache繁忙
清理过期请求之前会先判断一下Page Cache是否繁忙如果繁忙就会给Producer返回一个系统繁忙的状态码code2remark[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d那怎么判断Page Cache繁忙呢
当Broker收到消息后会放到Page Cache中这个过程首先会取一个CommitLog写入锁如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。
1.1.2清理过期请求
清理过期请求时如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码code2remark[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d
1.2 system busy
这个异常在 NettyRemotingAbstract#processRequestCommand 方法.
1.拒绝请求
如果 NettyRequestProcessor 拒绝了请求就会给 Producer 返回一个系统繁忙的状态码code2remark[REJECTREQUEST]system busy, start flow control for a while。
那什么情况下请求会被拒绝呢看下面这段代码
//SendMessageProcessor类
public boolean rejectRequest() {return this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
从代码中可以看到请求被拒绝的情况有两种可能一个是 Page Cache 繁忙另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法发现判断依据是在开启 transientStorePoolEnable 配置的情况下是否还有可用的 ByteBuffer。
注意在开启 transientStorePoolEnable 的情况下写入消息时会先写入堆外内存DirectByteBuffer然后刷入 Page Cache最后刷入磁盘。而读取消息是从 Page Cache这样可以实现读写分离避免读写都在 Page Cache 带来的问题 2.线程拒绝
Broker 收到请求后会把处理逻辑封装成到 Runnable 中由线程池来提交执行如果线程池满了就会拒绝请求这里线程池中队列的大小默认是 10000可以通过参数 sendThreadPoolQueueCapacity 进行配置线程池拒绝后会抛出异常 RejectedExecutionException程序捕获到异常后会判断是不是单向请求OnewayRPC如果不是就会给 Producer 返回一个系统繁忙的状态码code2remark[OVERLOAD]system busy, start flow control for a while
判断 OnewayRPC 的代码如下flag 2 或者 3 时是单向请求
public boolean isOnewayRPC() {int bits 1 RPC_ONEWAY;return (this.flag bits) bits;
} 1.3消息重试
Broker 发生流量控制的情况下返回给 Producer 系统繁忙的状态码code2Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码
//DefaultMQProducer类
private final SetInteger retryResponseCodes new CopyOnWriteArraySetInteger(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_ERROR,ResponseCode.NO_PERMISSION,ResponseCode.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT
));
2.Consumer流控 DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。
2.1 缓存消息数量超过阈值
ProcessQueue 保存的消息数量超过阈值默认 1000可以配置源码如下
if (cachedMessageCount this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes % 1000) 0) {log.warn(the cached message count exceeds the threshold {}, so do flow control, minOffset{}, maxOffset{}, count{}, size{} MiB, pullRequest{}, flowControlTimes{},this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.2缓存消息大小超过阈值
ProcessQueue 保存的消息大小超过阈值默认 100M可以配置源码如下
if (cachedMessageSizeInMiB this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes % 1000) 0) {log.warn(the cached message size exceeds the threshold {} MiB, so do flow control, minOffset{}, maxOffset{}, count{}, size{} MiB, pullRequest{}, flowControlTimes{},this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.3缓存消息跨度超过阈值
对于非顺序消费的场景ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值默认 2000可以配置。源代码如下
if (!this.consumeOrderly) {if (processQueue.getMaxSpan() this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes % 1000) 0) {log.warn(the queues messages, span too long, so do flow control, minOffset{}, maxOffset{}, maxSpan{}, pullRequest{}, flowControlTimes{},processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}
}
2.4获取锁失败
对于顺序消费的情况ProcessQueue加锁失败也会延迟拉取这个延迟时间默认是 3s可以配置。
3.总结
本文介绍了 RocketMQ 发生流量控制的 8 个场景其中 Broker 4 个场景Consumer 4 个场景。Broker 的流量控制本质是对 Producer 的流量控制最好的解决方法就是给 Broker 扩容增加 Broker 写入能力。而对于 Consumer 端的流量控制需要解决 Consumer 端消费慢的问题比如有第三方接口响应慢或者有慢 SQL。
Broker4种场景
page cache 繁忙获取commitlog写入锁超过1s
清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms
请求拒绝 一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer
线程池拒绝 Broker 收到请求后会把处理逻辑封装成到 Runnable 中由线程池来提交执行如果线程池满了就会拒绝请求这里线程池中队列的大小默认是 10000
Consumer4种场景
消息数量超过阈值1000
消息大小超过阈值100m
缓存消息跨度超过阈值 对于非顺序消费的场景ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000
ProcessQueue加锁失败 也会延迟加载