当前位置: 首页 > news > 正文

做词频分析的网站免费主页空间申请

做词频分析的网站,免费主页空间申请,东莞网络公司现状,虚拟网站建设步骤1、开启kafka的注解@EnableKafka 通过开启kafka注解可以看到@Import的类KafkaListenerConfigurationSelector加载一个配置类KafkaBootstrapConfiguration，而此类中有两个重要的类: KafkaListenerAnnotationBeanPostProcessor、KafkaListenerEndpointRegistry 2、Kaf…1、开启kafka的注解@EnableKafka 通过开启kafka注解可以看到@Import的类KafkaListenerConfigurationSelector加载一个配置类KafkaBootstrapConfiguration，而此类中有两个重要的类: KafkaListenerAnnotationBeanPostProcessor、KafkaListenerEndpointRegistry 2、KafkaListenerAnnotationBeanPostProcessor类的postProcessAfterInitialization方法 前置知识：需要了解BeanPostProcessor的运行时机 Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {//判断当前bean是否是没有注解的类if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class? targetClass AopUtils.getTargetClass(bean);//获取类上面的注解(KafkaListener、KafkaListeners)CollectionKafkaListener classLevelListeners findListenerAnnotations(targetClass);//是否存在类级别注解(习惯在方法上后面都是以方法级别介绍)final boolean hasClassLevelListeners classLevelListeners.size() 0;final ListMethod multiMethods new ArrayList();//收集类中所有方法上带有KafkaListener、KafkaListeners注解的方法(方法级别、方法级别、方法级别)MapMethod, SetKafkaListener annotatedMethods MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookupSetKafkaListener) method - {SetKafkaListener listenerMethods findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});//是否存在类级别注解if (hasClassLevelListeners) {SetMethod methodsWithHandler MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method -AnnotationUtils.findAnnotation(method, KafkaHandler.class) ! 
null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty()) {//方法级别没有注解 把类添加了nonAnnotatedClasses集合this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() - No KafkaListener annotations found on bean type: bean.getClass());}else {// Non-empty set of methodsfor (Map.EntryMethod, SetKafkaListener entry : annotatedMethods.entrySet()) {Method method entry.getKey();for (KafkaListener listener : entry.getValue()) {//进入processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() - annotatedMethods.size() KafkaListener methods processed on bean beanName : annotatedMethods);}//类级别注解if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {Method methodToUse checkProxy(method, bean);//注解封装的endpointMethodKafkaListenerEndpointK, V endpoint new MethodKafkaListenerEndpoint();//方法endpoint.setMethod(methodToUse);processListener(endpoint, kafkaListener, bean, methodToUse, beanName);}protected void processListener(MethodKafkaListenerEndpoint?, ? 
endpoint, KafkaListener kafkaListener,Object bean, Object adminTarget, String beanName) {String beanRef kafkaListener.beanRef();if (StringUtils.hasText(beanRef)) {this.listenerScope.addListener(beanRef, bean);}//beanendpoint.setBean(bean);//KafkaHandlerMethodFactoryAdapter消息处理方法工厂endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);//KafkaListener注解设置的id没有配置就是org.springframework.kafka.KafkaListenerEndpointContainer#原子自增的数值endpoint.setId(getEndpointId(kafkaListener));endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));endpoint.setTopics(resolveTopics(kafkaListener));endpoint.setTopicPattern(resolvePattern(kafkaListener));endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), clientIdPrefix));String group kafkaListener.containerGroup();if (StringUtils.hasText(group)) {Object resolvedGroup resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String concurrency kafkaListener.concurrency();if (StringUtils.hasText(concurrency)) {endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, concurrency));}String autoStartup kafkaListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, autoStartup));}resolveKafkaProperties(endpoint, kafkaListener.properties());endpoint.setSplitIterables(kafkaListener.splitIterables());//获取消费者监听工厂KafkaListenerContainerFactory? factory null;String containerFactoryBeanName resolve(kafkaListener.containerFactory());if (StringUtils.hasText(containerFactoryBeanName)) {Assert.state(this.beanFactory ! 
null, BeanFactory must be set to obtain container factory by bean name);try {factory this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);}catch (NoSuchBeanDefinitionException ex) {throw new BeanInitializationException(Could not register Kafka listener endpoint on [ adminTarget ] for bean beanName , no KafkaListenerContainerFactory.class.getSimpleName() with id containerFactoryBeanName was found in the application context, ex);}}endpoint.setBeanFactory(this.beanFactory);String errorHandlerBeanName resolveExpressionAsString(kafkaListener.errorHandler(), errorHandler);if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));}//重点看这里 KafkaListenerEndpointRegistrarthis.registrar.registerEndpoint(endpoint, factory);if (StringUtils.hasText(beanRef)) {this.listenerScope.removeListener(beanRef);}}KafkaListenerEndpointRegistrar#registerEndpoint public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory? 
factory) {Assert.notNull(endpoint, Endpoint must be set);Assert.hasText(endpoint.getId(), Endpoint id must be set);// Factory may be null, we defer the resolution right before actually creating the container//endpoint和消费者工厂合并到一个类中KafkaListenerEndpointDescriptor descriptor new KafkaListenerEndpointDescriptor(endpoint, factory);synchronized (this.endpointDescriptors) {if (this.startImmediately) { //false 是否立刻启动this.endpointRegistry.registerListenerContainer(descriptor.endpoint,resolveContainerFactory(descriptor), true);}else {//主要走这this.endpointDescriptors.add(descriptor);}}}3、KafkaListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated方法 前置知识需要了解SmartInitializingSingleton类的afterSingletonsInstantiated方法的运行时机 DefaultListableBeanFactory#preInstantiateSingletons方法最下面 public void preInstantiateSingletons() throws BeansException {//....省部分代码// Trigger post-initialization callback for all applicable beans...for (String beanName : beanNames) {Object singletonInstance getSingleton(beanName);if (singletonInstance instanceof SmartInitializingSingleton) {SmartInitializingSingleton smartSingleton (SmartInitializingSingleton) singletonInstance;if (System.getSecurityManager() ! 
null) {AccessController.doPrivileged((PrivilegedActionObject) () - {smartSingleton.afterSingletonsInstantiated();return null;}, getAccessControlContext());}else {//这里这里这里smartSingleton.afterSingletonsInstantiated();}}}}KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated public void afterSingletonsInstantiated() {this.registrar.setBeanFactory(this.beanFactory);if (this.beanFactory instanceof ListableBeanFactory) {//true 这段代码进入MapString, KafkaListenerConfigurer instances ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);for (KafkaListenerConfigurer configurer : instances.values()) {//XXConfigurer的扩展修改registrar里面相关内容configurer.configureKafkaListeners(this.registrar);}}if (this.registrar.getEndpointRegistry() null) {//true 这段代码进入if (this.endpointRegistry null) {//第一点提到的KafkaListenerEndpointRegistry类this.endpointRegistry this.beanFactory.getBean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,KafkaListenerEndpointRegistry.class);}this.registrar.setEndpointRegistry(this.endpointRegistry);}//默认工厂beanNamekafkaListenerContainerFactoryif (this.defaultContainerFactoryBeanName ! null) {this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);}// Set the custom handler method factory once resolved by the configurer//消息处理方法工厂MessageHandlerMethodFactory handlerMethodFactory this.registrar.getMessageHandlerMethodFactory();if (handlerMethodFactory ! 
null) {this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);}else {addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);}// Actually register all listeners//重点: 注册监听this.registrar.afterPropertiesSet();}KafkaListenerEndpointRegistrar#afterPropertiesSet Overridepublic void afterPropertiesSet() {registerAllEndpoints();}protected void registerAllEndpoints() {synchronized (this.endpointDescriptors) {//遍历前面收集的endpointDescriptors集合for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint this.validator ! null) {((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);}//注册监听工厂this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately true; // trigger immediate startup}}KafkaListenerEndpointRegistry#registerListenerContainer public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory? factory) {//传入是否启动为falseregisterListenerContainer(endpoint, factory, false);}public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory? factory,boolean startImmediately) {String id endpoint.getId();synchronized (this.listenerContainers) {//创建监听容器MessageListenerContainer container createListenerContainer(endpoint, factory);//放入集合this.listenerContainers.put(id, container);//有分组则注册bean(beanNamegroup, objectListMessageListenerContainer)if (StringUtils.hasText(endpoint.getGroup()) this.applicationContext ! 
null) {ListMessageListenerContainer containerGroup;if (this.applicationContext.containsBean(endpoint.getGroup())) {containerGroup this.applicationContext.getBean(endpoint.getGroup(), List.class);}else {containerGroup new ArrayListMessageListenerContainer();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}containerGroup.add(container);}//falseif (startImmediately) {startIfNecessary(container);}}}protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,KafkaListenerContainerFactory? factory) {//创建监听容器MessageListenerContainer listenerContainer factory.createListenerContainer(endpoint);if (listenerContainer instanceof InitializingBean) {//falsetry {((InitializingBean) listenerContainer).afterPropertiesSet();}catch (Exception ex) {throw new BeanInitializationException(Failed to initialize message listener container, ex);}}int containerPhase listenerContainer.getPhase();if (listenerContainer.isAutoStartup() containerPhase ! AbstractMessageListenerContainer.DEFAULT_PHASE) { // a custom phase valueif (this.phase ! AbstractMessageListenerContainer.DEFAULT_PHASE this.phase ! 
containerPhase) {throw new IllegalStateException(Encountered phase mismatch between container factory definitions: this.phase vs containerPhase);}this.phase listenerContainer.getPhase();}return listenerContainer;}AbstractKafkaListenerContainerFactory#createListenerContainer public C createListenerContainer(KafkaListenerEndpoint endpoint) {//创建从前实例 进入看看C instance createContainerInstance(endpoint);JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);if (endpoint instanceof AbstractKafkaListenerEndpoint) {configureEndpoint((AbstractKafkaListenerEndpointK, V) endpoint);}//属性复制endpoint.setupListenerContainer(instance, this.messageConverter);//调用子类的initializeContainerinitializeContainer(instance, endpoint);//扩展customizeContainer(instance);return instance;}//子类: ConcurrentKafkaListenerContainerFactory#initializeContainerOverrideprotected void initializeContainer(ConcurrentMessageListenerContainerK, V instance,KafkaListenerEndpoint endpoint) {super.initializeContainer(instance, endpoint);//一个topic启动几个消费者(注意这里是几个消费者很多项目配置很大再加上几个节点就设置了很多无用的消费者取while(true)消耗cpu)if (endpoint.getConcurrency() ! null) {instance.setConcurrency(endpoint.getConcurrency());}else if (this.concurrency ! null) {instance.setConcurrency(this.concurrency);}}ConcurrentKafkaListenerContainerFactory#createContainerInstance protected ConcurrentMessageListenerContainerK, V createContainerInstance(KafkaListenerEndpoint endpoint) {//KafkaListener注解上配置内容//指定分区消费TopicPartitionOffset[] topicPartitions endpoint.getTopicPartitionsToAssign();if (topicPartitions ! 
null topicPartitions.length 0) {ContainerProperties properties new ContainerProperties(topicPartitions);return new ConcurrentMessageListenerContainer(getConsumerFactory(), properties);}else {CollectionString topics endpoint.getTopics();if (!topics.isEmpty()) {//指定topic消费 ContainerProperties 继承ConsumerPropertiesContainerProperties properties new ContainerProperties(topics.toArray(new String[0]));return new ConcurrentMessageListenerContainer(getConsumerFactory(), properties);}else {ContainerProperties properties new ContainerProperties(endpoint.getTopicPattern());return new ConcurrentMessageListenerContainer(getConsumerFactory(), properties);}}}4、KafkaListenerEndpointRegistry类的start方法 此类实现了SmartLifecycle接口关注start方法; 前置知识需要了解KafkaListenerEndpointRegistry类的start方法的运行时机AbstractApplicationContext#finishRefresh protected void finishRefresh() {// Clear context-level resource caches (such as ASM metadata from scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();//这里这里这里// Propagate refresh to lifecycle processor first.//看这LifecycleProcessor本身就是LifeCycle接口的扩展 DefaultLifecycleProcessor 的 onRefreshgetLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);}public void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}this.running true;}private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {//listenerContainer.isAutoStartup true//进入ConcurrentMessageListenerContainer父类AbstractMessageListenerContainer#startlistenerContainer.start();}}//进入ConcurrentMessageListenerContainer父类AbstractMessageListenerContainer#start public final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) 
{Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,() - A GenericMessageListener.class.getName() implementation must be provided);//子类ConcurrentMessageListenerContainer#doStartdoStart();}}}ConcurrentMessageListenerContainer#doStart protected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties getContainerProperties();TopicPartitionOffset[] topicPartitions containerProperties.getTopicPartitions();if (topicPartitions ! null this.concurrency topicPartitions.length) {this.logger.warn(() - When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from this.concurrency to topicPartitions.length);this.concurrency topicPartitions.length;}setRunning(true);for (int i 0; i this.concurrency; i) {//一个topic启动多个消费者//构造KafkaMessageListenerContainerKafkaMessageListenerContainerK, V container constructContainer(containerProperties, topicPartitions, i);String beanName getBeanName();container.setBeanName((beanName ! null ? beanName : consumer) - i);container.setApplicationContext(getApplicationContext());if (getApplicationEventPublisher() ! null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix(this.concurrency 1 || this.alwaysClientIdSuffix ? 
- i : );container.setGenericErrorHandler(getGenericErrorHandler());container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.setRecordInterceptor(getRecordInterceptor());container.setInterceptBeforeTx(isInterceptBeforeTx());container.setEmergencyStop(() - {stop(() - {// NOSONAR});publishContainerStoppedEvent();});if (isPaused()) {container.pause();}//启动 AbstractMessageListenerContainer#startcontainer.start();this.containers.add(container);}}}KafkaMessageListenerContainer#doStart类 protected void doStart() {if (isRunning()) {return;}if (this.clientIdSuffix null) { // stand-alone containercheckTopics();}ContainerProperties containerProperties getContainerProperties();checkAckMode(containerProperties);Object messageListener containerProperties.getMessageListener();//创建线程池if (containerProperties.getConsumerTaskExecutor() null) {SimpleAsyncTaskExecutor consumerExecutor new SimpleAsyncTaskExecutor((getBeanName() null ? : getBeanName()) -C-);containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener? listener (GenericMessageListener?) messageListener;ListenerType listenerType determineListenerType(listener);//ListenerConsumer对象是一个runnable实现类this.listenerConsumer new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch new CountDownLatch(1);//线程池执行任务this.listenerConsumerFuture containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);try {if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {this.logger.error(Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?);publishConsumerFailedToStart();}}catch (SuppressWarnings(UNUSED) InterruptedException e) {Thread.currentThread().interrupt();}}KafkaMessageListenerContainer.ListenerConsumer#ListenerConsumer ListenerConsumer(GenericMessageListener? 
listener, ListenerType listenerType) {Properties consumerProperties propertiesFromProperties();checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);this.autoCommit determineAutoCommit(consumerProperties);//创建消费者this.consumer KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId,this.containerProperties.getClientId(),KafkaMessageListenerContainer.this.clientIdSuffix,consumerProperties);this.clientId determineClientId();this.transactionTemplate determineTransactionTemplate();this.genericListener listener;this.consumerSeekAwareListener checkConsumerSeekAware(listener);this.commitCurrentOnAssignment determineCommitCurrent(consumerProperties,KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());subscribeOrAssignTopics(this.consumer);GenericErrorHandler? errHandler KafkaMessageListenerContainer.this.getGenericErrorHandler();if (listener instanceof BatchMessageListener) {this.listener null;this.batchListener (BatchMessageListenerK, V) listener;this.isBatchListener true;this.wantsFullRecords this.batchListener.wantsPollResult();}else if (listener instanceof MessageListener) {this.listener (MessageListenerK, V) listener;this.batchListener null;this.isBatchListener false;this.wantsFullRecords false;}else {throw new IllegalArgumentException(Listener must be one of MessageListener, BatchMessageListener, or the variants that are consumer aware and/or Acknowledging not listener.getClass().getName());}this.listenerType listenerType;this.isConsumerAwareListener listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)|| listenerType.equals(ListenerType.CONSUMER_AWARE);if (this.isBatchListener) {validateErrorHandler(true);this.errorHandler new LoggingErrorHandler();this.batchErrorHandler determineBatchErrorHandler(errHandler);}else {validateErrorHandler(false);this.errorHandler determineErrorHandler(errHandler);this.batchErrorHandler new 
BatchLoggingErrorHandler();}Assert.state(!this.isBatchListener || !this.isRecordAck,Cannot use AckMode.RECORD with a batch listener);if (this.containerProperties.getScheduler() ! null) {this.taskScheduler this.containerProperties.getScheduler();this.taskSchedulerExplicitlySet true;}else {ThreadPoolTaskScheduler threadPoolTaskScheduler new ThreadPoolTaskScheduler();threadPoolTaskScheduler.initialize();this.taskScheduler threadPoolTaskScheduler;}this.monitorTask this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,Duration.ofSeconds(this.containerProperties.getMonitorInterval()));if (this.containerProperties.isLogContainerConfig()) {this.logger.info(this.toString());}MapString, Object props KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();this.checkNullKeyForExceptions checkDeserializer(findDeserializerClass(props, consumerProperties, false));this.checkNullValueForExceptions checkDeserializer(findDeserializerClass(props, consumerProperties, true));this.syncCommitTimeout determineSyncCommitTimeout();if (this.containerProperties.getSyncCommitTimeout() null) {// update the property so we can use it directly from code elsewherethis.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);if (KafkaMessageListenerContainer.this.thisOrParentContainer ! 
null) {KafkaMessageListenerContainer.this.thisOrParentContainer.getContainerProperties().setSyncCommitTimeout(this.syncCommitTimeout);}}this.maxPollInterval obtainMaxPollInterval(consumerProperties);this.micrometerHolder obtainMicrometerHolder();this.deliveryAttemptAware setupDeliveryAttemptAware();this.subBatchPerPartition setupSubBatchPerPartition();}Overridepublic void run() { // NOSONAR complexityListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count 0;this.last System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable null;while (isRunning()) {try {//拉取消息pollAndInvoke();}catch (SuppressWarnings(UNUSED) WakeupException e) {// Ignore, were stopping or applying immediate foreign acks}catch (NoOffsetForPartitionException nofpe) {this.fatalError true;ListenerConsumer.this.logger.error(nofpe, No offset and no reset policy);exitThrowable nofpe;break;}catch (AuthorizationException ae) {if (this.authorizationExceptionRetryInterval null) {ListenerConsumer.this.logger.error(ae, Authorization Exception and no authorizationExceptionRetryInterval set);this.fatalError true;exitThrowable ae;break;}else {ListenerConsumer.this.logger.error(ae, Authorization Exception, retrying in this.authorizationExceptionRetryInterval.toMillis() ms);// We cant pause/resume here, as KafkaConsumer doesnt take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authorizationExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError true;ListenerConsumer.this.logger.error(fie, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG has been fenced);exitThrowable fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, Stopping 
container due to fencing);stop(false);exitThrowable e;}catch (Error e) { // NOSONAR - rethrownRunnable runnable KafkaMessageListenerContainer.this.emergencyStop;if (runnable ! null) {runnable.run();}this.logger.error(e, Stopping container due to an Error);wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}}wrapUp(exitThrowable);}protected void pollAndInvoke() {if (!this.autoCommit !this.isRecordAck) {processCommits();}fixTxOffsetsIfNeeded();idleBetweenPollIfNecessary();if (this.seeks.size() 0) {processSeeks();}pauseConsumerIfNecessary();this.lastPoll System.currentTimeMillis();if (!isRunning()) {return;}this.polling.set(true);//拉取消息ConsumerRecordsK, V records doPoll();if (!this.polling.compareAndSet(true, false) records ! null) {/** There is a small race condition where wakeIfNecessary was called between* exiting the poll and before we reset the boolean.*/if (records.count() 0) {this.logger.debug(() - Discarding polled records, container stopped: records.count());}return;}resumeConsumerIfNeccessary();debugRecords(records);if (records ! null records.count() 0) {savePositionsIfNeeded(records);notIdle();invokeListener(records);}else {checkIdle();}}private ConsumerRecordsK, V doPoll() {ConsumerRecordsK, V records;if (this.isBatchListener this.subBatchPerPartition) {if (this.batchIterator null) {this.lastBatch this.consumer.poll(this.pollTimeout);if (this.lastBatch.count() 0) {return this.lastBatch;}else {this.batchIterator this.lastBatch.partitions().iterator();}}TopicPartition next this.batchIterator.next();ListConsumerRecordK, V subBatch this.lastBatch.records(next);records new ConsumerRecords(Collections.singletonMap(next, subBatch));if (!this.batchIterator.hasNext()) {this.batchIterator null;}}else {//拉取消息 基本apirecords this.consumer.poll(this.pollTimeout);checkRebalanceCommits();}return records;}
http://www.w-s-a.com/news/300580/

相关文章:

  • 在线网站开发网站在哪里
  • 建站的步骤上海快速优化排名
  • 招聘网站做一下要多少钱网站设计公司 国际
  • 巩义专业网站建设公司首选seo研究院
  • 大流量网站解决访问量友情链接如何添加
  • 教育网站建设网永康市住房和城乡建设局网站
  • 阿里巴巴官网网站django 做网站的代码
  • 网站建设 军报wordpress 订餐模板
  • 网站虚拟主机 会计处理石家庄站建设费用多少
  • 网站建设 服务内容 费用简述网站开发流程
  • 公司制作网站跟企业文化的关系空间制作网站
  • 浙江建设监理协会网站个人网站设计规划书
  • wordpress太卡了贵州seo推广
  • 企业介绍微网站怎么做的手机软件商城免费下载
  • 新手网站设计定价网站开发销售
  • 网站开发公司oa有没有找人做标书的网站
  • 传统门户网站有哪些人武部正规化建设
  • 台州网站制作方案免费无代码开发平台
  • 精通网站建设 pdf微盘学做电商的步骤
  • 想在网上做设计接单有没有网站找一个免费域名的网站
  • 湘潭市网站建设科技有限公司杭州网站建设(推荐乐云践新)
  • 优秀网站评析西双版纳傣族自治州民宿
  • 常用的cms建站系统c2c网站模板
  • wordpress更换图标seo网站建设公司
  • 网站备案 深圳小程序怎么进入公众号
  • 实名认证域名可以做电影网站吗坪山网站设计的公司
  • wdcp怎么上传做好的网站管理咨询公司名称参考
  • 设计师网站pin分销系统小程序开发
  • 高端品牌网站建设兴田德润实惠企业网站建设应该怎么做
  • 做研学的网站优秀软文案例