做视频解析网站,wordpress mdtf,做网站公众号多少钱,浙江做网站找谁Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构#xff08;log-based#xff09;的消息引擎
消费消息时#xff0c;只是从磁盘文件上读取数据#xff0c;不会删除消息数据位移数据能由消费者控制#xff0c;能很容易修改位移的值#xff0c;实现重复消费历史数据…
Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构log-based的消息引擎
消费消息时只是从磁盘文件上读取数据不会删除消息数据位移数据能由消费者控制能很容易修改位移的值实现重复消费历史数据
重设位移的两个维度 :
位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值时间维度 : 将位移调整成 该时间的最小位移
7 种重设策略 :
维度策略含义位移维度Earliest位移调整当前最早位移处Latest位移调整到当前最新位移处Current位移调整到当前最新提交位移处Specified-Offset位移调整成指定位移Shift-By-N位移调整到当前位移 N 处时间维度DataTime位移调整到大于给定时间的最小位移处Duration位移调整到离当前时间指定间隔的位移处
Earliest 策略 : 将位移调整到主题当前最早位移处
不一定是 0很久的消息会被 Kafka 自动删除所以当前最早位移可能是 0 的值当要重新消费主题的所有消息就用 Earliest 策略
Latest 策略 : 把位移重设成最新末端位移
当向某个主题发送 15 条消息最新末端位移是 15当要跳过所有历史消息从最新的消息处开始消费就用 Latest 策略
Current 策略 : 将位移调整成消费者当前提交的最新位移
修改消费者代码并重启消费者结果发现代码有问题回滚前的代码变更还把位移重设为消费者重启时的位置
Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处
消费者处理某条错误消息时 , 能手动跳过此消息的处理生产中 , 出现 corrupted 消息无法被消费时消费者会抛出异常无法继续工作
Specified-Offset 策略 : 指定位移的绝对数值
Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离
把位移重设成当前位移的前 100 条位移处指定 N 为 -100
DateTime : 指定一个时间将位移重置到该时间之后的最早位移处
使用场景 : 重新消费昨天的数据并使用该策略重设位移到昨天 0 点
Duration 策略 : 相对的时间间隔将位移调整到距离当前给定时间间隔的位移处格式是 PnDTnHnMnS
Java 8 引入 Duration : 以字母 P 开头后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)将位移调回到 15 分钟前就指定 PT0H15M0S
消费者 API
Earliest 策略 :
Properties consumerProperties new Properties();
// 禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 重设的消费者组的组 ID
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic test; // 要重设位移的 Kafka 主题
try (final KafkaConsumerString, String consumer new KafkaConsumer(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo -new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
} Latest 策略 :
consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo - new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));Current 策略 : 获取当前提交的最新位移 :
consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).forEach(tp - {long committedOffset consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});Specified-Offset 策略 :
long targetOffset 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);
}实现 Shift-By-N 策略
for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset consumer.committed(tp).offset() 123L; consumer.seek(tp, targetOffset);
}DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点
long ts LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - ts));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}Duration 策略 : 将位移调回 30 分钟前
MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - System.currentTimeMillis() - 30 * 1000 * 60));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}命令行
Earliest 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-earliest –executeLatest 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-latest --executeCurrent 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-current --executeSpecified-Offset 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-offset offset --executeShift-By-N 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--shift-by offset_N --executeDateTime 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--to-datetime 2019-06-20T20:00:00.000 --executeDuration 策略 :
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--by-duration PT0H30M0S --execute