做视频发哪个网站赚钱,公共体育课程网站建设,怎么创立网站 优帮云,团购网站app制作优质博文#xff1a;IT-BLOG-CN
【需求】#xff1a;生产者发送数据至 kafka 序列化使用 Avro#xff0c;消费者通过 Avro 进行反序列化#xff0c;并将数据通过 MyBatisPlus 存入数据库。
一、环境介绍
【1】Apache Avro 1.8#xff1b;【2】Spring Kafka 1.2#xf…优质博文IT-BLOG-CN
【需求】生产者发送数据至 kafka 序列化使用 Avro消费者通过 Avro 进行反序列化并将数据通过 MyBatisPlus 存入数据库。
一、环境介绍
【1】Apache Avro 1.8【2】Spring Kafka 1.2【3】Spring Boot 1.5【4】Maven 3.5
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.codenotfound/groupIdartifactIdspring-kafka-avro/artifactIdversion0.0.1-SNAPSHOT/versionnamespring-kafka-avro/namedescriptionSpring Kafka - Apache Avro Serializer Deserializer Example/descriptionurlhttps://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html/urlparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion1.5.4.RELEASE/version/parentpropertiesjava.version1.8/java.versionspring-kafka.version1.2.2.RELEASE/spring-kafka.versionavro.version1.8.2/avro.version/propertiesdependencies!-- spring-boot --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency!-- spring-kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion${spring-kafka.version}/version/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdversion${spring-kafka.version}/versionscopetest/scope/dependency!-- avro --dependencygroupIdorg.apache.avro/groupIdartifactIdavro/artifactIdversion${avro.version}/version/dependency/dependenciesbuildplugins!-- spring-boot-maven-plugin --plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin!-- avro-maven-plugin --plugingroupIdorg.apache.avro/groupIdartifactIdavro-maven-plugin/artifactIdversion${avro.version}/versionexecutionsexecutionphasegenerate-sources/phasegoalsgoalschema/goal/goalsconfigurationsourceDirectory${project.basedir}/src/main/resources/avro//sourceDirectoryoutputDirectory${project.build.directory}/generated/avro/outputDirectory/configuration/execution/executions/plugin/plugins/build
/project二、Avro 文件
【1】Avro 依赖于由使用JSON定义的原始类型组成的架构。对于此示例我们将使用Apache Avro入门指南中的“用户”模式如下所示。该模式存储在src / main / resources / avro下的 user.avsc文件中。我这里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 类时指定的 package 路径name 表时生成的文件。
{namespace: com.yd.cyber.protocol.avro,type: record,name: ElectronicsPackage,fields: [{name:package_number,type:[string,null],default: null},{name:frs_site_code,type:[string,null],default: null},{name:frs_site_code_type,type:[string,null],default:null},{name:end_allocate_code,type:[string,null],default: null},{name:code_1,type:[string,null],default: null},{name:aggregat_package_code,type:[string,null],default: null}]
}【2】Avro附带了代码生成功能该代码生成功能使我们可以根据上面定义的“用户”模式自动创建Java类。一旦生成了相关的类就无需直接在程序中使用架构。这些类可以使用 avro-tools.jar 或项目是Maven 项目调用 Maven Projects 进行 compile 自动生成 electronicsPackage.java 文件如下是通过 maven 的方式 【3】这将导致生成一个 electronicsPackage.java 类该类包含架构和许多 Builder构造 electronicsPackage对象的方法。 三、为 Kafka 主题生成 Avro消息
Kafka Byte 在其主题中存储和传输数组。但是当我们使用 Avro对象时我们需要在这些 Byte数组之间进行转换。在0.9.0.0版之前Kafka Java API使用 Encoder/ Decoder接口的实现来处理转换但是在新API中这些已经被 Serializer/ Deserializer接口实现代替。Kafka附带了许多 内置反序列化器但不包括Avro。为了解决这个问题我们将创建一个 AvroSerializer类该类Serializer专门为 Avro对象实现接口。然后我们实现将 serialize() 主题名称和数据对象作为输入的方法在本例中该对象是扩展的 Avro对象 SpecificRecordBase。该方法将Avro对象序列化为字节数组并返回结果。这个类属于通用类一次配置多次使用。
package com.yd.cyber.web.avro;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;/*** avro序列化类* author zzx* creat 2020-03-11-19:17*/
public class AvroSerializerT extends SpecificRecordBase implements SerializerT {Overridepublic void close() {}Overridepublic void configure(MapString, ? arg0, boolean arg1) {}Overridepublic byte[] serialize(String topic, T data) {if(data null) {return null;}DatumWriterT writer new SpecificDatumWriter(data.getSchema());ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream();BinaryEncoder binaryEncoder EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null);try {writer.write(data, binaryEncoder);binaryEncoder.flush();byteArrayOutputStream.close();}catch (IOException e) {throw new SerializationException(e.getMessage());}return byteArrayOutputStream.toByteArray();}
}四、AvroConfig 配置类
Avro 配置信息在 AvroConfig 配置类中现在我们需要更改AvroConfig 开始使用我们的自定义 Serializer实现。这是通过将“ VALUE_SERIALIZER_CLASS_CONFIG”属性设置为 AvroSerializer该类来完成的。此外我们更改了ProducerFactory 和KafkaTemplate 通用类型使其指定 ElectronicsPackage 而不是 String。当我们有多个序列化的时候这个配置文件需要多次需求添加自己需要序列化的对象。
package com.yd.cyber.web.avro;/*** author zzx* creat 2020-03-11-20:23*/
Configuration
EnableKafka
public class AvroConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.producer.max-request-size})private String maxRequestSize;Beanpublic MapString, Object avroProducerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);return props;}Beanpublic ProducerFactoryString, ElectronicsPackage elProducerFactory() {return new DefaultKafkaProducerFactory(avroProducerConfigs());}Beanpublic KafkaTemplateString, ElectronicsPackage elKafkaTemplate() {return new KafkaTemplate(elProducerFactory());}
}五、通过 kafkaTemplate 发送消息
最后就是通过 Controller类调用 kafkaTemplate 的 send 方法接受一个Avro electronicsPackage对象作为输入。请注意我们还更新了 kafkaTemplate 泛型类型。
package com.yd.cyber.web.controller.aggregation;import com.yd.cyber.protocol.avro.ElectronicsPackage;
import com.yd.cyber.web.vo.ElectronicsPackageVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** p* InnoDB free: 4096 kB 前端控制器* /p** author zzx* since 2020-04-19*/
RestController
RequestMapping(/electronicsPackageTbl)
public class ElectronicsPackageController {//日誌private static final Logger log LoggerFactory.getLogger(ElectronicsPackageController.class);Resourceprivate KafkaTemplateString,ElectronicsPackage kafkaTemplate;GetMapping(/push)public void push(){ElectronicsPackageVO electronicsPackageVO new ElectronicsPackageVO();electronicsPackageVO.setElectId(9);electronicsPackageVO.setAggregatPackageCode(9);electronicsPackageVO.setCode1(9);electronicsPackageVO.setEndAllocateCode(9);electronicsPackageVO.setFrsSiteCodeType(9);electronicsPackageVO.setFrsSiteCode(9);electronicsPackageVO.setPackageNumber(9);ElectronicsPackage electronicsPackage new ElectronicsPackage();BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage);//发送消息kafkaTemplate.send(Electronics_Package,electronicsPackage);log.info(Electronics_Package TOPIC 发送成功);}
}六、从 Kafka主题消费 Avro消息反序列化
收到的消息需要反序列化为 Avro格式。为此我们创建一个 AvroDeserializer 实现该 Deserializer接口的类。该 deserialize()方法将主题名称和Byte数组作为输入然后将其解码回Avro对象。从 targetType类参数中检索需要用于解码的模式该类参数需要作为参数传递给 AvroDeserializer构造函数。
package com.yd.cyber.web.avro;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.xml.bind.DatatypeConverter;/*** avro反序列化* author fuyx* creat 2020-03-12-15:19*/
public class AvroDeserializerT extends SpecificRecordBase implements DeserializerT {//日志系统private static final Logger LOGGER LoggerFactory.getLogger(AvroDeserializer.class);protected final ClassT targetType;public AvroDeserializer(ClassT targetType) {this.targetType targetType;}Overridepublic void close() {}Overridepublic void configure(MapString, ? arg0, boolean arg1) {}Overridepublic T deserialize(String topic, byte[] data) {try {T result null;if(data null) {return null;}LOGGER.debug(data{}, DatatypeConverter.printHexBinary(data));ByteArrayInputStream in new ByteArrayInputStream(data);DatumReaderGenericRecord userDatumReader new SpecificDatumReader(targetType.newInstance().getSchema());BinaryDecoder decoder DecoderFactory.get().directBinaryDecoder(in, null);result (T) userDatumReader.read(null, decoder);LOGGER.debug(deserialized data{}, result);return result;} catch (Exception ex) {throw new SerializationException(Cant deserialize data Arrays.toString(data) from topic topic , ex);} finally {}}
}七、反序列化的配置类
我将反序列化的配置和序列化的配置都放置在 AvroConfig 配置类中。在 AvroConfig 需要被这样更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”属性。我们还更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用类型以使其指定 ElectronicsPackage 而不是 String。将 DefaultKafkaConsumerFactory 通过1个新的创造 AvroDeserializer 是需要 “User.class”作为构造函数的参数。需要使用Class? targetTypeAvroDeserializer 以将消费 byte[]对象反序列化为适当的目标对象在此示例中为 ElectronicsPackage 类。
Configuration
EnableKafka
public class AvroConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.producer.max-request-size})private String maxRequestSize;Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, avro);return props;}Beanpublic ConsumerFactoryString, ElectronicsPackage consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),new AvroDeserializer(ElectronicsPackage.class));}Beanpublic ConcurrentKafkaListenerContainerFactoryString, ElectronicsPackage kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, ElectronicsPackage factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());return factory;}}八、消费者消费消息
消费者通过 KafkaListener 监听对应的 Topic 这里需要注意的是网上直接获取对象的参数传的是对象比如这里可能需要传入 ElectronicsPackage 类但是我这样写的时候error日志总说是返回序列化的问题所以我使用 GenericRecord 对象接收也就是我反序列化中定义的对象是没有问题的。然后我将接收到的消息通过 mybatisplus 存入到数据库。
package com.zzx.cyber.web.controller.dataSource.intercompany;import com.zzx.cyber.web.service.ElectronicsPackageService;
import com.zzx.cyber.web.vo.ElectronicsPackageVO;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;import javax.annotation.Resource;/*** desc:* author: zzx* creatdate 2020/4/1912:21*/
Controller
public class ElectronicsPackageConsumerController {//日志private static final Logger log LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);//服务层Resourceprivate ElectronicsPackageService electronicsPackageService;/*** 扫描数据测试* param genericRecordne*/KafkaListener(topics {Electronics_Package})public void receive(GenericRecord genericRecordne) throws Exception {log.info(数据接收electronicsPackage genericRecordne.toString());//业务处理类,mybatispuls 自动生成的类ElectronicsPackageVO electronicsPackageVO new ElectronicsPackageVO();//将收的数据复制过来BeanUtils.copyProperties(genericRecordne,electronicsPackageVO);try {//落库log.info(数据入库);electronicsPackageService.save(electronicsPackageVO);} catch (Exception e) {throw new Exception(插入异常e);}}
}