0基础学做网站,重庆市建设节能中心网站,千锋前端培训班,链接生成器在线制作文章目录 一、Controller选举二、生产消息2.1、创建待发送数据2.2、创建生产者对象#xff0c;发送数据2.3、发送回调2.3.1、异步发送2.3.2、同步发送 2.4、拦截器2.5、序列化器2.6、分区器2.7、消息可靠性2.7.1、acks 02.7.2、acks 1(默认)2.7.3、acks -1或all 2.8、部分重… 文章目录 一、Controller选举二、生产消息2.1、创建待发送数据2.2、创建生产者对象发送数据2.3、发送回调2.3.1、异步发送2.3.2、同步发送 2.4、拦截器2.5、序列化器2.6、分区器2.7、消息可靠性2.7.1、acks 02.7.2、acks 1(默认)2.7.3、acks -1或all 2.8、部分重要的生产者参数2.8.1、linger.ms 2.9、Producer总结 如果想要先了解kafka的基础基础架构可以看
什么是topic、broker、分区、副本 一、Controller选举 一句话总结先到先得。 Controller是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。下面介绍一下Zookeeper的特点
一个是在ZooKeeper软件中创建节点Node创建一个Node时我们会设定这个节点是持久化创建还是临时创建。所谓的持久化创建就是Node一旦创建后会一直存在而临时创建是根据当前的客户端连接创建的临时节点Node一旦客户端连接断开那么这个临时节点Node也会被自动删除所以这样的节点称之为临时节点。ZooKeeper节点是不允许有重复的所以多个客户端创建同一个节点只能有一个创建成功。另外一个是客户端可以在ZooKeeper的节点上增加监听器用于监听节点的状态变化一旦监听的节点状态发生变化那么监听器就会触发响应实现特定监听功能。 集群中的任意一台Broker都能充当Controller的角色但是在整个集群运行过程中只能有一个Broker成为Controller。也就是说每个正常运行的Kafka集群在任何时刻都有且只有一个Controller。 最先在Zookeeper上创建临时节点/controller成功的Broker就是Controller。Controller重度依赖Zookeeper依赖zookeepr保存元数据依赖zookeeper进行服务发现。Controller大量使用Watch功能实现对集群的协调管理。如果此时作为Controller的Broker节点宕掉了。那么zookeeper的临时节点/controller就会因为会话超时而自动删除。而监控这个节点的Broker就会收到通知而向ZooKeeper发出创建/controller节点的申请一旦创建成功那么创建成功的Broker节点就成为了新的Controller。 有一种特殊的情况就是Controller节点并没有宕掉而是因为网络的抖动不稳定导致和ZooKeeper之间的会话超时那么此时整个Kafka集群就会认为之前的Controller已经下线退出从而选举出新的Controller而之前的Controller的网络又恢复了以为自己还是Controller了继续管理整个集群那么此时整个Kafka集群就有两个controller进行管理那么其他的broker就懵了不知道听谁的了这种情况我们称之为脑裂现象为了解决这个问题Kafka通过一个任期epoch:纪元的概念来解决也就是说每一个Broker当选Controller时会告诉当前Broker是第几任Controller一旦重新选举时这个任期会自动增1那么不同任期的Controller的epoch值是不同的那么旧的controller一旦发现集群中有新任controller的时候那么它就会完成退出操作清空缓存中断和broker的连接并重新加载最新的缓存让自己重新变成一个普通的Broker。
二、生产消息 整个生产者客户端由两个线程协调运行这两个线程分别为主线程和Sender线程发送线程。在主线程中由KafkaProducer创建消息然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator也称为消息收集器中。Sender线程负责从RecordAccumulator中获取消息并发送到kafka中。 RecordAccumulator主要是用来缓存消息以便Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置默认值为32MB。
我们采用Java代码通过Kafka Producer API的方式生产数据
2.1、创建待发送数据 在kafka中传递的数据我们称之为消息message或记录(record)所以Kafka发送数据前需要将待发送的数据封装为指定的数据模型ProducerRecord其实就是一个类 相关属性必须在构建数据模型时指定其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题那么Topic主题可以不存在。value就是我们需要真正传递的数据了而Key可以用于数据的分区定位。
2.2、创建生产者对象发送数据 根据前面提供的配置信息创建生产者对象通过这个生产者对象向Kafka服务器节点发送数据而具体的发送是由生产者对象创建时内部构建的多个组件实现的多个组件的关系有点类似于生产者消费者模式。 1、数据生产者 生产者对象用于对我们的数据进行必要的转换和处理将处理后的数据放入到数据收集器中类似于生产者消费者模式下的生产者。这里我们简单介绍一下内部的数据转换处理:
如果配置拦截器栈那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。因为发送的数据为key-value数据所以需要根据配置信息中的序列化对象对数据中key和value分别进行序列化处理。计算数据所发送的分区位置。将数据追加到数据收集器中。
2、数据收集器 用于收集转换我们产生的数据类似于生产者消费者模式下的缓冲区。为了优化数据的传输Kafka并不是生产一条数据就向Broker发送一条数据而是通过合并单条消息进行批量批次发送提高吞吐量减少带宽消耗。
默认情况下一个发送批次下图中的ProducerBatch的数据容量为16K不是指一个批次的最大容量就是16K而是指超过16K后当前批次就不接收新数据了每个批次是要保证数据完整性的这个可以通过参数batch.size进行改善。批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并形成一个批次。如果当前批次能容纳数据那么直接将数据追加到批次中即可如果不能容纳数据那么会产生新的批次放入到当前分区的批次队列中这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据等待发送。
3、数据发送器Sender线程 线程对象用于从收集器对象中获取数据向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中。
对网络连接来说生产者客户端是与具体的broker节点建立的连接也就是向具体的broker节点发送消息而并不关心消息属于哪一个分区而对于KafkaProducer的应用逻辑而言我们只关注向哪个分区中发送哪些消息所以在这里需要做一个应用逻辑层到网络IO层面的转换即将原本分区DequeProducerBatch保存形式转变为nodeListProducerBatch。将组合后的节点ListProducerBatch的数据封装成客户端请求发送到网络客户端对象的缓冲区由网络客户端对象通过网络发送给Broker节点。Broker节点获取客户端请求并根据请求键进行后续的数据处理向分区中增加数据。 1.数据量超过批次大小阈值可通过参数配置2.数据在缓冲区停留时间超过时间阈值可通过参数配置满足任一条件sender线程即发数据到kafka服务器。 2.3、发送回调 Kafka发送数据时可以同时传递回调对象Callback用于对数据的发送结果进行对应处理具体代码实现采用匿名类或Lambda表达式都可以。
public class KafkaProducerASynTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}});}producer.close();}
}
结果
2.3.1、异步发送 Kafka发送数据时底层的实现类似于生产者消费者模式。对应的底层会由主线程代码作为生产者向缓冲区中放数据而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。 如果Kafka通过主线程代码将一条数据放入到缓冲区后无需等待数据的后续发送过程就直接发送一下条数据的场合我们就称之为异步发送。 public class KafkaProducerASynTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}});// TODO 发送当前数据System.out.println(发送数据);}producer.close();}
}
结果
2.3.2、同步发送 Kafka发送数据时底层的实现类似于生产者消费者模式。对应的底层会由主线程代码作为生产者向缓冲区中放数据而数据发送线程会从缓冲区中获取数据进行发送。Broker接收到数据后进行后续处理。 如果Kafka通过主线程代码将一条数据放入到缓冲区后需等待数据的后续发送操作的应答状态才能发送一下条数据的场合我们就称之为同步发送。所以这里的所谓同步就是生产数据的线程需要等待发送线程的应答响应结果。 采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现的。实际上send()方法就是异步的send()方法返回的Future对象可以使调用方稍后获得发送结果。下面的示例中在执行send方法后直接调用get方法来阻塞等待kafka的响应直到消息发送成功或者发生异常。 public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}}).get();// TODO 发送当前数据System.out.println(发送数据);}producer.close();}
}
结果 通常一个kafkaProducer不会只负责发送单条消息更多的是发送多条消息在发送完这些消息之后需要调用kafkaProducer的close()方法来回收资源。close()方法会阻塞等待之前所有的发送请求完成后再关闭kafkaProducer。 消息在通过send()方法发往broker的过程中有可能经过拦截器Interceptor、序列化器Serializer和分区器Partitioner的一系列作用之后才能被真正的发往broker。拦截器一般不是必需的而序列化器是必需的。消息经过序列化后就需要确定它发往的分区如果消息ProducerRecord中指定了partition字段那么就不需要分区器的作用因为partition代表的就是所要发往的分区号。
2.4、拦截器 生产者API在数据准备好发送给Kafka服务器之前允许我们对生产的数据进行统一的处理比如校验、整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能所以大家可以根据实际的情况自行选择使用。 但是要注意这里的拦截器是可以配置多个的。执行时会按照声明顺序执行完一个后再执行下一个。并且某一个拦截器如果出现异常只会跳出当前拦截器逻辑并不会影响后续拦截器的处理。所以开发时需要将拦截器的这种处理方法考虑进去。也可以通过实现ProducerInterceptor接口来自定义拦截器。 2.5、序列化器 生产者需要使用序列化器Serializer把对象转换为字节数组才能通过网络发送给kafka而在对侧消费者需要使用反序列化器Desirializer把从kafka获得的字节数组转换为相应的对象。在上面的代码例子中为了方便消息的key和value都使用了字符串对应程序中的序列化器也使用了客户端自带的StringSerializer除了使用String类型的序列化器还有其他类型的序列化器。当然也可以通过实现Serializer接口来自定义序列化器。
2.6、分区器 Kafka中Topic是对数据逻辑上的分类而Partition才是数据真正存储的物理位置。所以在生产数据时如果只是指定Topic的名称其实Kafka是不知道将数据发送到哪一个Broker节点的。我们可以在构建数据传递Topic参数的同时也可以指定数据存储的分区编号。 指定分区传递数据是没有任何问题的。Kafka会进行基本简单的校验比如是否为空是否小于0之类的但是你的分区是否存在就无法判断了所以需要从Kafka中获取集群元数据信息此时会因为长时间获取不到元数据信息而出现超时异常。所以如果不能确定分区编号范围的情况不指定分区还是一个不错的选择。 如果不指定分区Kafka会根据集群元数据中的主题分区来通过算法来计算分区编号并设定当然还可以通过实现Partitioner来自定义分区器kafka分区的选择流程如下
如果指定了分区直接使用。如果指定了自己的分区器通过分区器计算分区编号如果有效直接使用。如果指定了数据Key且使用Key选择分区的场合采用murmur2非加密散列算法类似于hash计算数据Key序列化后的值的散列值然后对主题分区数量模运算取余最后的结果就是分区编号。如果未指定数据Key或不使用Key选择分区那么Kafka会采用优化后的粘性分区策略进行分区选择。
2.7、消息可靠性 对于生产者发送的数据我们有的时候是不关心数据是否已经发送成功的我们只要发送就可以了。在这种场景中消息可能会因为某些故障或问题导致丢失我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失但是在某些需要高吞吐低可靠的系统场景中这种方式也是可以接受的甚至是必须的。 但是在更多的场景中我们是需要确定数据是否已经发送成功了且Kafka正确接收到数据的也就是要保证数据不丢失这就是所谓的消息可靠性保证。 而这个确定的过程一般是通过Kafka给我们返回的响应确认结果Acknowledgement来决定的这里的响应确认结果我们也可以简称为ACK应答。根据场景Kafka提供了3种应答处理可以通过配置对象进行配置。acks是生产者客户端中一个非常重要的参数它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值都是字符串类型。 2.7.1、acks 0 当生产数据时生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候Kafka就对当前的数据请求进行了响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。 通过图形明显可以看出这种应答方式数据已经通过网络给Kafka发送了但这其实并不能保证Kafka能正确地接收到数据在传输过程中如果网络出现了问题那么数据就丢失了。也就是说这种应答确认的方式数据的可靠性是无法保证的。不过相反因为无需等待Kafka服务节点的确认通信效率倒是比较高的也就是系统吞吐量会非常高。
2.7.2、acks 1(默认) 当生产数据时Kafka Leader副本将数据接收到并写入到了日志文件保存到磁盘后就会对当前的数据请求进行响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。 通过图形可以看出这种应答方式数据已经存储到了分区Leader副本中那么数据相对来讲就比较安全了也就是可靠性比较高。之所以说相对来讲比较安全就是因为现在只有一个节点存储了数据而数据并没有来得及进行备份到follower副本那么一旦当前存储数据的broker节点出现了故障数据也依然会丢失。
2.7.3、acks -1或all 当生产数据时Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后再对当前的数据请求进行响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。 通过图形可以看出这种应答方式数据已经同时存储到了分区Leader副本和follower副本中那么数据已经非常安全了可靠性也是最高的。此时如果Leader副本出现了故障那么follower副本能够开始起作用因为数据已经存储了所以数据不会丢失。
不过这里需要注意如果假设我们的分区有5个follower副本编号为12345 但是此时只有3个副本处于和Leader副本之间处于数据同步状态那么此时分区就存在一个同步副本列表我们简称为ISR。此时Kafka只要保证ISR中所有的4个副本接收到了数据就可以对数据请求进行响应了。无需5个副本全部收到数据。
2.8、部分重要的生产者参数
2.8.1、linger.ms 这个参数用来指定生产者发送ProducerBatch之前等待更多消息ProducerRecord加入ProducerBatch的时间默认值为0。生产者客户端会在ProducerBatch被填满或等待时间超过linger.ms值时发送出去。增大这个参数的值会增加消息的延迟但是同事能提升一定的吞吐量。 还有很多其他重要的生产者参数可以参考相关帖子或书籍。 2.9、Producer总结 对于KafkaProducer而言它是线程安全的我们可以在多线程的环境中复用它但是对于消费者客户端KafkaConsumer而言它是非线程安全的因为它具备了状态具体怎么使用可以参见后面的文章。