网站建设报价单格式,互联网公司有哪些部门,网站建设三剑客,中国做网站的公司排名SpringBoot 整合 Avro 与 Kafka 详解
在大数据处理和实时数据流场景中#xff0c;Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台#xff0c;能够高效地处理大量数据#xff0c;而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数…SpringBoot 整合 Avro 与 Kafka 详解
在大数据处理和实时数据流场景中Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台能够高效地处理大量数据而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合并通过 Spring Boot 进行整合可以构建出高效、可扩展的实时数据处理系统。
一、环境准备
在开始整合之前需要准备好以下环境
Java确保已经安装了 JDK推荐使用 JDK 8 或更高版本。Maven用于管理项目的依赖和构建过程。Spring Boot作为项目的框架推荐使用较新的版本如 Spring Boot 2.x。Kafka确保 Kafka 已经安装并运行可以使用 Docker 部署 Kafka 集群。AvroAvro 依赖 JSON 定义的架构来序列化数据。
二、项目结构
一个典型的 Spring Boot 项目结构可能如下
spring-boot-kafka-avro
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ ├── SpringBootKafkaAvroApplication.java
│ │ │ ├── config
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ └── model
│ │ │ └── ElectronicsPackage.java (由 Avro 自动生成)
│ │ ├── resources
│ │ │ ├── application.properties
│ │ │ └── avro
│ │ │ └── electronicsPackage.avsc (Avro 架构文件)
│ └── test
│ └── java
│ └── com
│ └── example
│ └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml三、添加依赖
在 pom.xml 文件中添加必要的依赖
dependencies!-- Spring Boot Starter --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependency!-- Spring Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.9.13/version !-- 根据需要选择合适的版本 --/dependency!-- Avro --dependencygroupIdorg.apache.avro/groupIdartifactIdavro/artifactIdversion1.11.0/version !-- 根据需要选择合适的版本 --/dependency!-- Avro Maven Plugin --plugingroupIdorg.apache.avro/groupIdartifactIdavro-maven-plugin/artifactIdversion${avro.version}/versionexecutionsexecutionphasegenerate-sources/phasegoalsgoalschema/goal/goalsconfigurationsourceDirectory${project.basedir}/src/main/resources/avro//sourceDirectoryoutputDirectory${project.build.directory}/generated/avro/outputDirectory/configuration/execution/executions/plugin
/dependencies四、定义 Avro 架构
在 src/main/resources/avro/ 目录下创建一个 Avro 架构文件 electronicsPackage.avsc
{namespace: com.example.model,type: record,name: ElectronicsPackage,fields: [{name: package_number, type: [string, null], default: null},{name: frs_site_code, type: [string, null], default: null},{name: frs_site_code_type, type: [string, null], default: null}]
}这个架构文件定义了 ElectronicsPackage 类包括三个字段package_number、frs_site_code 和 frs_site_code_type。
五、生成 Avro 类
运行 Maven 构建过程Avro Maven 插件会根据 electronicsPackage.avsc 文件生成相应的 Java 类 ElectronicsPackage.java。
六、配置 Kafka
在 application.properties 文件中配置 Kafka 的相关属性
spring.kafka.bootstrap-serverslocalhost:9092
spring.kafka.consumer.group-idmy-group
spring.kafka.consumer.auto-offset-resetearliest
spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializercom.example.config.AvroSerializer
spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializercom.example.config.AvroDeserializer注意这里指定了自定义的 AvroSerializer 和 AvroDeserializer 类。
七、实现 Avro 序列化器和反序列化器
创建 AvroSerializer 和 AvroDeserializer 类用于 Avro 数据的序列化和反序列化。
// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializerT extends SpecificRecord implements SerializerT {private final DatumWriterT writer;public AvroSerializer(ClassT type) {this.writer new SpecificDatumWriter(type);}Overridepublic byte[] serialize(String topic, T data) {if (data null) {return null;}ByteArrayOutputStream out new ByteArrayOutputStream();Encoder encoder EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}Overridepublic void configure(MapString, ? configs, boolean isKey) {// No-op}Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializerT extends SpecificRecord implements DeserializerT {private final ClassT type;private final DatumReaderT reader;public AvroDeserializer(ClassT type) {this.type type;this.reader new SpecificDatumReader(type);}Overridepublic T deserialize(String topic, byte[] data) {if (data null) {return null;}ByteArrayInputStream in new ByteArrayInputStream(data);Decoder decoder DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}Overridepublic void configure(MapString, ? configs, boolean isKey) {// No-op