自适应网站如何做mip,菏泽做网站推广,免费做情网站,深圳外贸公司多的区大数据技术之 Kafka 文章目录 大数据技术之 Kafka第 1 章 Kafka 概述1.1 定义1.2 消息队列1.2.1 传统消息队列的应用场景1.2.2 消息队列的两种模式 1.3 Kafka 基础架构 第 2 章 Kafka 快速入门2.1 安装部署2.1.1 集群规划2.1.2 集群部署2.1.3 集群启停脚本 2.2 Kafka 命令行操作…大数据技术之 Kafka 文章目录 大数据技术之 Kafka第 1 章 Kafka 概述1.1 定义1.2 消息队列1.2.1 传统消息队列的应用场景1.2.2 消息队列的两种模式 1.3 Kafka 基础架构 第 2 章 Kafka 快速入门2.1 安装部署2.1.1 集群规划2.1.2 集群部署2.1.3 集群启停脚本 2.2 Kafka 命令行操作2.2.1 主题命令行操作2.2.2 生产者命令行操作2.2.3 消费者命令行操作 第 3 章 Kafka 生产者3.1 生产者消息发送流程3.1.1 发送原理3.1.2 生产者重要参数列表 3.2 异步发送 API3.2.1 普通异步发送3.2.2 带回调函数的异步发送 3.3 同步发送 API3.4 生产者分区3.4.1 分区好处3.4.2 生产者发送消息的分区策略3.4.3 自定义分区器 3.5 生产经验——生产者如何提高吞吐量3.6 生产经验——数据可靠性3.7 生产经验——数据去重3.7.1 数据传递语义3.7.2 幂等性3.7.3 生产者事务 3.8 生产经验——数据有序3.9 生产经验——数据乱序 第 4 章 Kafka Broker4.1 Kafka Broker 工作流程4.1.1 Zookeeper 存储的 Kafka 信息4.1.2 Kafka Broker 总体工作流程4.1.3 Broker 重要参数 4.2 生产经验——节点服役和退役4.2.1 服役新节点4.2.2 退役旧节点 4.3 Kafka 副本4.3.1 副本基本信息4.3.2 Leader 选举流程4.3.3 Leader 和 Follower 故障处理细节4.3.4 分区副本分配4.3.5 生产经验——手动调整分区副本存储4.3.6 生产经验——Leader Partition 负载平衡4.3.7 生产经验——增加副本因子 4.4 文件存储4.4.1 文件存储机制4.4.2 文件清理策略 4.5 高效读写数据 第 5 章 Kafka 消费者5.1 Kafka 消费方式5.2 Kafka 消费者工作流程5.2.1 消费者总体工作流程5.2.2 消费者组原理5.2.3 消费者重要参数 5.3 消费者 API5.3.1 独立消费者案例订阅主题5.3.2 独立消费者案例订阅分区5.3.3 消费者组案例 5.4 生产经验——分区的分配以及再平衡5.4.1 Range 以及再平衡5.4.2 RoundRobin 以及再平衡5.4.3 Sticky 以及再平衡 5.5 offset 位移5.5.1 offset 的默认维护位置5.5.2 自动提交 offset5.5.3 手动提交 offset5.5.4 指定 Offset 消费5.5.5 指定时间消费5.5.6 漏消费和重复消费 5.6 生产经验——消费者事务5.7 生产经验——数据积压消费者如何提高吞吐量 第 6 章 Kafka-Eagle 监控6.1 MySQL 环境准备6.2 Kafka 环境准备6.3 Kafka-Eagle 安装6.4 Kafka-Eagle 页面操作 第 7 章 Kafka-Kraft 模式7.1 Kafka-Kraft 架构7.2 Kafka-Kraft 集群部署7.3 Kafka-Kraft 集群启动停止脚本 第 8 章 集成SpringBoot8.1 SpringBoot 生产者8.2 SpringBoot 消费者 第 1 章 Kafka 概述
1.1 定义 1.2 消息队列
目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有 Kafka 、 ActiveMQ 、 RabbitMQ 、RocketMQ 等。
在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
1.2.1 传统消息队列的应用场景
传统的消息队列的主要应用场景包括缓存/消峰、解耦和异步通信。
缓冲/消峰 解耦
解耦允许你独立的扩展或修改两边的处理过程只要确保它们遵守同样的接口约束。 异步通信
异步通信允许用户把一个消息放入队列但并不立即处理它然后在需要的时候再去处理它们。 1.2.2 消息队列的两种模式
1点对点模式 消费者主动拉取数据消息收到后清除消息
2发布/订阅模式
可以有多个topic主题浏览、点赞、收藏、评论等消费者消费数据之后不删除数据每个消费者相互独立都可以消费到数据 1.3 Kafka 基础架构 1Producer消息生产者就是向 Kafka broker 发消息的客户端。 2Consumer消息消费者向 Kafka broker 取消息的客户端。 3Consumer GroupCG消费者组由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。
4Broker一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
5Topic可以理解为一个队列生产者和消费者面向的都是一个 topic。
6Partition为了实现扩展性一个非常大的 topic 可以分布到多个 broker即服务器上一个 topic 可以分为多个 partition每个 partition 是一个有序的队列。
7Replica副本。一个 topic 的每个分区都有若干个副本一个 Leader 和若干个Follower。
8Leader每个分区多个副本的“主”生产者发送数据的对象以及消费者消费数据的对象都是 Leader。
9Follower每个分区多个副本中的“从”实时从 Leader 中同步数据保持和Leader 数据的同步。Leader 发生故障时某个 Follower 会成为新的 Leader。
第 2 章 Kafka 快速入门
2.1 安装部署
2.1.1 集群规划
hadoop102hadoop103hadoop104zkzkzkkafkakafkakafka
2.1.2 集群部署
0官方下载地址http://kafka.apache.org/downloads.html
1解压安装包
2.12表示scale语言版本3.0.0表示kafka的版本 [zhaohadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/2修改解压后的文件名称
[zhaohadoop102 module]$ mv kafka_2.12-3.0.0/ kafka 3进入到/opt/module/kafka 目录修改配置文件
[zhaohadoop102 kafka]$ cd config/
[zhaohadoop102 config]$ vim server.properties输入以下内容
#broker 的全局唯一编号不能重复只能是数字。
broker.id0
。。。。。。
#kafka 运行日志(数据)存放的路径路径不需要提前创建kafka 自动帮你创建可以配置多个磁盘路径路径与路径之间可以用分隔
log.dirs/opt/module/kafka/datas
。。。。。
#配置连接 Zookeeper 集群地址在 zk 根目录下创建/kafka方便管理
zookeeper.connecthadoop102:2181,hadoop103:2181,hadoop104:2181/kafka4分发安装包
[zhaohadoop102 module]$ xsync kafka/5分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id1、broker.id2
注broker.id 不得重复整个集群中唯一。
[zhaohadoop103 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id1[zhaohadoop104 module]$ vim kafka/config/server.properties
修改:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id26配置环境变量
1在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
[zhaohadoop102 module]$ sudo vim /etc/profile.d/my_env.sh增加如下内容
#KAFKA_HOME
export KAFKA_HOME/opt/module/kafka
export PATH$PATH:$KAFKA_HOME/bin2刷新一下环境变量。
[zhaohadoop102 module]$ source /etc/profile3分发环境变量文件到其他节点并 source。
[zhaohadoop102 module]$ sudo /home/zhao/bin/xsync /etc/profile.d/my_env.sh
[zhaohadoop103 module]$ source /etc/profile
[zhaohadoop104 module]$ source /etc/profile7启动集群
1先启动 Zookeeper 集群然后启动 Kafka。
[zhaohadoop102 kafka]$ zk.sh start2依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka。
[zhaohadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[zhaohadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[zhaohadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties注意配置文件的路径要能够到 server.properties。
8关闭集群
[zhaohadoop102 kafka]$ bin/kafka-server-stop.sh
[zhaohadoop103 kafka]$ bin/kafka-server-stop.sh
[zhaohadoop104 kafka]$ bin/kafka-server-stop.sh2.1.3 集群启停脚本
1在/home/zhao/bin 目录下创建文件 kf.sh 脚本文件
[zhaohadoop102 bin]$ vim kf.sh脚本如下
#! /bin/bash
case $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-stop.sh done
};;
esac2添加执行权限
[zhaohadoop102 bin]$ chmod x kf.sh3启动集群命令
[zhaohadoop102 ~]$ kf.sh start4停止集群命令s
[zhaohadoop102 ~]$ kf.sh stop注意停止 Kafka 集群时一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息Zookeeper 集群一旦先停止Kafka 集群就没有办法再获取停止进程的信息只能手动杀死 Kafka 进程了。 2.2 Kafka 命令行操作
Kafka 基础架构 2.2.1 主题命令行操作
1查看操作主题命令参数
[zhaohadoop102 kafka]$ bin/kafka-topics.sh参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号。–topic String: topic操作的 topic 名称。–create创建主题。–delete删除主题。–alter修改主题。–list查看所有主题。–describe查看主题详细描述。–partitions Integer: # of partitions设置分区数。–replication-factorInteger: replication factor设置分区副本。–config String: namevalue更新系统默认的配置。
2查看当前服务器中的所有 topic
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list3创建 first topic
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first选项说明
–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数
4查看 first 主题的详情
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first5修改分区数注意分区数只能增加不能减少
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 36再次查看 first 主题的详情
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first7删除 topic自行演示
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first2.2.2 生产者命令行操作
1查看操作生产者命令参数
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号。–topic String: topic操作的 topic 名称。
2发送消息
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello world
zhao zhao2.2.3 消费者命令行操作
1查看操作消费者命令参数
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号。–topic String: topic操作的 topic 名称。–from-beginning从头开始消费。–group String: consumer group id指定消费者组名称。
2消费消息
1消费 first 主题中的数据。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first参数描述–bootstrap-server String: server toconnect to连接的 Kafka Broker 主机名称和端口号。–topic String: topic操作的 topic 名称。–from-beginning从头开始消费。–group String: consumer group id指定消费者组名称。
2消费消息
1消费 first 主题中的数据。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first2把主题中所有的数据都读取出来包括历史数据。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first第 3 章 Kafka 生产者
3.1 生产者消息发送流程
3.1.1 发送原理
在消息发送的过程中涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
发送流程 3.1.2 生产者重要参数列表
参数名称描述bootstrap.servers生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092可以设置 1 个或者多个设置多个为了防止某个机器挂掉连不上集群中间用逗号隔开。注意这里并非需要所有的 broker 地址因为生产者从给定的 broker里查找到其他 broker 信息。key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。buffer.memoryRecordAccumulator 缓冲区总大小默认 32m。batch.size缓冲区一批数据最大值默认 16k。适当增加该值可以提高吞吐量但是如果该值设置太大会导致数据传输延迟增加。linger.mss如果数据迟迟未达到 batch.sizesender 等待linger.time之后就会发送数据。单位 ms默认值是 0ms表示没有延迟。生产环境建议该值大小为 5-100ms 之间。acks0生产者发送过来的数据不需要等数据落盘应答。1生产者发送过来的数据Leader 收到数据后应答。-1all生产者发送过来的数据Leader和 isr 队列里面的所有节点收齐数据后应答。默认值是-1-1 和all 是等价的。max.in.flight.requests.per.connection允许最多没有返回 ack 的次数默认为 5开启幂等性要保证该值是 1-5 的数字。retries当消息发送出现错误的时候系统会重发消息。retries表示重试次数。默认是 int 最大值2147483647。如果设置了重试还想保证消息的有序性需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION1否则在重试此失败消息的时候其他的消息可能发送成功了。retry.backoff.ms两次重试之间的时间间隔默认是 100ms。enable.idempotence是否开启幂等性默认 true开启幂等性。compression.type生产者发送的所有数据的压缩方式。默认是 none也就是不压缩。支持压缩类型none、gzip、snappy、lz4 和 zstd。
3.2 异步发送 API
3.2.1 普通异步发送
1需求创建 Kafka 生产者采用异步的方式发送到 Kafka Broker
异步发送流程 2代码编写
1创建工程 kafka
2导入依赖
dependenciesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.0.0/version/dependency
/dependencies3创建包名com.zhao.kafka.producer
4编写不带回调函数的 API 代码
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 3. 创建 kafka 生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first,zhao i));}// 5. 关闭资源kafkaProducer.close();}
}测试
①在 hadoop102 上开启 Kafka 消费者。
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstzhao 0
zhao 1
zhao 2
zhao 3
zhao 43.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。
带回调函数的异步发送流程 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 3. 创建 kafka 生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first,zhao i), new Callback(){// 该方法在 Producer 收到 ack 时调用为异步调用Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {// 没有异常,输出信息到控制台System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});}// 5. 关闭资源kafkaProducer.close();}
}测试
①在 hadoop102 上开启 Kafka 消费者。
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstzhao 0
zhao 1
zhao 2
zhao 3
zhao 4③在 IDEA 控制台观察回调信息。
主题first-分区0
主题first-分区0
主题first-分区1
主题first-分区1
主题first-分区13.3 同步发送 API
同步发送流程 只需在异步发送的基础上再调用一下 get()方法即可。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {public static void main(String[] args) throws InterruptedException, ExecutionException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 10; i) {// 异步发送 默认//kafkaProducer.send(new ProducerRecord(first,kafka i));// 同步发送kafkaProducer.send(new ProducerRecord(first,kafka i)).get();}// 5. 关闭资源kafkaProducer.close();}
}测试
①在 hadoop102 上开启 Kafka 消费者。
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstzhao 0
zhao 1
zhao 2
zhao 3
zhao 43.4 生产者分区
3.4.1 分区好处
1便于合理使用存储资源每个Partition在一个Broker上存储可以把海量的数据按照分区切割成一 块一块数据存储在多台Broker上。合理控制分区的任务可以实现负载均衡的效果。 2提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行消费数据。 3.4.2 生产者发送消息的分区策略
1默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl n全局查找 DefaultPartitioner。
/**
* The default partitioning strategy:
* ul
* liIf a partition is specified in the record, use it
* liIf no partition is specified but a key is present choose a partition based on a hash of the key
* liIf no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}2案例一
将数据发往指定 partition 的情况下例如将所有数据发往分区 1 中。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducerString,String kafkaProducer new KafkaProducer(properties);for (int i 0; i 5; i) {// 指定数据发送到 1 号分区key 为空IDEA 中 ctrl p 查看参数kafkaProducer.send(new ProducerRecord(first, 1,,zhao i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata,Exception e) {if (e null){System.out.println(主题metadata.topic() - 分区 metadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}测试
①在 hadoop102 上开启 Kafka 消费者
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[zhaohadoop102 kafka]$bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstzhao 0
zhao 1
zhao 2
zhao 3
zhao 4③在 IDEA 控制台观察回调信息。
主题first-分区1
主题first-分区1
主题first-分区1
主题first-分区1
主题first-分区13案例二
没有指明 partition 值但有 key 的情况下将 key 的 hash 值与 topic 的 partition 数进行取 余得到 partition 值。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 3. 创建 kafka 生产者对象KafkaProducerString,String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {// 依次指定 key 值为 a,b,f 数据 key 的 hash 值与 3 个分区求余分别发往 1、2、0kafkaProducer.send(new ProducerRecord(first,a,zhao i), new Callback() {// 该方法在 Producer 收到 ack 时调用为异步调用Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {// 没有异常,输出信息到控制台System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});}// 5. 关闭资源kafkaProducer.close();}
}测试
①keya时在控制台查看结果。
主题first-分区1
主题first-分区1
主题first-分区1
主题first-分区1
主题first-分区1②keyb时在控制台查看结果。
主题first-分区2
主题first-分区2
主题first-分区2
主题first-分区2
主题first-分区2③keyf时在控制台查看结果。
主题first-分区0
主题first-分区0
主题first-分区0
主题first-分区0
主题first-分区03.4.3 自定义分区器
如果研发人员可以根据企业需求自己重新实现分区器。
1需求
例如我们实现一个分区器实现发送过来的数据中如果包含 zhao就发往 0 号分区不包含 zhao就发往 1 号分区。
2实现步骤
1定义类实现 Partitioner 接口。
2重写 partition()方法。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区* param topic 主题* param key 消息的 key* param keyBytes 消息的 key 序列化后的字节数组* param value 消息的 value* param valueBytes 消息的 value 序列化后的字节数组* param cluster 集群元数据可以查看分区信息* return*/Overridepublic int partition(String topic,Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue value.toString();// 创建 partitionint partition;// 判断消息是否包含 zhaoif (msgValue.contains(zhao)){partition 0;}else {partition 1;}// 返回分区号return partition;}// 关闭资源Overridepublic void close() {}// 配置方法Overridepublic void configure(MapString, ? configs) {}
}3使用分区器的方法在生产者的配置中添加分区器参数。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,com.zhao.kafka.producer.MyPartitioner);KafkaProducerString,String kafkaProducer new KafkaProducerString, String(properties);for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first,zhao i), new Callback(){Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {exception.printStackTrace();}}});}kafkaProducer.close();}
}4测试
①在 hadoop102 上开启 Kafka 消费者。ss
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 控制台观察回调信息。
主题first-分区0
主题first-分区0
主题first-分区0
主题first-分区0
主题first-分区03.5 生产经验——生产者如何提高吞吐量 package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// batch.size批次大小默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms等待时间默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator缓冲区大小默认 32Mbuffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type压缩默认 none可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first, zhao i));}// 5. 关闭资源kafkaProducer.close();}
}测试
①在 hadoop102 上开启 Kafka 消费者。
[zhaohadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first②在 IDEA 中执行代码观察 hadoop102 控制台中是否接收到消息。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firstzhao 0
zhao 1
zhao 2
zhao 3
zhao 43.6 生产经验——数据可靠性
0回顾发送流程 1ack 应答原理
ACK应答级别 2代码配置
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, all);// 重试次数 retries默认是 int 最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(first, zhao i));}// 5. 关闭资源kafkaProducer.close();}
}3.7 生产经验——数据去重
3.7.1 数据传递语义 3.7.2 幂等性
1幂等性原理 2如何使用幂等性
开启参数enable.idempotence默认为 truefalse 关闭。
3.7.3 生产者事务
1Kafka 事务原理 2Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();// 2 开启事务
void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量主要用于消费者
void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets,String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;// 5 放弃事务类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;3单个 Producer使用事务保证消息的仅一次发送
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);// 设置事务 id必须事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transaction_id_0);// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息for (int i 0; i 5; i) {// 发送消息kafkaProducer.send(new ProducerRecord(first,zhao i));}// int i 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}3.8 生产经验——数据有序 3.9 生产经验——数据乱序 第 4 章 Kafka Broker
4.1 Kafka Broker 工作流程
4.1.1 Zookeeper 存储的 Kafka 信息
1启动 Zookeeper 客户端。
[zhaohadoop102 zookeeper-3.5.7]$ bin/zkCli.sh2通过 ls 命令可以查看 kafka 相关信息。
[zk: localhost:2181(CONNECTED) 2] ls /kafkaZookeeper中存储的Kafka 信息 zookeeper可视化工具prettyZoo 4.1.2 Kafka Broker 总体工作流程 1模拟 Kafka 上下线Zookeeper 中数据变化
1查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids[0, 1, 2]2查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller{version:1,brokerid:0,timestamp:1637292471777}3查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state{controller_epoch:24,leader:0,version:1,leader_epoch:18,
isr:[0,1,2]}4停止 hadoop104 上的 kafka。
[zhaohadoop104 kafka]$ bin/kafka-server-stop.sh5再次查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
[0, 1]6再次查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller{version:1,brokerid:0,timestamp:1637292471777}7再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state{controller_epoch:24,leader:0,version:1,leader_epoch:18,isr:[0,1]}8启动 hadoop104 上的 kafka。
[zhaohadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties9再次观察1、2、3步骤中的内容。
4.1.3 Broker 重要参数
参数名称描述replica.lag.time.max.msISR 中如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值默认 30s。auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分 成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。log.retention.hoursKafka 中数据保存的时间默认 7 天。log.retention.minutesKafka 中数据保存的时间分钟级别默认关闭。log.retention.msKafka 中数据保存的时间毫秒级别默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔默认是 5 分钟。log.retention.bytes默认等于-1表示无穷大。超过设置的所有日志总大小删除最早的 segment。log.cleanup.policy默认是 delete表示所有数据启用删除策略如果设置值为 compact表示所有数据启用压缩策略。num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.replica.fetchers副本拉取线程数这个参数占总核数的 50%的 1/3num.network.threads默认是 3。数据传输线程数这个参数占总核数的50%的 2/3 。log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。
4.2 生产经验——节点服役和退役
4.2.1 服役新节点
1新节点准备
1关闭 hadoop104并右键执行克隆操作。
2开启 hadoop105并修改 IP 地址。
[roothadoop104 ~] # vim /etc/sysconfig/network-scripts/ifcfg-ens33DEVICEens33
TYPEEthernet
ONBOOTyes
BOOTPROTOstatic
NAMEens33
IPADDR192.168.10.105
PREFIX24
GATEWAY192.168.10.2
DNS1192.168.10.23在 hadoop105 上修改主机名称为 hadoop105。
[roothadoop104 ~]# vim /etc/hostname
hadoop1054重新启动 hadoop104、hadoop105。
5修改 haodoop105 中 kafka 的 broker.id 为 3。
6删除 hadoop105 中 kafka 下的 datas 和 logs。
[zhaohadoop105 kafka]$ rm -rf datas/* logs/*7启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群。
[zhaohadoop102 ~]$ zk.sh start
[zhaohadoop102 ~]$ kf.sh start8单独启动 hadoop105 中的 kafka。
[zhaohadoop105 kafka]$bin/kafka-server-start.sh -daemon ./config/server.properties2执行负载均衡操作
1创建一个要均衡的主题。
[zhaohadoop102 kafka]$ vim topics-to-move.json{topics: [{topic: first}],version: 1
}2生成一个负载均衡的计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 --generateCurrent partition replica assignment
{version:1,partitions:[{topic:first,partition:0,replicas:[0,2,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[2,1,0],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,0,2],log_dirs:[any,any,any]}]}Proposed partition reassignment configuration
{version:1,partitions:[{topic:first,partition:0,replicas:[2,3,0],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,0,1],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,1,2],log_dirs:[any,any,any]}]}3创建副本存储计划所有副本存储在 broker0、broker1、broker2、broker3 中。
[zhaohadoop102 kafka]$ vim increase-replication-factor.json{version:1,partitions:[{topic:first,partition:0,replicas:[2,3,0],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,0,1],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,1,2],log_dirs:[any,any,any]}]}4执行副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5验证副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verifyStatus of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first4.2.2 退役旧节点
1执行负载均衡操作
先按照退役一台节点生成执行计划然后按照服役时操作流程执行负载均衡。
1创建一个要均衡的主题。
[zhaohadoop102 kafka]$ vim topics-to-move.json
{topics: [{topic: first}],version: 1
}2创建执行计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2 --generateCurrent partition replica assignment
{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,2,3],log_dirs:[any,any,any]}]}Proposed partition reassignment configuration
{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[0,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,2,0],log_dirs:[any,any,any]}]}3创建副本存储计划所有副本存储在 broker0、broker1、broker2 中。
[zhaohadoop102 kafka]$ vim increase-replication-factor.json{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[0,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,2,0],log_dirs:[any,any,any]}]}4执行副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5验证副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verifyStatus of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first2执行停止命令
在 hadoop105 上执行停止命令即可。
[zhaohadoop105 kafka]$ bin/kafka-server-stop.sh复习
一、概述1、定义1传统定义分布式 发布订阅 消息队列发布订阅分为多种类型 订阅者根据需求 选择性订阅2最新定义流平台存储、计算2、消息队列应用场景1缓存消峰 2解耦3异步通信3、两种模式1点对点1一个生产者 一个消费者 一个topic 会删除数据 不多2发布订阅1多个生产者 消费者多个 而且相互独立 多个topic 不会删除数据4、架构1生产者100T数据2broker 1broker 服务器 hadoop102 103 1042topic 主题 对数据分类3分区4可靠性 副本5leader follower 6生产者和消费者 只针对leader操作3消费者1消费者和消费者相互独立2消费者组 某个分区 只能由一个消费者消费4zookeeper(一般不使用kafka自带的Zookeeper使用外部Zookeeper)1broker.ids 0 1 2 2leader isr
二、入门1、安装1broker.id 必须全局唯一2broker.id、log.dirs zk/kafka3启动停止 先停止kafka 再停zk4脚本#!/bin/bashcase $1 instart)for i in hadoop102 hadoop103 hadoop104dossh $i 绝对路径done;;stop);;esac2、常用命令行1主题 kafka-topic.sh 1--bootstrap-server hadoop102:9092,hadoop103:90922--topic first3--create 4--delete5--alter6--list7--describe8--partitions9--replication-factor 2生产者 kafka-console-producer.sh 1--bootstrap-server hadoop102:9092,hadoop103:90922--topic first3消费者 kafka-console-consumer.sh 1--bootstrap-server hadoop102:9092,hadoop103:90922--topic first三、生产者 1、原理2、异步发送API0配置1连接 bootstrap-server 2key value序列化1创建生产者KafkaProducerString,String()2发送数据send() send(,new Callback)3关闭资源3、同步发送。。。send() send(,new Callback).get()。。。4、分区 1好处存储计算2默认分区规则1指定分区 按分区走2key key的hashcode值%分区数3没有指定key 没有指定分区 粘性第一随机3自定义分区定义类 实现partitioner接口 5、吞吐量提高1批次大小 16k 32k2linger.ms 0 5-100ms3压缩 4缓存大小 32m 64m 6、可靠性 acks 0 丢失数据1 也可能会丢 传输普通日志-1 完全可靠 副本大于等于2 isr 2 数据重复7、数据重复1幂等性pid, 分区号序列号默认打开2事务底层基于幂等性1初始化2启动3消费者offset4提交5终止8、数据有序单分区内有序有条件多分区有序怎么办9、乱序1inflight 1 2没有幂等性 inflight 1 3有幂等性
四、broker 1、zk存储了哪些信息1broker.ids2leader 3辅助选举 controller 2、工作流程3、服役1准备一台干净服务器 hadoop1002对哪个主题操作 3形成计划4执行计划5验证计划4、退役1要退役的节点不让存储数据2退出节点4.3 Kafka 副本
4.3.1 副本基本信息
副本数是1 意思是就一个分区同时也是主分区
副本数是2意思是有2个分区1个是主分区1个是从分区
副本数已把主分区数包含在内
1Kafka 副本作用提高数据可靠性。
2Kafka 默认副本 1 个生产环境一般配置为 2 个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率。
3Kafka 中副本分为Leader 和 Follower。Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 进行同步数据。
4Kafka 分区中的所有副本统称为 ARAssigned Repllicas。
AR ISR OSR
ISR表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送
通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由replica.lag.time.max.ms参数设定默认 30s。Leader 发生故障之后就会从 ISR 中选举新的 Leader。
OSR表示 Follower 与 Leader 副本同步时延迟过多的副本。
4.3.2 Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader负责管理集群broker 的上下线所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。 1创建一个新的 topic4 个分区4 个副本
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic zhao1 --partitions 4 --replication-factor 4Created topic zhao1.2查看 Leader 分布情况
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe--topic zhao1
Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes1073741824Topic: zhao1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1Topic: zhao1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2Topic: zhao1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,33停止掉 hadoop105 的 kafka 进程并查看 Leader 分区情况
[zhaohadoop105 kafka]$ bin/kafka-server-stop.sh[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic zhao1
Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor:4Configs: segment.bytes1073741824
Topic: zhao1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: zhao1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: zhao1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,04停止掉 hadoop104 的 kafka 进程并查看 Leader 分区情况
[zhaohadoop104 kafka]$ bin/kafka-server-stop.sh[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic zhao1Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes1073741824Topic: zhao1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1Topic: zhao1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1Topic: zhao1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,05启动 hadoop105 的 kafka 进程并查看 Leader 分区情况
[zhaohadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic zhao1
Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes1073741824Topic: zhao1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3Topic: zhao1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3Topic: zhao1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,36启动 hadoop104 的 kafka 进程并查看 Leader 分区情况
[zhaohadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic zhao1
Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes1073741824Topic: zhao1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2Topic: zhao1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2Topic: zhao1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,27停止掉 hadoop103 的 kafka 进程并查看 Leader 分区情况
[zhaohadoop103 kafka]$ bin/kafka-server-stop.sh[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic zhao1
Topic: zhao1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes1073741824Topic: zhao1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2Topic: zhao1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2Topic: zhao1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2Topic: zhao1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,24.3.3 Leader 和 Follower 故障处理细节
Follower故障处理细节 Leader故障处理细节 4.3.4 分区副本分配
如果 kafka 服务器只有 4 个节点那么设置 kafka 的分区数大于服务器台数在 kafka底层如何分配存储副本呢
1创建 16 分区3 个副本
1创建一个新的 topic名称为 second。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second2查看分区和副本情况。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic secondTopic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1分区副本分配 4.3.5 生产经验——手动调整分区副本存储
生产经验——手动调整分区副本存储
在生产环境中每台服务器的配置和性能不一致但是Kafka只会根据自己的代码规则创建对应的分区副本就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。 手动调整分区副本存储的步骤如下
1创建一个新的 topic名称为 three。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three2查看分区副本存储情况。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three3创建副本存储计划所有副本都指定存储在 broker0、broker1 中。
[zhaohadoop102 kafka]$ vim increase-replication-factor.json输入如下内容
{version:1,partitions:[{topic:three,partition:0,replicas:[0,1]}, {topic:three,partition:1,replicas:[0,1]}, {topic:three,partition:2,replicas:[1,0]}, {topic:three,partition:3,replicas:[1,0]}]
}4执行副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5验证副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify6查看分区副本存储情况。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three4.3.6 生产经验——Leader Partition 负载平衡
Leader Partition自动平衡 参数名称描述auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。生产环境中leader 重选举的代价比较大可能会带来性能影响建议设置为 false 关闭。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
4.3.7 生产经验——增加副本因子
在生产环境当中由于某个主题的重要等级需要提升我们考虑增加副本。副本数的增加需要先制定计划然后根据计划执行。
1创建 topic
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four2手动增加副本存储
1创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。
[zhaohadoop102 kafka]$ vim increase-replication-factor.json输入如下内容
{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]}]}2执行副本存储计划。
[zhaohadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute4.4 文件存储
4.4.1 文件存储机制
1Topic 数据的存储机制
Kafka文件存储机制 2思考Topic 数据到底存储在什么位置
1启动生产者并发送消息。
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello world2查看 hadoop102或者 hadoop103、hadoop104的/opt/module/kafka/datas/first-1first-0、first-2路径上的文件。
[zhaohadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata3直接查看 log 日志发现是乱码。
[zhaohadoop104 first-1]$ cat 00000000000000000092.log
\CYnF|©|©ÿÿÿÿÿÿÿÿÿÿÿÿÿÿhello world4通过工具查看 index 和 log 信息。
[zhaohadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.indexDumping ./00000000000000000000.index
offset: 3 position: 152[zhaohadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.logDumping datas/first-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid: true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid: truebaseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid: true
baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid: true baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid: true3index 文件和 log 文件详解
Log文件和Index文件详解 说明日志存储参数配置
参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。 稀疏索引。
4.4.2 文件清理策略
Kafka 中默认的日志保存时间为 7 天可以通过调整如下参数修改保存时间。
log.retention.hours最低优先级小时默认 7 天。log.retention.minutes分钟。log.retention.ms最高优先级毫秒。log.retention.check.interval.ms负责设置检查周期默认 5 分钟。
那么日志一旦超过了设置的时间怎么处理呢
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
1delete 日志删除将过期数据删除
log.cleanup.policy delete 所有数据启用删除策略
1基于时间默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
2基于大小默认关闭。超过设置的所有日志总大小删除最早的 segment。
log.retention.bytes默认等于-1表示无穷大。
**思考**如果一个 segment 中有一部分数据过期一部分没有过期怎么处理 2compact 日志压缩 4.5 高效读写数据
1Kafka 本身是分布式集群可以采用分区技术并行度高
2读数据采用稀疏索引可以快速定位要消费的数据
3顺序写磁盘
Kafka 的 producer 生产数据要写入到 log 文件中写的过程是一直追加到文件末端为顺序写。官网有数据表明同样的磁盘顺序写能到 600M/s而随机写只有 100K/s。这与磁盘的机械机构有关顺序写之所以快是因为其省去了大量磁头寻址的时间。 4页缓存 零拷贝技术 参数描述log.flush.interval.messages强制页缓存刷写到磁盘的条数默认是 long 的最大值9223372036854775807。一般不建议修改交给系统自己管理。log.flush.interval.ms每隔多久刷数据到磁盘默认是 null。一般不建议修改交给系统自己管理。
第 5 章 Kafka 消费者
5.1 Kafka 消费方式 5.2 Kafka 消费者工作流程
5.2.1 消费者总体工作流程 5.2.2 消费者组原理
消费者组 消费者组初始化流程 消费者组详细消费流程 5.2.3 消费者重要参数
参数名称描述bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在如数据被删除了该如何处理 earliest自动重置偏移量到最早的偏移量。 latest默认自动重置偏移量为最新的偏移量。 none如果消费组原来的previous偏移量不存在则向消费者抛异常。 anything向消费者抛异常。offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。fetch.max.bytes默认Default: 5242880050 m。消费者 获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes brokerconfigor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。
5.3 消费者 API
5.3.1 独立消费者案例订阅主题
1需求 创建一个独立消费者消费 first 主题中数据。 注意在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id。
2实现步骤
1创建包名com.zhao.kafka.consumer 2编写代码
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组组名任意起名 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 创建消费者对象KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 注册要消费的主题可以消费多个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecordString, String consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}3测试
1在 IDEA 中执行消费者程序。
2在 Kafka 集群控制台创建 Kafka 生产者并输入数据。
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first hello3在 IDEA 控制台观察接收到的数据。
ConsumerRecord(topic first, partition 1, leaderEpoch 3,offset 0, CreateTime 1629160841112, serialized key size -1,serialized value size 5, headers RecordHeaders(headers [],isReadOnly false), key null, value hello)5.3.2 独立消费者案例订阅分区
1需求创建一个独立消费者消费 first 主题 0 号分区的数据。 2实现步骤
1代码编写。
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerPartition {public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组必须名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,test);KafkaConsumerString,String kafkaConsumer new KafkaConsumer(properties);// 消费某个主题的某个分区数据ArrayListTopicPartition topicPartitions new ArrayList();topicPartitions.add(new TopicPartition(first, 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}3测试
1在 IDEA 中执行消费者程序。
2在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号
分区的数据。
first 0 381
first 0 382
first 2 168
first 1 165
first 1 1663在 IDEA 控制台观察接收到的数据只能消费到 0 号分区数据表示正确。
ConsumerRecord(topic first, partition 0, leaderEpoch 14,offset 381, CreateTime 1636791331386, serialized key size -1, serialized value size 9, headers RecordHeaders(headers [], isReadOnly false), key null, value zhao 0)ConsumerRecord(topic first, partition 0, leaderEpoch 14,offset 382, CreateTime 1636791331397, serialized key size -1, serialized value size 9, headers RecordHeaders(headers [], isReadOnly false), key null, value zhao 1)5.3.3 消费者组案例
1需求测试同一个主题的分区数据只能由一个消费者组中的一个消费。 2案例实操
1复制一份基础消费者的代码在 IDEA 中同时启动即可启动同一个消费者组中的两个消费者。
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer1 {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 创建消费者对象KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 注册主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s中消费一批数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}2启动代码中的生产者发送消息在 IDEA 控制台即可看到两个消费者在消费不同分区的数据如果只发生到一个分区可以在发送时增加延迟代码 Thread.sleep(2);。
ConsumerRecord(topic first, partition 0, leaderEpoch 2,offset 3, CreateTime 1629169606820, serialized key size -1,serialized value size 8, headers RecordHeaders(headers [],isReadOnly false), key null, value hello1)ConsumerRecord(topic first, partition 1, leaderEpoch 3,offset 2, CreateTime 1629169609524, serialized key size -1,serialized value size 6, headers RecordHeaders(headers [],isReadOnly false), key null, value hello2)ConsumerRecord(topic first, partition 2, leaderEpoch 3,offset 21, CreateTime 1629169611884, serialized key size -1,serialized value size 6, headers RecordHeaders(headers [],isReadOnly false), key null, value hello3)3重新发送到一个全新的主题中由于默认创建的主题分区数为 1可以看到只能有一个消费者消费到数据。 5.4 生产经验——分区的分配以及再平衡 参数名称描述heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条 目的值必 须小于 session.timeout.ms也不 应该高于session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。partition.assignment.strategy消 费 者 分 区 分 配 策 略 默 认 策 略 是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 Range 、 RoundRobin 、 Sticky 、CooperativeSticky
5.4.1 Range 以及再平衡
1Range 分区策略原理 2Range 分区分配策略案例
1修改主题 first 为 7 个分区。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7注意分区数可以增加但是不能减少。
2复制 CustomConsumer 类创建 CustomConsumer2。这样可以由三个消费CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组组名都为“test”同时启动 3 个消费者。 3启动 CustomProducer 生产者发送 500 条消息随机发送到不同的分区。
package com.zhao.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {public static void main(String[] args) throws InterruptedException {Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaProducerString, String kafkaProducer new KafkaProducer(properties);for (int i 0; i 7; i) {kafkaProducer.send(new ProducerRecord(first, i,test, zhao));}kafkaProducer.close();}
}说明Kafka 默认的分区分配策略就是 Range CooperativeSticky所以不需要修改策略。
4观看 3 个消费者分别消费哪些分区的数据。 3Range 分区分配再平衡案例
1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。
1 号消费者消费到 3、4 号分区数据。 2 号消费者消费到 5、6 号分区数据。 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。
2再次重新发送消息观看结果45s 以后。
1 号消费者消费到 0、1、2、3 号分区数据。 2 号消费者消费到 4、5、6 号分区数据。
说明消费者 0 已经被踢出消费者组所以重新按照 range 方式分配。
5.4.2 RoundRobin 以及再平衡
1RoundRobin 分区策略原理 2RoundRobin 分区分配策略案例
1依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RoundRobinAssignor);2重启 3 个消费者重复发送消息的步骤观看分区结果。 3RoundRobin 分区分配再平衡案例
1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。
1 号消费者消费到 2、5 号分区数据 2 号消费者消费到 4、1 号分区数据 0 号消费者的任务会按照 RoundRobin 的方式把数据轮询分成 0 、6 和 3 号分区数据 分别由 1 号消费者或者 2 号消费者消费。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需 要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行
2再次重新发送消息观看结果45s 以后。
1 号消费者消费到 0、2、4、6 号分区数据 2 号消费者消费到 1、3、5 号分区数据
说明消费者 0 已经被踢出消费者组所以重新按照 RoundRobin 方式分配。
5.4.3 Sticky 以及再平衡
粘性分区定义可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前考虑上一次分配的结果尽量少的调整分配的变动可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略首先会尽量均衡的放置分区到消费者上面在出现同一消费者组内消费者出现问题的时候会尽量保持原有分配的分区不变化
1需求
设置主题为 first7 个分区准备 3 个消费者采用粘性分区策略并进行消费观察
消费分配情况。然后再停止其中一个消费者再次观察消费分配情况。
2步骤
1修改分区分配策略为粘性。
注意3 个消费者都应该注释掉之后重启 3 个消费者如果出现报错全部停止等
会再重启或者修改为全新的消费者组。
// 修改分区分配策略
ArrayListString startegys new ArrayList();
startegys.add(org.apache.kafka.clients.consumer.StickyAssignor);properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);2使用同样的生产者发送 500 条消息。
可以看到会尽量保持分区的个数近似划分分区。 3Sticky 分区分配再平衡案例
1停止掉 0 号消费者快速重新发送消息观看结果45s 以内越快越好。
1 号消费者消费到 2、5、3 号分区数据。 2 号消费者消费到 4、6 号分区数据。 0 号消费者的任务会按照粘性规则尽可能均衡的随机分成 0 和 1 号分区数据分别 由 1 号消费者或者 2 号消费者消费。
说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需 要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。
2再次重新发送消息观看结果45s 以后。
1 号消费者消费到 2、3、5 号分区数据。 2 号消费者消费到 0、1、4、6 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照粘性方式分配。
5.5 offset 位移
5.5.1 offset 的默认维护位置 __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.idtopic分区号value 就是当前 offset 的值。每隔一段时间kafka 内部会对这个 topic 进行compact也就是每个 group.idtopic分区号就保留最新数据。
1消费 offset 案例
0思想__consumer_offsets 为 Kafka 中的 topic那就可以通过消费者进行消费。
1在配置文件 config/consumer.properties 中添加配置 exclude.internal.topicsfalse默认是 true表示不能消费系统主题。为了查看该系统主题数据所以该参数修改为 false。
2采用命令行方式创建一个新的 topic。
[zhaohadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic zhao --partitions 2 --replication-factor 23启动生产者往 zhao 生产数据。
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh --topic zhao --bootstrap-server hadoop102:90924启动消费者消费 zhao 数据。
[zhaohadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic zhao --group test注意指定消费者组名称更好观察数据存储位置key 是 group.idtopic分区号。
5查看消费者消费主题__consumer_offsets。
[zhaohadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter --from-beginning[offset,zhao,1]::OffsetAndMetadata(offset7,
leaderEpochOptional[0], metadata, commitTimestamp1622442520203,
expireTimestampNone)
[offset,zhao,0]::OffsetAndMetadata(offset8,
leaderEpochOptional[0], metadata, commitTimestamp1622442520203,
expireTimestampNone)5.5.2 自动提交 offset 参数名称描述enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。
1消费者自动提交 offset
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerAutoOffset {public static void main(String[] args) {// 1. 创建 kafka消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交 offset的时间周期 1000ms默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);//3. 创建 kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList(first));//5. 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
}5.5.3 手动提交 offset 1同步提交 offset
由于同步提交 offset 有失败重试机制故更加可靠但是由于一直等待提交结果提交的效率比较低。以下为同步提交 offset 的示例。
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {public static void main(String[] args) {// 1. 创建 kafka消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList(first));//5. 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}// 同步提交 offsetconsumer.commitSync();}}
}2异步提交 offset
虽然同步提交 offset 更可靠一些但是由于其会阻塞当前线程直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下会选用异步提交 offset 的方式。
以下为异步提交 offset 的示例
ackage com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class CustomConsumerByHandAsync {public static void main(String[] args) {// 1. 创建 kafka消费者配置类Properties properties new Properties();// 2. 添加配置参数// 添加连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);// 配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//3. 创建 Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//4. 设置消费主题 形参是列表consumer.subscribe(Arrays.asList(first));//5. 消费数据while (true){// 读取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(1));// 输出消息for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}// 异步提交 offsetconsumer.commitAsync();}}
}5.5.4 指定 Offset 消费
auto.offset.reset earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量消费者组第一次消费或服务器上不再存在当前偏移量
时例如该数据已被删除该怎么办
1earliest自动将偏移量重置为最早的偏移量–from-beginning。
2latest默认值自动将偏移量重置为最新偏移量。
3none如果未找到消费者组的先前偏移量则向消费者抛出异常。 4任意指定 offset 位移开始消费
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {public static void main(String[] args) {// 0 配置信息Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key value反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 1 创建一个消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}// 遍历所有分区并指定 offset从 1700的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 1700);}// 3 消费该主题数据while (true) {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}注意每次执行完需要修改消费者组名
5.5.5 指定时间消费
需求在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。
例如要求按照时间消费前一天的数据怎么处理
操作步骤
package com.zhao.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerForTime {public static void main(String[] args) {// 0 配置信息Properties properties new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hadoop102:9092);// key value反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 1 创建一个消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅一个主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);SetTopicPartition assignment new HashSet();while (assignment.size() 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息有了分区分配信息才能开始消费assignment kafkaConsumer.assignment();}HashMapTopicPartition, Long timestampToSearch new HashMap();// 封装集合存储每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);}// 获取从 1天前开始消费的每个分区的 offsetMapTopicPartition, OffsetAndTimestamp offsets kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp ! null){kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());}}// 3 消费该主题数据while (true) {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}5.5.6 漏消费和重复消费
重复消费已经消费了数据但是 offset 没提交。
漏消费先提交 offset 后消费有可能会造成数据的漏消费。 5.6 生产经验——消费者事务 5.7 生产经验——数据积压消费者如何提高吞吐量 参数名称描述fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条
第 6 章 Kafka-Eagle 监控
Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况在生产环境中经常使用。
6.1 MySQL 环境准备
Kafka-Eagle 的安装依赖于 MySQLMySQL 主要用来存储可视化展示的数据。如果集群中之前安装过 MySQL 可以跨过该步。
6.2 Kafka 环境准备
1关闭 Kafka 集群
[zhaohadoop102 kafka]$ kf.sh stop2修改**/opt/module/kafka/bin/kafka-server-start.sh** 命令中
[zhaohadoop102 kafka]$ vim bin/kafka-server-start.sh修改如下参数值
if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx1G -Xms1G
fi为
if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-server -Xms2G -Xmx2G -XX:PermSize128m -XX:UseG1GC -XX:MaxGCPauseMillis200 -XX:ParallelGCThreads8 -XX:ConcGCThreads5 -XX:InitiatingHeapOccupancyPercent70export JMX_PORT9999#export KAFKA_HEAP_OPTS-Xmx1G -Xms1G
fi注意修改之后在启动 Kafka 之前要分发之其他节点
[zhaohadoop102 bin]$ xsync kafka-server-start.sh6.3 Kafka-Eagle 安装
0官网https://www.kafka-eagle.org/
1上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群/opt/software 目录
2解压到本地
[zhaohadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz3进入刚才解压的目录
[zhaohadoop102 kafka-eagle-bin-2.0.8]$ ll
总用量 79164
-rw-rw-r--. 1 zhao zhao 81062577 10 月 13 00:00 efak-web-2.0.8-bin.tar.gz4将 efak-web-2.0.8-bin.tar.gz 解压至**/opt/module**
[zhaohadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/5修改名称
[zhaohadoop102 module]$ mv efak-web-2.0.8/ efak6修改配置文件 /opt/module/efak/conf/system-config.properties
[zhaohadoop102 conf]$ vim system-config.properties######################################
# multi zookeeper kafka cluster list
#Settings prefixed with kafka.eagle. will be deprecated, use efak.
instead
######################################
efak.zk.cluster.aliascluster1
cluster1.zk.listhadoop102:2181,hadoop103:2181,hadoop104:2181/kafka######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enablefalse
cluster1.zk.acl.schemadigest
cluster1.zk.acl.usernametest
cluster1.zk.acl.passwordtest123######################################
# broker size online list
######################################
cluster1.efak.broker.size20######################################
# zk client thread limit
######################################
kafka.zk.limit.size32
######################################
# EFAK webui port
######################################
efak.webui.port8048######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.aclfalse
cluster1.efak.jmx.userkeadmin
cluster1.efak.jmx.passwordkeadmin123
cluster1.efak.jmx.sslfalse
cluster1.efak.jmx.truststore.location/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.passwordke123456######################################
# kafka offset storage
######################################
# offset保存在 kafka
cluster1.efak.offset.storagekafka######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uriservice:jmx:rmi:///jndi/rmi://%s/jmxrmi######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.chartstrue
efak.metrics.retain15######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max5000
efak.sql.topic.preview.records.max10######################################
# delete kafka topic token
######################################
efak.topic.tokenkeadmin######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enablefalse
cluster1.efak.sasl.protocolSASL_PLAINTEXT
cluster1.efak.sasl.mechanismSCRAM-SHA-256
cluster1.efak.sasl.jaas.configorg.apache.kafka.common.security.scram.ScramL
oginModule required usernamekafka passwordkafka-eagle;
cluster1.efak.sasl.client.id
cluster1.efak.blacklist.topics
cluster1.efak.sasl.cgroup.enablefalse
cluster1.efak.sasl.cgroup.topics
cluster2.efak.sasl.enablefalse
cluster2.efak.sasl.protocolSASL_PLAINTEXT
cluster2.efak.sasl.mechanismPLAIN
cluster2.efak.sasl.jaas.configorg.apache.kafka.common.security.plain.PlainL
oginModule required usernamekafka passwordkafka-eagle;
cluster2.efak.sasl.client.id
cluster2.efak.blacklist.topics
cluster2.efak.sasl.cgroup.enablefalse
cluster2.efak.sasl.cgroup.topics######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enablefalse
cluster3.efak.ssl.protocolSSL
cluster3.efak.ssl.truststore.location
cluster3.efak.ssl.truststore.password
cluster3.efak.ssl.keystore.location
cluster3.efak.ssl.keystore.password
cluster3.efak.ssl.key.password
cluster3.efak.ssl.endpoint.identification.algorithmhttps
cluster3.efak.blacklist.topics
cluster3.efak.ssl.cgroup.enablefalse
cluster3.efak.ssl.cgroup.topics######################################
# kafka sqlite jdbc driver address######################################
# 配置 mysql连接
efak.drivercom.mysql.jdbc.Driver
efak.urljdbc:mysql://hadoop102:3306/ke?useUnicodetruecharacterEncodingUT
F-8zeroDateTimeBehaviorconvertToNull
efak.usernameroot
efak.password000000######################################
# kafka mysql jdbc driver address
######################################
#efak.drivercom.mysql.cj.jdbc.Driver
#efak.urljdbc:mysql://127.0.0.1:3306/ke?useUnicodetruecharacterEncodingU
TF-8zeroDateTimeBehaviorconvertToNull
#efak.usernameroot
#efak.password1234567添加环境变量
[zhaohadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh# kafkaEFAK
export KE_HOME/opt/module/efak
export PATH$PATH:$KE_HOME/bin注意source /etc/profile
[zhaohadoop102 conf]$ source /etc/profile8启动
1注意启动之前需要先启动 ZK 以及 KAFKA。
[zhaohadoop102 kafka]$ kf.sh start2启动 efak
[zhaohadoop102 efak]$ bin/ke.sh start
Version 2.0.8 -- Copyright 2016-2021
*****************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit http://192.168.10.102:8048
* Account:admin ,Password:123456
*****************************************************************
* Usage ke.sh [start|status|stop|restart|stats] /Usage
* Usage https://www.kafka-eagle.org/ /Usage
*****************************************************************说明如果停止 efak执行命令
[zhaohadoop102 efak]$ bin/ke.sh stop6.4 Kafka-Eagle 页面操作
1登录页面查看监控数据
http://192.168.10.102:8048/ 第 7 章 Kafka-Kraft 模式
7.1 Kafka-Kraft 架构 左图为 Kafka 现有架构元数据在 zookeeper 中运行时动态选举 controller由controller 进行 Kafka 集群管理。右图为 kraft 模式架构实验性不再依赖 zookeeper 集群而是用三台 controller 节点代替 zookeeper元数据保存在 controller 中由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个
Kafka 不再依赖外部框架而是能够独立运行controller 管理集群时不再需要从 zookeeper 中先读取数据集群性能上升由于不依赖 zookeeper集群扩展时不再受到 zookeeper 读写能力限制controller 不再动态选举而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置而不是像以前一样对随机 controller 节点的高负载束手无策。
7.2 Kafka-Kraft 集群部署
1再次解压一份 kafka 安装包
[zhaohadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/2重命名为 kafka2
[zhaohadoop102 module]$ mv kafka_2.12-3.0.0/ kafka23在 hadoop102 上修改/opt/module/kafka2/config/kraft/server.properties 配置文件
[zhaohadoop102 kraft]$ vim server.properties#kafka 的角色controller 相当于主机、broker 节点相当于从机主机类似 zk 功
能
process.rolesbroker, controller
#节点 ID
node.id2
#controller服务协议别名
controller.listener.namesCONTROLLER
#全 Controller列表
controller.quorum.voters2hadoop102:9093,3hadoop103:9093,4hado
op104:9093
#不同服务器绑定的端口
listenersPLAINTEXT://:9092,CONTROLLER://:9093
#broker服务协议别名
inter.broker.listener.namePLAINTEXT
#broker对外暴露的地址
advertised.ListenersPLAINTEXT://hadoop102:9092
#协议别名到安全协议的映射
listener.security.protocol.mapCONTROLLER:PLAINTEXT,PLAINTEXT:PLA
INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka数据存储目录
log.dirs/opt/module/kafka2/data4分发 kafka2
[zhaohadoop102 module]$ xsync kafka2/在 hadoop103 和 hadoop104 上 需 要 对 node.id 相 应 改 变 值 需 要 和controller.quorum.voters 对应。在 hadoop103 和 hadoop104 上 需 要 根 据 各 自 的 主 机 名 称 修 改 相 应 的advertised.Listeners 地址。
5初始化集群数据目录
1首先生成存储目录唯一 ID。
[zhaohadoop102 kafka2]$ bin/kafka-storage.sh random-uuid J7s9e8PPTKOO47PxzI39VA2用该 ID 格式化 kafka 存储目录三台节点。
[zhaohadoop102 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties[zhaohadoop103 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties[zhaohadoop104 kafka2]$ bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties6启动 kafka 集群
[zhaohadoop102 kafka2]$ bin/kafka-server-start.sh -daemon
config/kraft/server.properties[zhaohadoop103 kafka2]$ bin/kafka-server-start.sh -daemon
config/kraft/server.properties[zhaohadoop104 kafka2]$ bin/kafka-server-start.sh -daemon
config/kraft/server.properties7停止 kafka 集群
[zhaohadoop102 kafka2]$ bin/kafka-server-stop.sh
[zhaohadoop103 kafka2]$ bin/kafka-server-stop.sh
[zhaohadoop104 kafka2]$ bin/kafka-server-stop.sh7.3 Kafka-Kraft 集群启动停止脚本
1在/home/zhao/bin 目录下创建文件 kf2.sh 脚本文件
[zhaohadoop102 bin]$ vim kf2.sh脚本如下
#! /bin/bash
case $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-stop.sh done
};;
esac2添加执行权限
[zhaohadoop102 bin]$ chmod x kf2.sh3启动集群命令
[zhaohadoop102 ~]$ kf2.sh start4停止集群命令
[zhaohadoop102 ~]$ kf2.sh stop第 8 章 集成SpringBoot
SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者也可以用于 SpringBoot 的消费者。 1在 IDEA 中安装 lombok 插件
在 Plugins 下搜索 lombok 然后在线安装即可安装后注意重启 2SpringBoot 环境准备
1创建一个 Spring Initializr 注意有时候 SpringBoot 官方脚手架不稳定我们切换国内地址 https://start.aliyun.com
2项目名称 sringboot 3添加项目依赖 4检查自动生成的配置文件
?xml version1.0 encodingUTF-8?
projectxmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.1/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.zhao/groupIdartifactIdspringboot/artifactIdversion0.0.1-SNAPSHOT/versionnamespringboot/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build
/project8.1 SpringBoot 生产者
1修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息
# 应用名称
spring.application.namezhao_springboot_kafka# 指定 kafka的地址
spring.kafka.bootstrap-
servershadoop102:9092,hadoop103:9092,hadoop104:9092#指定 key和 value的序列化器
spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer2创建 controller 从浏览器接收数据, 并写入指定的 topic
package com.zhao.springboot;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
RestController
public class ProducerController {// Kafka模板用来向 kafka发送数据AutowiredKafkaTemplateString, String kafka;RequestMapping(/zhao)public String data(String msg) {kafka.send(first, msg);return ok;}
}3在浏览器中给/zhao 接口发送数据
http://localhost:8080/zhao?msghello8.2 SpringBoot 消费者
1修改 SpringBoot 核心配置文件 application.propeties
# 消费者配置开始
# 指定 kafka的地址
spring.kafka.bootstrap-
servershadoop102:9092,hadoop103:9092,hadoop104:9092# 指定 key和 value的反序列化器
spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer#指定消费者组的 group_id
spring.kafka.consumer.group-idzhao
# 消费者配置结束2创建类消费 Kafka 中指定 topic 的数据
package com.zhao.springboot;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
Configuration
public class KafkaConsumer {// 指定要监听的 topicKafkaListener(topics first)public void consumeTopic(String msg) { // 参数: 收到的 valueSystem.out.println(收到的信息: msg);}
}3向 first 主题发送数据
[zhaohadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first