用视频做网站背景,网站的链接建设,wordpress好用的地图,厦门关键词排名seolinux安装zookeeper和kafka集群 一、Zookeeper集群部署安装zookeeper1. 下载2. 上传, 解压3. 配置 Zookeeper 节点4. 创建 myid 文件5. 启动参数更改6. sh文件授权7. 启动集群8. 防火墙开启端口 验证集群 二、kafka集群安装安装Kafka1. 下载Kafka安装包2. 上传到服务器#xf… linux安装zookeeper和kafka集群 一、Zookeeper集群部署安装zookeeper1. 下载2. 上传, 解压3. 配置 Zookeeper 节点4. 创建 myid 文件5. 启动参数更改6. sh文件授权7. 启动集群8. 防火墙开启端口 验证集群 二、kafka集群安装安装Kafka1. 下载Kafka安装包2. 上传到服务器并解压到指定目录。3. 创建目录用于存放数据4. 修改配置文件5. 启动参数更改6. sh文件授权7. 启动集群8. 防火墙开启端口9. 创建主题10. 验证主题配置11. 查看主题列表12. 删除指定主题 安装kafka可视化管理工具(可选)1. 上传并解压。2. 修改配置文件3. 启动kafka-ui-lite4.日志输出5. 防火墙开启端口 测试1. 进入kafka管理页面2. 添加配置3. 生产、消费消息4. 模拟节点宕机 一、Zookeeper集群部署
安装zookeeper
1. 下载
检查jdk版本, Zookeeper 依赖于 Java8 以上环境
java -version下载zk, 我这里下载的是3.6.4
https://archive.apache.org/dist/zookeeper/2. 上传, 解压
tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz -C /opt/app/apache-zookeeper-3.6.43. 配置 Zookeeper 节点
复制zoo_sample.cfg为zoo.cfg, 然后编辑
# tickTime 是一个核心配置参数定义了 Zookeeper 的基本时间单位。
# 它用于控制和影响其他一些时间相关的配置例如会话超时、Leader 与 Follower 之间的心跳检测等。
tickTime2000# initLimit 指定了 Follower 从服务器在启动时与 Leader 服务器进行数据同步的最大时间。
# 如果从服务器在规定的 initLimit 时间内未完成与 Leader 的同步则 Zookeeper 会认为该从服务器无法正常加入集群。
# 如果 tickTime2000 毫秒2 秒那么 initLimit10 就表示允许的初始化同步时间为 10 * 2000 20000 毫秒即 20 秒。
initLimit10# syncLimit 指定了 Leader 和 Follower 之间心跳消息的超时时间。
# 如果一个 Follower 在规定的 syncLimit 时间内无法收到 Leader 的消息
# Zookeeper 会认为该 Follower 已失效并从集群中剔除。
syncLimit5# 在 Zookeeper 中快照是指当前节点数据的备份。
# Zookeeper 会定期将内存中的数据状态保存为一个快照文件存储在指定的目录中
# 以便在服务器重启或崩溃时可以从快照恢复数据。
dataDir/opt/app/apache-zookeeper-3.6.4/data# 客户端连接的地址
clientPort8111# 每个客户端根据客户端的 IP 地址区分对单个 Zookeeper 服务器的最大连接数。
maxClientCnxns60# 用于自动清理旧数据的配置参数。它控制 Zookeeper 定期清理旧的快照snapshot文件和事务日志的频率
# 默认情况下autopurge.purgeInterval 为 0表示不启用自动清理功能。在这里先用默认的。
autopurge.purgeInterval0# 集群节点信息
# 8112 端口用于 Follower 和 Leader 之间的通信。
# 8113 端口用于 Leader 选举。
server.1192.168.0.170:8112:8113
server.2192.168.0.171:8112:8113
server.3192.168.0.149:8112:8113# 监控工具
# https://prometheus.io Metrics Exporter
#metricsProvider.classNameorg.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort7000
#metricsProvider.exportJvmInfotrue4. 创建 myid 文件
在每台机器上的zookeeper数据目录下创建myid, 然后执行以下命令
echo 1 /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.170 机器上, 和zoo.cfg中的server.1对应echo 2 /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.171 机器上, 和zoo.cfg中的server.2对应echo 3 /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.149 机器上, 和zoo.cfg中的server.3对应5. 启动参数更改
进入bin目录, 更改zkEnv.sh中的启动参数
vim /opt/app/apache-zookeeper-3.6.4/bin/zkEnv.sh更改SERVER_JVMFLAGS, 设置jvm初始化内存与最大堆内存
export SERVER_JVMFLAGS-Xms4g -Xmx4g6. sh文件授权
进入bin目录
cd /opt/app/apache-zookeeper-3.6.4/bin授予.sh执行权限
chmod 755 *.sh7. 启动集群
在每台机器上启动zk
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh start然后检查每台机器zk状态
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh status停止命令如下
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh stop8. 防火墙开启端口
开启端口
firewall-cmd --add-port8111/tcp --permanent
firewall-cmd --add-port8112/tcp --permanent
firewall-cmd --add-port8113/tcp --permanent重启防火墙
firewall-cmd --reload验证集群
使用 Zookeeper 客户端连接
/opt/app/apache-zookeeper-3.6.4/bin/zkCli.sh -server ip:port二、kafka集群安装
安装Kafka
1. 下载Kafka安装包
下载地址https://kafka.apache.org/downloads
如果只是使用的话下载二进制文件就行不用选择source,在这里我选择下载kafka_2.13-3.5.2.tgzscala版本为2.13kafka版本为3.5.2。
2. 上传到服务器并解压到指定目录。
tar -zxvf kafka_2.13-3.5.2.tgz -C /opt/app/kafka_2.13-3.5.23. 创建目录用于存放数据
mkdir -p /opt/app/kafka_2.13-3.5.2/data4. 修改配置文件
修改kafka的配置文件修改如下
vim /opt/app/kafka_2.13-3.5.2/config/server.properties############################# Server Basics ############################## 节点id, 集群中每个节点的唯一标识, 每台机器不能重复192.168.0.170为1,192.168.0.171为2,192.168.0.149为2
broker.id1############################# Socket Server Settings #############################
# 设置 Kafka 监听的地址和端口, 需要对应机器的ip, 端口自定义
listenersPLAINTEXT://192.168.0.170:8114# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes104857600############################# Log Basics ############################## A comma separated list of directories under which to store log files
log.dirs/opt/app/kafka_2.13-3.5.2/data# 分区数为 3Kafka 会将主题数据分成 3 个分区这可以提高并行消费的能力。
num.partitions3# 当 Kafka 启动或发生崩溃时它会执行数据恢复操作以确保日志文件的一致性并重建索引文件。指定了每个数据目录中用于恢复的线程数。
num.recovery.threads.per.data.dir2############################# Internal Topic Settings #############################
# 用于配置存储消费者偏移量offsets的内部主题 (__consumer_offsets 主题) 的副本数的参数。
offsets.topic.replication.factor3
# Kafka 中的事务功能依赖于事务日志来记录所有的事务状态信息这些信息用于确保事务的一致性和可靠性。
# 用于指定事务状态日志的副本数也就是该日志在 Kafka 集群中的副本数。
transaction.state.log.replication.factor3
# 如果 Kafka 集群有 3 个节点建议将 transaction.state.log.min.isr 设置为 2。
# 这意味着事务状态日志需要至少 2 个副本是同步的才能继续写入事务。
# 这样可以保证即使一个副本不可用或出现故障仍然有足够的副本来确保事务日志的一致性和容错性。
transaction.state.log.min.isr2############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms1000############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion due to age#log.retention.ms10000 # 当我们同时设置了保留时间和保留大小时kafka满足其中任意一个条件时就会删除日志段。
# 日志保留时间 72h
log.retention.hours72
# 日志保留大小20GB
log.retention.bytes21474836480
# 每个日志段大小1G
log.segment.bytes1073741824
# 每5分钟扫描一次
log.retention.check.interval.ms300000############################# Zookeeper ############################## 设置zk集群
zookeeper.connect192.168.0.170:8111,192.168.0.171:8111,192.168.0.149:8111# zk连接超时时间
zookeeper.connection.timeout.ms18000############################# Group Coordinator Settings ############################## The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms0## 分区副本的最小同步数通常配置为集群节点数减一
## 如果 replication.factor3 且 min.insync.replicas2生产者在 acksall 时只有当至少 2 个副本完成了同步写操作才会被确认。
min.insync.replicas2
# 它定义了每个分区的数据在 Kafka 集群中有多少个副本以提高数据的可靠性和容错性。
# 如果一个分区的某个副本所在的节点宕机Kafka 可以从其他副本继续读取数据确保数据可用性。
# 如果 default.replication.factor3那么每个新建主题的分区会默认有 3 个副本。
default.replication.factor3# 允许删除主题
delete.topic.enabletrue
5. 启动参数更改
编辑启动文件
vim /opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh找到参数KAFKA_HEAP_OPTS设置的地方, 修改为如下
export KAFKA_HEAP_OPTS-Xmx4G -Xms4G6. sh文件授权
进入bin目录
cd /opt/app/kafka_2.13-3.5.2/bin/授予.sh执行权限
chmod 755 *.sh7. 启动集群
在每台机器上启动kafka
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh -daemon /opt/app/kafka_2.13-3.5.2/config/server.properties查看启动状态
ps -ef|grep kafka.Kafka停止命令如下
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-stop.sh8. 防火墙开启端口
firewall-cmd --add-port8114/tcp --permanent;
firewall-cmd --reload9. 创建主题
因为我们已经在配置文件中指定num.partitions3和default.replication.factor3,所以就不用在创建主题的时候指定分区数和副本数了
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server 192.168.0.170:811410. 验证主题配置
创建成功后可以使用 --describe 命令查看主题的配置信息
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server 192.168.0.170:8114主要检查分区数量(PartitionCount)和副本数量(ReplicationFactor)
Topic: my-topic TopicId: 6941T0WBTpS9UaXJT-ytYw PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas2,segment.bytes1073741824,retention.bytes21474836480Topic: my-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2Topic: my-topic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3Topic: my-topic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,111. 查看主题列表
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --list12. 删除指定主题
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --delete --topic topic-name安装kafka可视化管理工具(可选)
在这里我选择kafkaUI-lite下载下方的二进制安装包就行
https://gitee.com/freakchicken/kafka-ui-lite/releases/tag/v1.2.11
1. 上传并解压。
tar -zxvf kafka-ui-lite-1.2.11.tar.gz -C /opt/app/kafka-ui-lite-1.2.112. 修改配置文件
如果想修改元数据库为mysql, 修改conf/application.properties中的以下配置 在这里我们直接使用linux自带的sqlite数据库
server.port19092
spring.datasource.driver-class-nameorg.sqlite.JDBC
spring.datasource.urljdbc:sqlite::resource:data.db
spring.datasource.username
spring.datasource.password查看系统有无sqlite
rpm -qa | grep sqlite有sqlite就会有如下输出
sqlite-3.7.17-8.el7_7.1.x86_64如果使用mysql数据库就需要执行sql目录下的sql脚本
[rootlocalhost sql]# pwd
/opt/app/kafka-ui-lite-1.2.11/sql
[rootlocalhost sql]# ll
总用量 8
-rw-r--r--. 1 root root 1540 4月 13 2021 ddl_mysql.sql
-rw-r--r--. 1 root root 1077 4月 13 2021 ddl_sqlite.sql3. 启动kafka-ui-lite
进入到安装目录下执行如下命令
# 前台启动
sh bin/kafkaUI.sh start
# 后台启动
sh bin/kafkaUI.sh -d start
# 关闭后台启动的进程
sh bin/kafkaUI.sh stop4.日志输出
/opt/app/kafka-ui-lite-1.2.11/logs5. 防火墙开启端口
# 开启kafka-ui-lite可视化管理工具端口
firewall-cmd --zonepublic --add-port19092/tcp --permanent
# 重新加载防火墙
firewall-cmd --reload测试
1. 进入kafka管理页面
访问地址http://192.168.1.100:19092
尽量不要使用此客户端创建主题, 我试了一下, 创建的主题无法正常生产消费消息
2. 添加配置 3. 生产、消费消息 4. 模拟节点宕机
可以随机停止一个zk节点, kafka节点, 测试一下集群是否能够正常生产消费