正规网站开发文案,庆阳建设局网站,个人养老金制度要来了,云南专业建网站消费者位移
消费者位移这一节介绍了消费者位移的基本概念和消息格式#xff0c;本节我们来聊聊消费位移的提交。
Consumer 需要向 Kafka 汇报自己的位移数据#xff0c;这个汇报过程被称为提交位移#xff08;Committing Offsets#xff09;。因为 Consumer 能够同时消费…消费者位移
消费者位移这一节介绍了消费者位移的基本概念和消息格式本节我们来聊聊消费位移的提交。
Consumer 需要向 Kafka 汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为 Consumer 能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了记录Consumer 的消费进度这样当 Consumer 发生重启之后就能够从 Kafka 中读取之前提交的位移从而继续消费避免以避免重复消费或消息丢失等。换句话说位移提交是 Kafka 提供给你的一个工具或语义保障你负责维持这个语义保障即如果你提交了位移 X那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。
因为位移提交非常灵活你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息你提交的位移值却是 20那么从理论上讲就丢失了10条数据相反地如果你提交的位移值是 5那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。
位移提交
从使用角度来说位移提交分为自动提交和手动提交从 Consumer 的角度来说位移提交分为同步提交和异步提交。
自动提交
默认情况下就是自动提交你根本无需关心位移提交的事情Consumer 端有个参数 enable.auto.commit默认值是 true即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms默认值是 5 秒即每 5 秒会为你自动提交一次位移。
这里我们用一段简单的代码来看看这两个参数怎么使用
Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, kafka_test);// 自动提交props.put(enable.auto.commit, true);// 间隔2秒 props.put(auto.commit.interval.ms, 2000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {// process}}
手动提交
设置 enable.auto.commit 为 false还需要调用相应的 API 手动提交位移KafkaConsumer.commitSync()。
// props.put(enable.auto.commit, false);
while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
commitSync()有一个缺陷提交时Consumer 程序会处于阻塞状态在生产系统中因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔但这样做的后果是 Consumer 的提交频率下降在下次 Consumer 重启回来后会有更多的消息被重新消费。鉴于这个问题Kafka 提供了另一个 异步API 方法KafkaConsumer.commitAsync()。
不过commitAsync 的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此异步提交的重试其实没有意义所以 commitAsync 是不会重试的。
我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题 try {while(true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}
同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交我们调用 commitAsync() 避免程序阻塞而在 Consumer 要关闭前我们调用 commitSync() 方法执行同步阻塞式的位移提交以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后我们既实现了异步无阻塞式的位移管理也确保了 Consumer 位移的正确性如果你自行编写代码开发一套 Kafka Consumer 应用可以尝试使用上面的代码范例来实现手动的位移提交。
其实还有一种更高级的提交方式就是分批量提交就不再这里展开留给大家查资料学习也欢迎各位同学在评论区交流讨论