当前位置: 首页 > news > 正文

筹建网站信息技术做家装家居网站

筹建网站信息技术,做家装家居网站,河北省城乡和建设厅网站首页,西安东郊网站建设动态初始化Kafka消费者实例一.Kafka 环境搭建二.动态初始化消费者1.Topic定义2.方法处理器工厂3.参数解析器（Copy SpringBoot 源码）4.消费接口和消费实现5.动态初始化1.关键类简介2.动态初始化实现一.Kafka 环境搭建 参考：Kafka搭建和测试 … 动态初始化Kafka消费者实例一.Kafka 环境搭建二.动态初始化消费者1.Topic定义2.方法处理器工厂3.参数解析器（Copy SpringBoot 源码）4.消费接口和消费实现5.动态初始化1.关键类简介2.动态初始化实现一.Kafka 环境搭建 参考：Kafka搭建和测试 二.动态初始化消费者 1.Topic定义 动态初始化，即不通过注解和配置文件实现消费者的初始化；定义一个Topic对象，用于设置消费者参数： package com.demo.entity;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;/*** author * date 2023-02-08 15:06* since 1.8*/ Data AllArgsConstructor NoArgsConstructor public class Topic {private String id;private String topic;private Integer partitions;private String group test;private String clientPrefix; }2.方法处理器工厂 此类直接使用 SpringBoot 源码（原实现为私有类）： package com.demo.manual;import org.springframework.context.ApplicationContext; import org.springframework.core.convert.TypeDescriptor; import org.springframework.core.convert.converter.ConditionalGenericConverter; import org.springframework.core.convert.converter.Converter; import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.lang.Nullable; import org.springframework.messaging.converter.GenericMessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.util.Assert; import org.springframework.validation.Validator;import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.*;/*** author * date 2023-02-08 14:18* since 1.8*/ public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {private ApplicationContext 
applicationContext;private Validator validator;private ListHandlerMethodArgumentResolver customMethodArgumentResolvers new ArrayList();private final DefaultFormattingConversionService defaultFormattingConversionService new DefaultFormattingConversionService();private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {this.validator validator;this.applicationContext applicationContext;}public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {this.handlerMethodFactory kafkaHandlerMethodFactory1;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {if (this.handlerMethodFactory null) {this.handlerMethodFactory createDefaultMessageHandlerMethodFactory();}return this.handlerMethodFactory;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory defaultFactory new DefaultMessageHandlerMethodFactory();if (this.validator ! 
null) {defaultFactory.setValidator(this.validator);}defaultFactory.setBeanFactory(this.applicationContext);this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());defaultFactory.setConversionService(this.defaultFormattingConversionService);GenericMessageConverter messageConverter new GenericMessageConverter(this.defaultFormattingConversionService);defaultFactory.setMessageConverter(messageConverter);ListHandlerMethodArgumentResolver customArgumentsResolver new ArrayList(Collections.unmodifiableList(this.customMethodArgumentResolvers));// Has to be at the end - look at PayloadMethodArgumentResolver documentationcustomArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);defaultFactory.afterPropertiesSet();return defaultFactory;}Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);}private static class BytesToStringConverter implements Converterbyte[], String {private final Charset charset;BytesToStringConverter(Charset charset) {this.charset charset;}Overridepublic String convert(byte[] source) {return new String(source, this.charset);}}private final class BytesToNumberConverter implements ConditionalGenericConverter {BytesToNumberConverter() {}OverrideNullablepublic SetConvertiblePair getConvertibleTypes() {HashSetConvertiblePair pairs new HashSet();pairs.add(new ConvertiblePair(byte[].class, long.class));pairs.add(new ConvertiblePair(byte[].class, int.class));pairs.add(new ConvertiblePair(byte[].class, short.class));pairs.add(new ConvertiblePair(byte[].class, byte.class));pairs.add(new ConvertiblePair(byte[].class, Long.class));pairs.add(new ConvertiblePair(byte[].class, Integer.class));pairs.add(new 
ConvertiblePair(byte[].class, Short.class));pairs.add(new ConvertiblePair(byte[].class, Byte.class));return pairs;}OverrideNullablepublic Object convert(Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {byte[] bytes (byte[]) source;if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {Assert.state(bytes.length 8, At least 8 bytes needed to convert a byte[] to a long); // NOSONARreturn ByteBuffer.wrap(bytes).getLong();}else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {Assert.state(bytes.length 4, At least 4 bytes needed to convert a byte[] to an integer); // NOSONARreturn ByteBuffer.wrap(bytes).getInt();}else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {Assert.state(bytes.length 2, At least 2 bytes needed to convert a byte[] to a short); // NOSONARreturn ByteBuffer.wrap(bytes).getShort();}else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {Assert.state(bytes.length 1, At least 1 byte needed to convert a byte[] to a byte); // NOSONARreturn ByteBuffer.wrap(bytes).get();}return null;}Overridepublic boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {if (sourceType.getType().equals(byte[].class)) {Class? 
target targetType.getType();return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)|| target.equals(Short.class) || target.equals(Byte.class);}else {return false;}}} } 3.参数解析器（Copy SpringBoot 源码） 此类直接使用 SpringBoot 源码（原实现为私有类）： package com.demo.manual;import org.springframework.core.MethodParameter; import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver; import org.springframework.validation.Validator;import java.util.List;/*** author * date 2023-02-08 14:36* since 1.8*/ public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {super(messageConverter, validator);}Overridepublic Object resolveArgument(MethodParameter parameter, Message? message) throws Exception { // NOSONARObject resolved super.resolveArgument(parameter, message);/** Replace KafkaNull list elements with null.*/if (resolved instanceof List) {List? list ((List?) 
resolved);for (int i 0; i list.size(); i) {if (list.get(i) instanceof KafkaNull) {list.set(i, null);}}}return resolved;}Overrideprotected boolean isEmptyPayload(Object payload) {return payload null || payload instanceof KafkaNull;}} 4.消费接口和消费实现 当前接口和实现用于做统一的数据处理，可以在实现类内再根据Topic去调用对应的数据解析方法。 接口： package com.demo.manual;import org.apache.kafka.clients.consumer.ConsumerRecord;/*** author * date 2023-02-08 13:46* since 1.8*/ public interface Handler {void deal(ConsumerRecordString, String cRecord); } 实现： package com.demo.manual;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord;/*** author * date 2023-02-08 11:49* since 1.8*/ Slf4j public class ManualHandler implements Handler{Overridepublic void deal(ConsumerRecordString, String cRecord) {log.info( Topic:{} Partition:{} Content:{},cRecord.topic(),cRecord.partition(),cRecord.value());} } 5.动态初始化 1.关键类简介 此处通过接口调用实现创建、暂停和恢复消费，可根据实际应用场景进行设计。 关键类 | 说明： KafkaListenerEndpointRegistry：Spring 的 Kafka 监听容器，可以通过 Id 获取 Listener 实例，从而暂停或恢复消费监听； ConcurrentKafkaListenerContainerFactory：Listener 工厂，定义代码可参考上面链接的2.3 节； ConsumerAwareListenerErrorHandler：消费异常处理器，定义代码可参考上面链接的2.3 节； ApplicationContext：Spring 的上下文容器，MessageHandlerMethodFactory 初始化用； MethodKafkaListenerEndpoint：Kafka 配置节点，详细逻辑可参考源码。 SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法： package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor /*** 此处为相关源码仅供参考 寻找带有 KafkaListener 注解的类并初始化/Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class? 
targetClass AopUtils.getTargetClass(bean);CollectionKafkaListener classLevelListeners findListenerAnnotations(targetClass);final boolean hasClassLevelListeners !classLevelListeners.isEmpty();final ListMethod multiMethods new ArrayList();MapMethod, SetKafkaListener annotatedMethods MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookupSetKafkaListener) method - {SetKafkaListener listenerMethods findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {SetMethod methodsWithHandler MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method -AnnotationUtils.findAnnotation(method, KafkaHandler.class) ! null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty() !hasClassLevelListeners) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() - No KafkaListener annotations found on bean type: bean.getClass());}else {// Non-empty set of methodsfor (Map.EntryMethod, SetKafkaListener entry : annotatedMethods.entrySet()) {Method method entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() - annotatedMethods.size() KafkaListener methods processed on bean beanName : annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}2.动态初始化实现 package com.demo.controller;import com.demo.entity.Topic; import com.demo.manual.MessageHandlerMethodFactory; import com.demo.manual.ManualHandler; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import 
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.util.ReflectionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;/*** author * date 2023-02-07 13:40* since 1.8*/ Slf4j RestController RequestMapping(/listener) public class ListenerController {AutowiredKafkaListenerEndpointRegistry registry;AutowiredQualifier(batchTestContainerFactory)ConcurrentKafkaListenerContainerFactoryString,String batchTestContainerFactory;AutowiredConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;AutowiredApplicationContext applicationContext;MessageHandlerMethodFactory factory;PostConstructprivate void init(){factory new MessageHandlerMethodFactory(null,applicationContext);}static MapString, Topic map new ConcurrentHashMap();static {map.put(test_manual_1_id,new Topic(test_manual_1_id,test-topic-new.1,2,mygroup,test_manual_1_batch));map.put(test_manual_2_id,new Topic(test_manual_2_id,test-topic-new.2,1,mygroup,test_manual_2_batch));}/*** 停止消费 自行选择停止时是否需要从监听容器内移除实例容器为 Map 实现* MapString, MessageListenerContainer* param id*/GetMapping(/close)public void close(String id){MessageListenerContainer container registry.unregisterListenerContainer(id);container.destroy();}/*** 开始消费 若果是已注册的则判断是否暂停暂停则恢复* 如果不存在则定义一个消费者注册到容器内并启动* param id* throws NoSuchMethodException*/GetMapping(/open)public void open(String id) throws NoSuchMethodException {MessageListenerContainer container registry.getListenerContainer(id);if 
(null!container){if (!container.isRunning()){container.start();container.resume();}} else {//TODO 新建一个对应 Topic 的实例Topic topic map.get(id);if (nulltopic){return;}ManualHandler bean new ManualHandler();MethodKafkaListenerEndpointString, String endpoint new MethodKafkaListenerEndpoint();endpoint.setMessageHandlerMethodFactory(factory);endpoint.setBean(bean);Method[] methods bean.getClass().getDeclaredMethods();endpoint.setMethod(checkProxy(methods[0],bean));endpoint.setId(topic.getId());endpoint.setTopics(topic.getTopic());endpoint.setGroupId(topic.getGroup());endpoint.setClientIdPrefix(topic.getClientPrefix());endpoint.setConcurrency(topic.getPartitions());endpoint.setErrorHandler(consumerAwareListenerErrorHandler);registry.registerListenerContainer(endpoint,batchTestContainerFactory);container registry.getListenerContainer(id);container.start();}}/*** Copy Spring 源码* param methodArg* param bean* return*/private Method checkProxy(Method methodArg, Object bean) {Method method methodArg;if (AopUtils.isJdkDynamicProxy(bean)) {try {// Found a KafkaListener method on the target class for this JDK proxy -// is it also present on the proxy itself?method bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class?[] proxiedInterfaces ((Advised) bean).getProxiedInterfaces();for (Class? iface : proxiedInterfaces) {try {method iface.getMethod(method.getName(), method.getParameterTypes());break;}catch (SuppressWarnings(unused) NoSuchMethodException noMethod) {// NOSONAR}}}catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex);}catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format(KafkaListener method %s found on bean target class %s, but not found in any interface(s) for bean JDK proxy. Either pull the method up to an interface or switch to subclass (CGLIB) proxies by setting proxy-target-class/proxyTargetClass attribute to true, method.getName(),method.getDeclaringClass().getSimpleName()), ex);}}return method;} }
http://www.w-s-a.com/news/112381/

相关文章:

  • 网站建设600元包公司设计图片大全
  • 网站建设费用怎么做分录做校园网站代码
  • 网站改版做重定向福州网站建设思企
  • 网站建设全流程企业形象网站开发业务范畴
  • wordpress无法查看站点西安优秀高端网站建设服务商
  • 固始网站制作熟悉免费的网络营销方式
  • 做网站到a5卖站赚钱搜索引擎优化代理
  • 沈阳网站建设包括win10优化
  • 做百度手机网站点击软网站seo优化徐州百度网络
  • 徐州专业网站制作标志设计作业
  • 自己可以做网站空间吗海天建设集团有限公司网站
  • 教学督导网站建设报告aspcms网站图片不显示
  • 网站开发公司成本是什么门户网站宣传方案
  • 上海 企业网站建设网站怎么开通微信支付
  • 饮料网站建设wordpress主题猫
  • 网站建设需要编码不有没有专门的网站做品牌授权的
  • 做爰在线网站免费空间列表
  • 网站外链建设工作总结郑州网站建设扌汉狮网络
  • 建设企业网站的需要多长时间网站使用说明书模板
  • 建网站首页图片哪里找263企业邮箱网页版登录
  • 盐城网站建设电话高端定制网站
  • 成都网站seo技术施工企业样板先行制度
  • 高端网站建设电话河北建筑工程信息网站
  • 亲 怎么给一个网站做备份财务系统有哪些软件
  • wordpress重新手机优化专家下载
  • 怎样把网站做成软件设计工作室怎么接单
  • html网站设计实例代码重庆多个区划定风险区
  • 推广方案设计同一个网站可以同时做竞价和优化
  • 论坛网站开发 go电商扶贫网站建设
  • 个人建站教程优秀的定制网站建设