知名网站开发哪里有,门户网站建设的特点,大连网站建设哪家好,能制作视频的软件引入
reset offset#xff0c;即重置消费进度#xff0c;一般在以下场景中使用#xff1a;
需要重新消费已经消费过的消息#xff0c;重置到最早位置或根据时间进行重置。消息积压#xff0c;不需要消费积压的消息#xff0c;重置到最新位置#xff0c;使其从最新位置…引入
reset offset即重置消费进度一般在以下场景中使用
需要重新消费已经消费过的消息重置到最早位置或根据时间进行重置。消息积压不需要消费积压的消息重置到最新位置使其从最新位置开始消费。 重置到最早、或者根据时间进行重置与消息补发的区别 ● 消息补发是将原先的消息由生产者重发一次与区别的那边消息本质上不是同一条消息除了消息体一样以外。 ● 重置操作是操作消费位点offset本质上还是消费生产者之前发送的那条消息。 源码解析 重置offset起始调用位置
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent区别 org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent ● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。 org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp ● 用来根据给定的时间戳来重置消费者的偏移量。 这两个入口本质上都是resetOffset没有本质上的区别我们以resetOffsetNewConcurrent为例具体流程如下图
首先是examineTopicRouteInfo主要是获取topic的路由信息如果路由信息不存在则无法进行后续操作。再者是InvokeBrokerToResetOffset根据上一步拿到的路由信息遍历路由一次向broker发起调用。请求到达服务端Broker端判断是否是Broker端侧处理 ○ Broker端处理 前置检查look-ahead check判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在不满足任意条件直接返回。将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中assignResetOffset将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。最后prepare reset result并返回response。 Client端处理 先执行queryOffset查询当前topic下的group下offsetTable中是是否存储了offset信息有就返回对应的值没有返回-1前置检查look-ahead check检查上一步返回结果consumerOffset是否为-1为-1表示当前group不存在检查timeStampOffset是否满足条件满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中如果是C的客户端直接将timestampOffset放入offsetTable中。请求到达客户端后先将对应的consumer挂起suspend清除ProcessQueue中的消息在sleep 10s。再执行updateConsumeOffset更新consumerOffset。最后再resume继续消费。 补充 如果是服务端重置重置之后的offset会写入resetOffsetTable中在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset如果queryThenEraseResetOffset中有返回值将resetOffset作为GetMessageResult的nextBeginOffset拉取操作用的offset。 public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key topic TOPIC_GROUP_SEPARATOR group;ConcurrentMapInteger, Long map resetOffsetTable.get(key);if (null map) {return null;} else {return map.remove(queueId);}
}参考 ● https://rocketmq.apache.org/ ● https://github.com/apache/rocketmq