营销网站建设工作,江川区住房和城乡建设局网站,seo网站优化方法,海外人才招聘网站消息监听容器
1、KafkaMessageListenerContainer
由spring提供用于监听以及拉取消息#xff0c;并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理#xff0c;相当于一个消费者#xff1b;
看看其整体代码结构#xff1a; 可以发现其入口方法为doStart(),…消息监听容器
1、KafkaMessageListenerContainer
由spring提供用于监听以及拉取消息并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理相当于一个消费者
看看其整体代码结构 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口很明显由spring管理其start和stop操作
ListenerConsumer, 内部真正拉取消息消费的是这个结构其 实现了Runable接口简言之它就是一个后台线程轮训拉取并处理消息while true死循环拉取消息。
在doStart方法中会创建ListenerConsumer并交给线程池处理
以上步骤就开启了消息监听过程。
KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() 0, ackCount must be 0);}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) containerProperties.getAckTime() 0) {containerProperties.setAckTime(5000);}}Object messageListener containerProperties.getMessageListener();Assert.state(messageListener ! null, A MessageListener is required);if (containerProperties.getConsumerTaskExecutor() null) {SimpleAsyncTaskExecutor consumerExecutor new SimpleAsyncTaskExecutor((getBeanName() null ? : getBeanName()) -C-);containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, Listener must be a GenericListener);this.listener (GenericMessageListener?) messageListener;ListenerType listenerType ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating this.listener;while (delegating instanceof DelegatingMessageListener) {delegating ((DelegatingMessageListener?) delegating).getDelegate();}listenerType ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager ! null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count 0;this.last System.currentTimeMillis();if (isRunning() this.definedPartitions ! null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error(Failed to set initial offsets, e);}}long lastReceive System.currentTimeMillis();long lastAlertAt lastReceive;while (isRunning()) {try {if (!this.autoCommit !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused true;if (this.logger.isDebugEnabled()) {this.logger.debug(Paused consumption from: this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecordsK, V records this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll System.currentTimeMillis();if (this.consumerPaused !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug(Resuming consumption from: this.consumer.paused());}SetTopicPartition paused this.consumer.paused();this.consumer.resume(paused);this.consumerPaused false;publishConsumerResumedEvent(paused);}if (records ! null this.logger.isDebugEnabled()) {this.logger.debug(Received: records.count() records);if (records.count() 0 this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p - records.records(p).stream())// map to same format as send metadata toString().map(r - r.topic() - r.partition() r.offset()).collect(Collectors.toList()));}}if (records ! null records.count() 0) {if (this.containerProperties.getIdleEventInterval() ! null) {lastReceive System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() ! null) {long now System.currentTimeMillis();if (now lastReceive this.containerProperties.getIdleEventInterval() now lastAlertAt this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, were stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError true;ListenerConsumer.this.logger.error(No offset and no reset policy, nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error(No offset and no reset policy; stopping container);KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info(Consumer stopped);}
2、ConcurrentMessageListenerContainer
并发消息监听相当于创建消费者其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理从实现上看就是在KafkaMessageListenerContainer上做了层包装有多少的concurrency就创建多个KafkaMessageListenerContainer也就是concurrency个消费者。 protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties getContainerProperties();TopicPartitionInitialOffset[] topicPartitions containerProperties.getTopicPartitions();if (topicPartitions ! null this.concurrency topicPartitions.length) {this.logger.warn(When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from this.concurrency to topicPartitions.length);this.concurrency topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i 0; i this.concurrency; i) {KafkaMessageListenerContainerK, V container;if (topicPartitions null) {container new KafkaMessageListenerContainer(this, this.consumerFactory,containerProperties);}else {container new KafkaMessageListenerContainer(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName getBeanName();container.setBeanName((beanName ! null ? beanName : consumer) - i);if (getApplicationEventPublisher() ! null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix(- i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}
3、KafkaListener底层监听原理
上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息但还缺少关键的一步即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来
那么这个桥梁就是KafkaListener注解
KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作当然这里也是此类会扫描带KafkaListener注解的类或者方法通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer并调用start方法启动监听也就是这样打通了这条路…
4、Spring Boot 自动加载kafka相关配置
1、KafkaAutoConfiguration 自动生成kafka相关配置比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等默认创建bean实例
2、KafkaAnnotationDrivenConfiguration 主要是针对于spring-kafka提供的注解背后的相关操作比如 KafkaListener;
在开启了EnableKafka注解后spring会扫描到此配置并创建缺少的bean实例比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候就会默认创建一个beanName为kafkaListenerContainerFactory的实例这也是为什么在springboot中不用定义consumer的相关配置也可以通过KafkaListener正常的处理消息
5、消息处理
1、单条消息处理
Configuration
public class KafkaConsumerConfiguration {BeanKafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}private MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长单位毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}
这种方式的KafkaLisener中的参数是单条的。
2、批量处理
Configuration
EnableKafka
public class KafkaConfig {Bean
public KafkaListenerContainerFactory?, ? batchFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true); // return factory;
}Beanpublic ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意这里接受的是集合类型
KafkaListener(id list, topics myTopic, containerFactory batchFactory)
public void listen(ListString list) {...
}
这种方式的KafkaLisener中的参数是多条的。
6、线程池相关
如果没有额外给Kafka指定线程池底层默认用的是SimpleAsyncTaskExecutor类它不使用线程池而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。
总结
spring为了将kafka融入其生态方便在spring大环境下使用kafka开发了spring-kafa这一模块本质上是为了帮助开发者更好的以spring的方式使用kafka
KafkaListener就是这么一个工具在同一个项目中既可以有单条的消息处理也可以配置多条的消息处理稍微改变下配置即可实现很是方便
当然KafkaListener单条或者多条消息处理仍然是spring自行封装处理与kafka-client客户端的拉取机制无关比如一次性拉取50条消息对于单条处理来说就是循环50次处理而多条消息处理则可以一次性处理50条本质上来说这套逻辑都是spring处理的并不是说单条消费就是通过kafka-client一次只拉取一条消息
在使用过程中需要注意spring自动的创建的一些bean实例当然也可以覆盖其自动创建的实例以满足特定的需求场景。 原文链接https://blog.csdn.net/yuechuzhixing/article/details/124725713