湖南响应式网站设计,企业网上的推广,株洲外贸网站建设,wordpress 博客 地址深入解析Kafka消息传递的可靠性保证机制
Kafka在设计上提供了不同层次的消息传递保证#xff0c;包括at most once#xff08;至多一次#xff09;、at least once#xff08;至少一次#xff09;和exactly once#xff08;精确一次#xff09;。每种保证通过不同的机制…深入解析Kafka消息传递的可靠性保证机制
Kafka在设计上提供了不同层次的消息传递保证包括at most once至多一次、at least once至少一次和exactly once精确一次。每种保证通过不同的机制实现下面详细介绍Kafka如何实现这些消息传递保证。
1. At Most Once至多一次
在这种模式下消息可能会丢失但不会被重复传递。这通常发生在消费者在处理消息之前提交了偏移量导致即使消息处理失败也认为已经处理完成。
实现机制
消费者配置enable.auto.committrue并且默认提交偏移量的时间间隔较短auto.commit.interval.ms。消费者在处理消息之前提交偏移量处理过程中如果发生故障消息不会被重新处理。
2. At Least Once至少一次
在这种模式下消息不会丢失但可能会被重复传递。消费者确保在处理消息后才提交偏移量故障恢复后会重新处理未提交偏移量的消息。
实现机制
消费者配置enable.auto.commitfalse手动提交偏移量。消费者在处理完每条消息后调用consumer.commitSync()或consumer.commitAsync()提交偏移量。
示例代码
Properties props new Properties();
props.put(bootstrap.servers, your_kafka_broker:9092);
props.put(group.id, test_group);
props.put(enable.auto.commit, false);
props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);
props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(your_topic));try {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {// 处理消息}consumer.commitSync(); // 确保消息处理后提交偏移量}
} finally {consumer.close();
}3. Exactly Once精确一次
在这种模式下消息既不会丢失也不会重复传递。Kafka通过引入幂等性生产者和事务性API来实现这种保证。
实现机制
幂等性生产者确保生产者在重试发送时不会产生重复消息。通过配置enable.idempotencetrue启用幂等性。事务性生产者和消费者确保在生产和消费过程中可以使用事务使消息的生产和消费操作要么全部成功要么全部失败。
配置幂等性生产者
Properties props new Properties();
props.put(bootstrap.servers, your_kafka_broker:9092);
props.put(acks, all);
props.put(retries, Integer.MAX_VALUE);
props.put(enable.idempotence, true);
props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);
props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(props);使用事务性生产者和消费者
// 配置事务性生产者
Properties props new Properties();
props.put(bootstrap.servers, your_kafka_broker:9092);
props.put(acks, all);
props.put(retries, Integer.MAX_VALUE);
props.put(enable.idempotence, true);
props.put(transactional.id, my-transactional-id); // 唯一的事务ID
props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);
props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(props);
producer.initTransactions(); // 初始化事务try {producer.beginTransaction(); // 开始事务producer.send(new ProducerRecord(your_topic, key, value));// 其他的发送操作producer.commitTransaction(); // 提交事务
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {producer.abortTransaction(); // 中止事务producer.close();
}在消费者端可以使用Kafka Streams API或者事务性消费模式确保精确一次语义。
总结
Kafka提供了不同层次的消息传递保证通过合适的配置和使用模式用户可以根据应用需求选择合适的保证模式
At Most Once适用于对数据丢失不敏感的应用。At Least Once适用于不能接受数据丢失但可以接受重复数据的应用。Exactly Once适用于对数据一致性要求非常高的应用。
通过合理配置生产者、消费者和broker可以在不同场景下实现合适的消息传递保证。