用什么做网站最简单,从化网站开发,wordpress留言标签,门户网站有哪些欢迎来到啾啾的博客#x1f431;。 记录学习点滴。分享工作思考和实用技巧#xff0c;偶尔也分享一些杂谈#x1f4ac;。 有很多很多不足的地方#xff0c;欢迎评论交流#xff0c;感谢您的阅读和评论#x1f604;。 目录 1 引言2 消息ProducerRecord2.1 分区器 1 引言
… 欢迎来到啾啾的博客。 记录学习点滴。分享工作思考和实用技巧偶尔也分享一些杂谈。 有很多很多不足的地方欢迎评论交流感谢您的阅读和评论。 目录 1 引言2 消息ProducerRecord2.1 分区器 1 引言
在之前的Kafka篇章中我们已经了解到Kafka Producer内部会有一个缓冲区生产者通过批量发送消息的方式提升总体的吞吐。
批量发送在很多场景中都很常见Kafka是如何实现的 首先我们需要理解消息。
2 消息ProducerRecord
Kafka消息的封装如下 2.1 分区器 Kafka Producer可以指定消息发送到哪个Topic也可以指定到哪个Partition。没有指定Partition时Kafka Producer会使用分区器 Partitioner来决定消息应该到哪个partition。 可以看一下分区器 Partitioner 的方法
参数如下 ProducerRecordK, V record要发送的 Kafka 消息记录包含主题、键、值、分区等信息。 byte[] serializedKey消息键的序列化字节数组。 byte[] serializedValue消息值的序列化字节数组。 Cluster clusterKafka 集群的元数据信息包含主题、分区等信息。
其中serializedKey和serializedValue是分别用了同一个序列化类的不同的序列化对象来做的 有意思的是序列化方法传入的topic与headers信息在Kafka的默认序列化中是没有被使用的。
很显然这两个参数topic和headers是为了序列化的扩展性预留的设计。 使用topic参数可以为不同的topic准备不同的序列化策略比如加密。
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class TopicAwareSerializer implements SerializerString {Overridepublic void configure(MapString, ? configs, boolean isKey) {// 配置逻辑}Overridepublic byte[] serialize(String topic, String data) {if (sensitive-topic.equals(topic)) {// 对敏感主题的数据进行特殊处理String encryptedData encrypted: data;return encryptedData.getBytes(StandardCharsets.UTF_8);}return data.getBytes(StandardCharsets.UTF_8);}Overridepublic void close() {// 关闭逻辑}
}使用headers参数比如编码格式、版本号等也可以做一些定制化操作。
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class HeaderAwareSerializer implements SerializerString {Overridepublic void configure(MapString, ? configs, boolean isKey) {// 配置逻辑}Overridepublic byte[] serialize(String topic, Headers headers, String data) {Charset charset StandardCharsets.UTF_8;if (headers.lastHeader(charset) ! null) {String charsetName new String(headers.lastHeader(charset).value(), StandardCharsets.UTF_8);charset Charset.forName(charsetName);}return data.getBytes(charset);}Overridepublic byte[] serialize(String topic, String data) {return serialize(topic, null, data);}Overridepublic void close() {// 关闭逻辑}
}需要注意的是key|value.serializer都必须被设置为实现了org.apache.kafka.common.serialization.Serializer接口的类。
没有指定分区partition时分区器partitioner.partition方法有三种实现。Kafka 在 Producer 中默认不配置分区器使用的是 DefaultPartitioner。你可以在创建 KafkaProducer 时通过配置 partitioner.class 属性来指定使用的分区器若不指定就会使用默认的 DefaultPartitioner。 DefaultPartitioner 用途这是 Kafka 默认的分区器。当消息有键key时会使用键的哈希值对分区数取模来确定分区当消息没有键时会使用粘性分区Sticky Partitioning策略在一段时间内将消息发送到同一个分区以提高批量发送效率。 使用场景大多数常规业务场景无需特殊分区策略时使用。 RoundRobinPartitioner 用途采用轮询的方式依次将消息发送到各个分区确保消息均匀分布在所有分区上。 使用场景需要消息均匀分布且对消息顺序没有严格要求的场景。 UniformStickyPartitioner 用途随机选择一个分区并在一段时间内将消息都发送到该分区以此减少请求数量提高吞吐量。 使用场景对吞吐量要求较高且对消息顺序没有严格要求的场景。 所以要指定好Partition。