免费做推广的网站,免费wordpress主题分享,做收款二维码的网站,潍坊住房和城乡建设局网站1 生产者
生产逻辑
配置生产者客户端参数及创建相应的生产者实例。构建待发送的消息。发送消息关闭实列
参数说明
bootstrap.servers #xff1a;用来指定生产者客户端链接Kafka集群搜需要的broker地址清单#xff0c;具体格式 host1:port1,host2:port2,可以设置一个或多…1 生产者
生产逻辑
配置生产者客户端参数及创建相应的生产者实例。构建待发送的消息。发送消息关闭实列
参数说明
bootstrap.servers 用来指定生产者客户端链接Kafka集群搜需要的broker地址清单具体格式 host1:port1,host2:port2,可以设置一个或多个地址中间号分割参数默认 空串。这里要注意并不需要配置所有的broker地址应为生产者会在broker中找到其他的broker地址但是建议配置两个以上当其中一个broker宕机时还可以通过另外一个工作。key.serializer和value.serializerbroker端接受的消息必须以字节数组的形式存在。client.id : 默认 “” 用来设置KafkaProducer对应的客户端idmax.block.ms默认值 60000 用来控制KafkaProducer 中send()方法和partitionsFor()方法的阻塞时间partitioner.class用来指定分区器enable.idempotence:默认值 false 是否开启幂等性interceptor.classes 用来设置生产者拦截器max.in.flight.requests.per.connection5 限制每个连接最多缓存的请求数metadata.max.age.ms 300000 5分钟 如果在这个时间内元数据没有更新的话就强制更新。transactional.idnull 设置事务id 必须唯一batch.size 16384(16KB) 生产者客户端中用于缓存消息的缓冲区大小。
序列化器Serializer
生产者发送消息到kafka是需要将对象序列化城流才能访问到kafka消费者需要把流反序列化 才能进行 消费。
分区器
消息在通过send()方法发送到broker的过程中有可能需要经过拦截器、序列化器和分区器partitioner的一系列作用之后才能被真正的发往broker。拦截器一般不是必须的而序列化器时必须的必须的。消息经过序列化之后就需要确定它发送的分区如果消息ProducerRecord中指定了partition字段那么就不需要分区器的作用因为partition代表的就是所要发往的分区。 分区器时通过kay来计算partition的值分区器的作用就是为消息分配分区。 kafka的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
生产者拦截器Interceptor
生产者拦截器主要用来在消息发送前做一些准备工作如按照规则过滤不符合条件的消息修改消息等也可以用来做一些定制化的需求kafkaProducer在将消息序列化和计算分区之前会调用拦截器的onSend()方法来对消息进行相应的定制化
原理分析 主线程中由KafkaPartition创建消息通过拦截器通过序列化器通过分区器到达消息累加器RecordAccumulator主要是用来收集消息方便 Sender可以批量发送