seo网站优化培训价格,在线h5制作工具,碧辉腾乐 网站建设,郑州市主城区Kafka消费者主要负责消费#xff08;读取和处理#xff09;由生产者发布的消息。
1 消费者入门
消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。
消息发布到主题后#xff0c;只会被投递给订阅它的每…Kafka消费者主要负责消费读取和处理由生产者发布的消息。
1 消费者入门
消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。
消息发布到主题后只会被投递给订阅它的每个消费组中的一个消费者。 图 消费组与主题的关系
一个消费者增加新的消费者时分区会进行重分配每个消费者会消费不同的分区。可以通过增加或减少消费者的个数来提高或降低整体的消费能力。但如果消费者的个数大于分区个数会出现消费者分配不到任何分区的情况。
1.1 创建消费者消费的步骤
1配置消费者参数。必备参数key.deserializer、value.deserializer、bootstrap.servers、group.id消费者隶属的消费组名称
2创建消费者实例。
3订阅主题及分区。
4拉取消息并进行消费。
5关闭消费者实例。
public static void doConsume(String topic,ThreadSwitch threadSwitch) {new Thread(() - {Properties props new Properties();props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,HOST);ConsumerString,String consumer new KafkaConsumer(props);consumer.subscribe(Collections.singleton(topic));while (threadSwitch.running()) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString,String record : records) {System.out.println(record.partition() , record.offset() , record.value());}}consumer.close();}).start();}
1.1.1 订阅主题与分区
void subscribe(CollectionString topics);
void subscribe(CollectionString topics, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern);
void assign(CollectionTopicPartition partitions);
上面是消费者订阅主题及分区的方法。订阅方式有三种集合订阅AUTO_TOPICS、正则表达式订阅AUTO_PARTTERN、指定分区订阅USER_ASSIGNED。消费者只能选择其中一种方式。
1.1.2 查询指定主题的元数据信息
ListPartitionInfo partitionsFor(String topic);
ListPartitionInfo partitionsFor(String topic, Duration timeout); 图 PartitionInfo UML partitionsFor 是一个同步方法当给定主题没有元数据时会发送请求到Kafka服务器中请求元数据。
1.1.3 取消订阅
unsubscribe() 方法来取消主题的订阅也可以通过将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合的方式取消主题的订阅。
2 消息消费
ConsumerRecordsK, V poll(Duration timeout);
消费者通过不断轮询poll方法来获取所订阅的主题分区上的一组消息。
2.1 位移提交
对于分区它的每条消息都有唯一的offset用来表示消息在分区中对应的位置又称偏移量。
对应消费者使用offset来表示它消费到分区中某个消息所在的位置又称消费位移。
2.1.1 消费位移持久化
每次调用poll()方法时需要它返回的是还没有被消费过的消息集。为实现这个需要记录上次消费时的消费位移这个值不单单保存在内存中还需要做持久化保存否则消费者重启之后就无法知晓之前的消费位移。
在Kafka中消费位移存储在Kafka内部的主题__consumer_offsets中。把消费位移存储起来的动作称为“提交”。 图 消费位移
理想情况下我们希望消费到X的位置时能向服务器提交的是X1这个位置的消费位移。
2.1.2 重复消费与消息丢失
实际上通过poll方法获取消息后需要经过我们的业务代码处理获取的消息才能算真正的把消息消费完成。有时业务代码会比较耗时或出现异常。在消费消息时会出现重复消费或消息丢失的情况。 重复消费 消费完某条消息时因服务器故障导致没能提交消费位移再次启动时将会导致重复消费。 消息丢失 在还未消费完某组消息时已完成消费位移提交此时消费者发生故障导致消费中断。 消费者重启时会导致这些消息没能被继续消费。
表 重复消费及消息丢失的场景 2.1.3 自动提交
Kafka中默认的消费位移的提交方式是自动提交由消费者参数enable.auto.commit配置。周期性提交auto.commit.interval.ms配置自动提交的间隔时间默认值为5秒。
在未发生异常的时候poll方法在轮询的时候是通过保存在内存中的消费位移来取消息。
2.1.4 手动提交
自动位移提交无法做到精确的位移管理Kafka还提供来手动位移提交的方式。
void commitSync();
void commitSync(Duration timeout);
void commitSync(MapTopicPartition, OffsetAndMetadata offsets);
void commitSync(final MapTopicPartition, OffsetAndMetadata offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(MapTopicPartition, OffsetAndMetadata offsets, OffsetCommitCallback callback);
手动提交分为同步提交和异步提交。注意不要每消费完一条消息就提交一次这样会降低性能。
2.2 指定位移消费
auto.offset.reset 配置当消费者查找不到所记录的消费位移时从何处开始进行消费。默认值latest表示从分区末尾开始消费earliest表示从起始处none 表示既不从起始也不从末尾而是报出NoOffsetForPartitionException异常。
Kafka提供seek方法用于指定从特定的位移处开始拉取消息。
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(CollectionTopicPartition partitions);
void seekToEnd(CollectionTopicPartition partitions);
2.1.1 获取分区信息
调用seek方法时参数需要指定分区信息。上面的partitionsFor方法是获取某个主题下所有的分区信息而非该消费者实例所被分配的分区信息。
SetTopicPartition assignment();
assignment方法用于获取消费者所被分配的所有分区。如果分配还未发生或者正在重新分配过程中则返回值可能为空。可以通过以下方式来确保获取到的分区信息不为空。
SetTopicPartition partitionSet null;
while (partitionSet null || partitionSet.isEmpty()) {consumer.poll(Duration.ofMillis(100));partitionSet consumer.assignment();
}
2.3 再均衡
分区的所属权从一个消费者转移到另一个消费者的行为。当添加或删除消费组内的消费者时会发生再均衡。
再均衡发生期间消费组内的消费者是无法读取的。当一个分区被重新分配给另一个消费者时消费者当前的状态也会丢失。一般情况下应避免不必要的再均衡的发生。
subscribe方法有个参数是ConsumerRebalanceListener类型用于指定订阅后发生再均衡的监听器。 图 ConsumerRebalanceListener接口的UML
onPartitionsRevoked 会在再均衡开始之前和消费者停止读取消息之后被调用参数表示再均衡前所分配的分区。
onPartitionsAssigned 会再重新分配分区之后和消费者开始读取消费之前被调用。参数表示再均衡后所分配到的分区。
可以通过onPartitionsRevoked 回掉执行手动提交消费位移来尽量避免不必要的重复消费。
2.4 拦截器 图 ConsumerInterceptor 接口的UML
消费者客户端的interceptor.classes 的参数配置拦截器。
onConsume 会在poll方法返回之前调用来对消息进行相应的定制化操作。
onCommit 会在提交完消费位移之后被调用。
2.5 多线程实现
KafkaProducer是线程安全的而KafkaConsumer 不是。在KafkaConsumer 中除了wakeup()方法外的其他公用方法在执行前都会调用acquire()方法来检测当前是否只有一个线程在操作否则抛出异常。
2.5.1 多个消费者实例线程
创建多个线程每个线程创建一个KafkaConsumer实例每个消费者分别消费不同的区或者多个消费者同时消费同一个分区。
如果分区数量多那么所需要创建的线程也比较多每个消费线程都要维护一个独立的TCP连接这样会带来不小的系统开销。
2.5.2 一个消费线程 消费业务线程池
一般而言poll()拉取消息的速度是相当快而整体消费的瓶颈在处理消息这一块。
创建一个消费线程来管理一个KafkaConsumer实例及拉取消息。然后将这些消息分发给业务线程池中的线程去处理。
2.6 重要的消费者参数 fetch.min.bytes 在一次拉取请求中调用poll方法中能从Kafka中拉取的最小数据量。默认值为1(B)。如果返回给Consumer的数据量小于其值那它就需要等待直到满足。 适当调大这个参数能提高一定的吞吐量但也会造成额外的延迟。 fetch.max.wait.ms 指定Kafka的等待时间默认值500ms。 max.poll.records 在一次拉取请求中拉取的最大消息数默认值500 exclude.internal. topics Kafka有两个内部主题__consumer_offsets和__transaction_state。参数配置内部主题是否可以向消费者公开默认值true。 request.timeout.ms Consumer等待请求响应的最长时间默认值30000ms metadata.max.age. ms 元数据的过期时间默认值300000ms。 isolation.level 配置消费者的事物隔离级别表示消费者所消费到的位置。默认值read_uncommitted可以消费到HWHigh Watermark处的位置read_committed 会忽略事物未提交的消息只能消费到LSOLastStableOffset的位置。
表 重要的消费者参数