视频播放类网站建设费用,代写,女包建设网站前的市场分析,益阳市建设局网站目录 一、zookeeper概述#xff1a;
1、zookeeper工作机制#xff1a;
2、zookeeper主要作用#xff1a;
3、zookeeper特性#xff1a;
4、zookeeper的应用场景#xff1a;
5、领导者和追随者#xff1a;zookeeper的选举机制
二、zookeeper安装部署#xff1a;
三…目录 一、zookeeper概述
1、zookeeper工作机制
2、zookeeper主要作用
3、zookeeper特性
4、zookeeper的应用场景
5、领导者和追随者zookeeper的选举机制
二、zookeeper安装部署
三、消息队列kafka
1、消息队列概述
1.1、消息队列的作用
1.2、消息队列的模式
1.3、kafka的工作流程
四、 Kafka2.7.0的安装部署 五、kafka3.4.1安装部署
六、ELKfilebeatkafka的安装部署 一、zookeeper概述
zookeeper是一个开源的分布式架构。提供协调服务Apache项目 1、zookeeper工作机制
基于观察者模式设计的分布式服务管理架构。
主要职责存储和管理数据。分布式节点上的服务接收观察者的注册。一旦这些分布式节点上的数据发生变化由zookeeper来负责通知分布式节点上的服务
总结zookeeper 文件系统 通知机制。 zookeeper分为领导者和被迫者 leader follower 组成的集群
只要有一半以上的集群存活zookeeper集群就可以正常工作。适用于安装奇数台的服务集群 2、zookeeper主要作用
全局数据一致每个zookeeper节点都保存相同的数据。维护监控服务的数据一致 3、zookeeper特性
Zookeeper一个领导者Leader多个跟随者Follower组成的集群。Zookeepe集群中只要有半数以上节点存活Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。全局数据一致每个Server保存一份相同的数据副本Client无论连接到哪个Server数据都是一致的。更新请求顺序执行来自同一个Client的更新请求按其发送顺序依次执行即先进先出。数据更新原子性一次数据更新要么成功要么失败。实时性在一定时间范围内Client能读到最新数据。 4、zookeeper的应用场景
统一命名服务在分布式的环境下对所有的应用和服务进行统一命名统一配置管理配置文件同步kafka的配置文件被修改可以快速同步到其他节点统一集群管理实时掌握所有节点的状态服务器动态上下限负载均衡把访问的服务器的数据发送到访问最少的服务器处理客户端的请求 5、领导者和追随者zookeeper的选举机制
三台服务器A B C
A先启动发起第一次选举投票投给自己有3台但是自己只有1票不满足半数A的状态的looking
B启动再发起一次选举A和B分别投自己一票交换选票信息A发现B的myid比A大A的这一票会转而投给B。A0 B2没有半数以上的结果A B会进入lookingB有可能成为leader
C启动C的myid若最大A和B都会把票都会投给C 这时A B C都会把票投给CA0 B0 C3
C的状态变为leader A和B会变成follower 只要leader确定后续的服务器都是追随者。 只有两种情况会开启选举机制
初始化到达情况下会产生选举服务器之间和leader丢失了连接状态
若leader已存在建立连接即可
leader不存在
1、服务器ID大的胜出
2、EPOCH大直接胜出
3、EPOCH相同事务ID大的胜出
EPOCH是每个leader任期的代号没有leader大家的逻辑地位是相同的没投完一次之后数据是递增的。
事务ID是用来标识服务器的每一次变更每变更一次事务ID变化一次
服务器IDzookeeper集群中都有一个ID每台机器不重复和myid保持一致 service zookeeper restart
service kafka restart 二、zookeeper安装部署
部署zookeeper集群三台都安装zookeeperkafka最少2核4G
20.0.0.24
20.0.0.25
20.0.0.26 关闭防火墙和安全机制 升级java环境
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel 安装 Zookeeper
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper 修改配置文件
三台节点上同步操作
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg vim zoo.cfg tickTime2000 #通信心跳时间Zookeeper服务器与客户端心跳时间单位毫秒
initLimit10 #Leader和Follower初始连接时能容忍的最多心跳数tickTime的数量这里表示为10*2s
syncLimit5 #Leader和Follower之间同步通信的超时时间这里表示如果超过5*2sLeader认为Follwer死掉 并从服务器列表中删除Follwer
dataDir/opt/zookeeper/data ●修改指定保存Zookeeper中的数据的目录目录需要单独创建
dataLogDir/opt/zookeeper/logs ●添加指定存放日志的目录目录需要单独创建
clientPort2181 #客户端连接端口 #最后一行添加集群信息
server.120.0.0.24:3188:3288
server.220.0.0.25:3188:3288
server.320.0.0.26:3188:3288
1每个zookeeper集群的初始myid
20.0.0.24服务器的初始地址
3188领导者和追随者之间交换信息的端口内部通信的端口
3288一旦leader丢失响应开启选举3288就是用来执行选举时的服务器之间通信端口。 在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs 创建myid文件
在每个节点的dataDir指定的目录下创建一个 myid 的文件,不同节点分配1、2、3
echo 1 /opt/zookeeper/data/myid
echo 2 /opt/zookeeper/data/myid
echo 3 /opt/zookeeper/data/myid 配置 Zookeeper 启动脚本
三台节点全部配置
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME/opt/zookeeper
case $1 in
start)
echo ---------- zookeeper 启动 ------------
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo ---------- zookeeper 停止 ------------
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo ---------- zookeeper 重启 ------------
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo ---------- zookeeper 状态 ------------
$ZK_HOME/bin/zkServer.sh status
;;
*) echo Usage: $0 {start|stop|restart|status}
esac 设置开机自启
chmod x /etc/init.d/zookeeper
chkconfig --add zookeeper 分别启动 Zookeeper
service zookeeper start
查看当前状态leader、follower
service zookeeper status 三、消息队列kafka
1、消息队列概述
为什么要引入消息队列MQ
他也是一个中间键。在高并发环境下同步请求来不及处理。来不及处理的请求会形成阻塞
比方说数据库就会形成行锁或者表锁。请求线程满了超标了too many connection引发整个系统雪崩 1.1、消息队列的作用
异步处理请求。流量削峰应用解耦。
解耦只要通信保证其他的修改不影响整个集群每个组件可以独立的扩展修改降低组件之间的依赖性。
耦合在软件系统当中修改一个组件需要修改所有其他组件高度耦合
低度耦合改其中一个对其他组件影响不大无需修改所有
可恢复性系统当中有一部分组件消失不影响整个系统。也就是说在消息队列当中即使有一个处理消息的进程失败一旦恢复还可以重新加入到队列当中继续处理消息
缓冲机制可以控制和优化数据经过系统的时间和速度。解决生产消息和消费消息处理速度不一致的问题。
峰值的处理能力消息队列在峰值的情况之下能够顶住突发的访问压力。避免专门为了突发情况而对系统进行修改 异步通信允许用户把一个消息放入队列但是不立即处理等用户想处理的时候在处理 1.2、消息队列的模式
点对点 一对一 消息的生产者发送消息到队列中消费者从队列中提取消息消费者提取完之后队列中被提取的消息将会被移除。后续消费者不能再消费队列中的消息。消息队列可以有多个消费者但是一个消息只能由一个消费者提取
RABBITMQ 发布、订阅模式一对多观察者模式消费者提取数据之后队列当中的消息不会被清除
生产者发布一个消息到主题所有消费者都是通过主题获取消息
主题topic topic类似于一个数据流管道生产者把消息发布到主题消费者从主题当中订阅数据。每一个主题都可以被分区每个分区都有自己的偏移量。
分区partition 每个主题都可以分成多个分区。每个分区是数据的有序子集分区可以允许kafka进行水平拓展以处理大量数据。消息在分区中按照偏移量存储消费者可以独立读取每个分区的数据。
偏移量是每个消息在分区中的唯一标识。消费者通过偏移量跟踪、获取、已读或者未读消息的位置也可以通过提交偏移量来记录已处理的信息。 生产者producer 生产者把数据发送到kafka的主题当中负责写入消息
消费者consumer 从主题当中读取数据消费者可以是一个也可以是多个。每个消费者有一个唯一的消费者组IDkafka通过消费者实现负载均衡和容错性
经纪人broker 每个kafka节点都有一个broker每个broker负责一台服务器id唯一存储主题分区中的数据处理生产和消费者的请求。维护元数据3.0之前zookeeper维护。3.0之后自己管理元数据
zookeeper负责保存元数据元数据就是topic的相关信息发布在哪台主机上指定了多少分区以及副本数偏移量
zookeeper会自建一个主题 __consumer_offsets
3.0之后不依赖zookeeper的核心就是元数据由kafka节点自己管理 1.3、kafka的工作流程 四、 Kafka2.7.0的安装部署
cd /opt/
tar zxvf kafka_2.13-2.7.0.tgz
mv kafka_2.13-2.7.0 kafka/ 修改配置文件
cd /opt/kafka/config
cp server.properties server.properties.bak
vim server.properties
21行
broker的全局唯一编号每个broker不能重复因此要在其他机器上配置 broker.id1、broker.id2 31行
指定监听的IP和端口如果修改每个broker的IP需区分开来也可保持默认配置不用修改
这里上面broker配置过了 42行
num.network.threads3
broker 处理网络请求的线程数量一般情况下不需要去修改 45行
num.io.threads8
用来处理磁盘IO的线程数量数值应该大于硬盘数 48行
socket.send.buffer.bytes102400
发送套接字的缓冲区大小 51行
socket.receive.buffer.bytes102400
接收套接字的缓冲区大小 54行
socket.request.max.bytes104857600
请求套接字的缓冲区大小 60行
log.dirs/var/log/kafka
kafka运行日志存放的路径也是数据存放的路径 65行
num.partitions1
topic在当前broker上的默认分区个数会被topic创建时的指定参数覆盖 69行
num.recovery.threads.per.data.dir1
用来恢复和清理data下数据的线程数量 103行
log.retention.hours168
segment文件数据文件保留的最长时间单位为小时默认为7天超时将被删除 110行
log.segment.bytes1073741824
一个segment文件最大的大小默认为 1G超出将新建一个新的segment文件
Kafka 以日志文件的形式维护其数据而这些日志文件被分割成多个日志段。当一个日志段达到指定的大小时就会创建一个新的日志段。 123行
配置连接Zookeeper集群地址
zookeeper.connect20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 修改环境变量日志段是主题分区日志文件的一部分。
vim /etc/profile
export KAFKA_HOME/opt/kafka
export PATH$PATH:$KAFKA_HOME/bin source /etc/profile 配置 kafka 启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME/opt/kafka
case $1 in
start)
echo ---------- Kafka 启动 ------------
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo ---------- Kafka 停止 ------------
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo ---------- Kafka 状态 ------------
count$(ps -ef | grep kafka | egrep -cv grep|$$)
if [ $count -eq 0 ];then echo kafka is not running else echo kafka is running fi
;;
*) echo Usage: $0 {start|stop|restart|status}
esac 设置开机自启
chmod x /etc/init.d/kafka
chkconfig --add kafka 分别启动 Kafka
service kafka start 做地址映射
vim /etc/hosts 20.0.0.24 test1
20.0.0.25 test2
20.0.0.26 test3 Kafka 命令行操作
kafka的命令也只能在bin目录下执行
cd /opt/kafka/bin 创建topic主题
1、在kafka的bin目录下是所有的kafka可执行命令文件
2、--zookeeper 指定的是zookeeper的地址和端口保存kafka的元数据
3、--replication-factor 2 定义每个分区的副本数
4、partitions 3 指定主题的分区数
5、--topic test1 指定主题的名称。 kafka-topics.sh --create --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 --replication-factor 2 --partitions 3 --topic test1 20.0.0.24:2181定义集群服务器地址如果有多个 IP 地址使用逗号分割一般使用一个 IP 即可
--replication-factor定义分区副本数1 代表单副本建议为 2
--partitions定义分区数
--topic定义 topic 名称 查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 查看topic 的详情
kafka-topics.sh --describe --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181
查看某个topic 的详情
kafka-topics.sh --describe --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 --topic test1 Partition分区编号
Leader每个分区都有一个领导者Leader领导者负责处理分区的读写操作。
在上述输出中领导者的编号分别为 3、1、3。
Replicas每个分区可以有多个副本Replicas用于提供冗余和容错性。在上述输出中Replica 3、1、2 分别对应不同的 Kafka broker。
IsrISRIn-Sync Replicas表示当前与领导者保持同步的副本。
ISR 3、1分别表示与领导者同步的副本。 发布消息
kafka-console-producer.sh --broker-list 20.0.0.24:9092,20.0.0.25:9092,20.0.0.26:9092 --topic test1 消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.24:9092,20.0.0.25:9092,20.0.0.26:9092 --topic test1
后接--from-beginning会把主题中以往所有的数据都读取出来 __consumer_offsets 主题的作用是记录每个消费者组中每个消费者在每个分区上的偏移量。
这样当消费者组中的消费者重新加入或者新的消费者加入时它们可以从上次提交的偏移量处继续消费消息
而不会重复消费或错过消息。
请注意对于这个主题配置为 Replication Factor 为 1 可能会对高可用性造成一些影响。
在生产环境中通常会将 __consumer_offsets 主题的 Replication Factor 设置得更高
以确保偏移量信息的可靠性。 修改分区数
kafka-topics.sh --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 --alter --topic test1 --partitions 6 //删除 topic
kafka-topics.sh --delete --zookeeper 20.0.0.24:2181,20.0.0.25:2181,20.0.0.26:2181 --topic test1 Note: This will have no impact if delete.topic.enable is not set to true.
是关于删除 Kafka 主题的一个重要提示。默认情况下Kafka 集群禁用了主题删除操作为了确保不会意外删除数据。
在 Kafka 中要执行主题删除操作需要确保 delete.topic.enable 配置项被设置为 true。
这个配置项决定了是否允许删除主题。如果没有设置或设置为 false即使你执行了删除主题的命令
实际上也不会删除主题而只是标记主题为 marked for deletion。
在生产环境中特别谨慎地处理主题删除操作 在配置文件中添加将彻底删除topic.
delete.topic.enabletrue 在zookeeper中查看topic信息
/zkCli.sh -server 192.168.233.30:2181 ls /brokers/topics 总结
zookeeper 主要是分布式观察者模式统一各个服务器节点的数据
在kafka当中收集保存kafka的元数据
kafka消息队列订阅发布模式 五、kafka3.4.1安装部署
kafka3.4.1的安装步骤和2.7.1的步骤一模一样
但是命令有些区别原因是不再依靠zookeeper传输数据了 Kafka 命令行操作
//创建topic
kafka-topics.sh --create --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --replication-factor 2 --partitions 3 --topic test1 -------------------------------------------------------------------------------------
--bootstrap-server定义 bootstrap-server 集群服务器地址如果有多个 IP 地址使用逗号分割一般使用一个 IP 即可
--replication-factor定义分区副本数1 代表单副本建议为 2
--partitions定义分区数
--topic定义 topic 名称
------------------------------------------------------------------------------------- //查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 //查看某个 topic 的详情
[roottest1 efak]# kafka-topics.sh --describe --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092
Topic: test1 TopicId: ihBKilk6SNyP7RrVHygCog PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes1073741824
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test1 Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0 Leader每个分区都有一个领导者Leader领导者负责处理分区的读写操作。
在上述输出中领导者的编号分别为 2、1、0。 Replicas每个分区可以有多个副本Replicas用于提供冗余和容错性。
在上述输出中Replica 0、1、2 分别对应不同的 Kafka broker。 IsrISRIn-Sync Replicas表示当前与领导者保持同步的副本。
ISR 0、1、2 分别表示与领导者同步的副本。 //发布消息
kafka-console-producer.sh --broker-list 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1 //消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1 --from-beginning -------------------------------------------------------------------------------------
--from-beginning会把主题中以往所有的数据都读取出来
------------------------------------------------------------------------------------- //修改分区数
kafka-topics.sh --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --alter --topic test1 --partitions 6 //删除 topic
kafka-topics.sh --delete --bootstrap-server 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092 --topic test1 六、ELKfilebeatkafka的安装部署 工作流程 部署 ZookeeperKafka 集群
zookeeperkafka节点
20.0.0.45
20.0.0.46 2.部署 Filebeat
cd /usr/local/filebeat vim filebeat.yml
filebeat.prospectors:
- type: log enabled: true paths: - /var/log/nginx/access_log tags: [access] - type: log enabled: true paths: - /var/log/nginx/error_log tags: [error] #添加输出到 Kafka 的配置
output.kafka: enabled: true hosts: [20.0.0.45:9092,20.0.0.46:9092] topic: nginx 因为不转发到logstash下面的output全部注释掉 启动 filebeat
nohup ./filebeat -e -c filebeat.yml filebeat.out logstash
启动logstash
systemctl start logstash.service
ps -elf | grep logstash 部署 ELK在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d/ vim kafka.conf
input { kafka { bootstrap_servers 192.168.233.10:9092,192.168.233.20:9092,192.168.233.30:9092
#kafka集群地址 topics nginx
#拉取的kafka的指定topic type nginx_kafka
#指定 type 字段 codec json
#解析json格式的日志数据
auto_offset_reset latest
#拉取最近数据earliest为从头开始拉取
decorate_events true
#传递给elasticsearch的数据额外增加kafka的属性数据 }
} output { if nginx_access in [tags] { elasticsearch { hosts [192.168.233.12:9200,192.168.233.13:9200] index nginx_access-%{YYYY.MM.dd} } } if nginx_error in [tags] { elasticsearch { hosts [192.168.233.12:9200,192.168.233.13:9200] index nginx_error-%{YYYY.MM.dd} } } stdout { codec rubydebug }
} #启动 logstash
logstash -f /opt/log/kafka.conf --path.data /opt/kafka1 在此之前要保证ES启动
systemctl restart elasticsearch
cd /opt/elasticsearch-head-master
npm run start
netstat -natp |grep 9100
netstat -natp |grep 9200 去kafka看有没有创建topic是filebeat操作的
kafka-topics.sh --list --bootstrap-server 20.0.0.45:9092,20.0.0.46:9092 消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.45:9092,20.0.0.46:9092 --topic nginx --from-beginning logstash也命中消息 最后去logstash浏览器查看
索引生成