网站前台界面模板下载,给企业做网站的平台,网站导航是怎么做的,南宁免费网站建站模板Kafka 入门简介 1.什么是 Kafka2.Kafka 的基本概念3.Kafka 分布式架构4.配置单机版 Kafka4.1 下载并解压包4.2 启动 Kafka4.3 创建 Topic4.4 向 Topic 中发送消息4.5 从 Topic 中消费消息 5.实验5.1 实验一#xff1a;Python 实现生产者消费者5.2 实验二#xff1a;消费组实现… Kafka 入门简介 1.什么是 Kafka2.Kafka 的基本概念3.Kafka 分布式架构4.配置单机版 Kafka4.1 下载并解压包4.2 启动 Kafka4.3 创建 Topic4.4 向 Topic 中发送消息4.5 从 Topic 中消费消息 5.实验5.1 实验一Python 实现生产者消费者5.2 实验二消费组实现容错性机制5.3 实验三Offset 管理 6.总结 1.什么是 Kafka
Kafka 是一个分布式流处理系统流处理系统使它可以像消息队列一样 publish 或者 subscribe 消息分布式提供了容错性并发处理消息的机制。
2.Kafka 的基本概念
Kafka 运行在集群上集群包含一个或多个服务器。Kafka 把消息存在 Topic 中每一条消息包含键值Key值Value和时间戳Timestamp。
Kafka 有以下一些基本概念
Producer消息生产者就是向 Kafka Broker 发消息的客户端。Consumer消息消费者是消息的使用方负责消费 Kafka 服务器上的消息。Topic主题由用户定义并配置在 Kafka 服务器用于建立 Producer 和 Consumer 之间的订阅关系。生产者发送消息到指定的 Topic 下消息者从这个 Topic 下消费消息。Partition消息分区一个 Topic 可以分为多个 Partition每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 idOffset。Broker一台 Kafka 服务器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker 可以容纳多个 Topic。Consumer Group消费者分组用于归组同类消费者。每个 Consumer 属于一个特定的 Consumer Group多个消费者可以共同消费一个Topic下的消息每个消费者消费其中的部分消息这些消费者就组成了一个分组拥有同一个分组名称通常也被称为消费者集群。Offset消息在 Partition 中的偏移量。每一条消息在 Partition 都有唯一的偏移量消费者可以指定偏移量来指定要消费的消息。
3.Kafka 分布式架构 如上图所示Kafka 将 Topic 中的消息存在不同的 Partition中。如果存在键值Key消息按照键值做分类存在不同的 Partition 中如果不存在键值消息按照轮询Round Robin机制存在不同的 Partition 中。默认情况下键值决定了一条消息会被存在哪个 Partition 中。 Partition 中的消息序列是有序的消息序列。Kafka 在 Partition 使用偏移量Offset来指定消息的位置。一个 Topic 的一个 Partition 只能被一个 Consumer Group 中的一个 Consumer 消费同组的多个 Consumer 消费同一个 Partition 中的数据是不允许的但是一个 Consumer 可以消费多个 Partition 中的数据。
Kafka 将 Partition 的数据复制到不同的 Broker提供了 Partition 数据的备份。每一个 Partition 都有一个 Broker 作为 Leader若干个 Broker 作为 Follower。所有的数据读写都通过 Leader 所在的服务器进行并且 Leader 在不同 Broker 之间复制数据。 上图中对于 Partition 0Broker 1 是它的 LeaderBroker 2 和 Broker 3 是 Follower。对于 Partition 1Broker 2 是它的 LeaderBroker 1 和 Broker 3 是 Follower。 在上图中当有 Client也就是 Producer要写入数据到 Partition 0 时会写入到 Leader Broker 1Broker 1 再将数据复制到 Follower Broker 2 和 Broker 3。 在上图中Client 向 Partition 1 中写入数据时会写入到 Broker 2因为 Broker 2 是 Partition 1 的 Leader然后 Broker 2 再将数据复制到 Follower Broker 1 和 Broker 3 中。
上图中的 Topic 一共有 3 个 Partition对每个 Partition 的读写都由不同的 Broker 处理因此总的吞吐量得到了提升。
4.配置单机版 Kafka
这里我们使用 Kafka 0.10.0.0 0.10.0.0 0.10.0.0 版本。
4.1 下载并解压包
$ wget https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
$ tar -xzf kafka_2.11-0.10.0.0.tgz
$ cd kafka_2.11-0.10.0.04.2 启动 Kafka
Kafka 需要用到 Zookeeper所以需要先启动 Zookeeper。我们这里使用下载包里自带的单机版 Zookeeper。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...然后启动 Kafka
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...4.3 创建 Topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test查看创建的 Topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test4.4 向 Topic 中发送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message4.5 从 Topic 中消费消息
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message5.实验
5.1 实验一Python 实现生产者消费者
kafka-python 是一个 Python 的 Kafka 客户端可以用来向 Kafka 的 Topic 发送消息、消费消息。
这个实验会实现一个 Producer 和一个 Consumer。roducer 向 Kafka 发送消息Consumer 从 Topic 中消费消息。结构如下图
# producer.py
import time
from kafka import KafkaProducerproducer KafkaProducer(bootstrap_serverslocalhost:9092)
i 0
while True:ts int(time.time() * 1000)producer.send(topictest, valuestr(i), keystr(i), timestamp_msts)producer.flush()print ii 1time.sleep(1)# consumer.py
from kafka import KafkaConsumerconsumer KafkaConsumer(test, bootstrap_servers[localhost:9092])
for message in consumer:print message接下来创建一个 Topic名为 test。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.打开两个窗口中我们在 window-1 中运行 producer.py如下
# window-1
$ python producer.py
0
1
2
3
4
5
...在 window-2 中运行 consumer.py如下
# window-2
$ python consumer.py
ConsumerRecord(topicutest, partition0, offset128, timestamp1512554839806, timestamp_type0, key128, value128, checksum-1439508774, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset129, timestamp1512554840827, timestamp_type0, key129, value129, checksum1515993224, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset130, timestamp1512554841834, timestamp_type0, key130, value130, checksum453490213, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset131, timestamp1512554842841, timestamp_type0, key131, value131, checksum-632119731, serialized_key_size3, serialized_value_size3)
...可以看到 window-2 中的 Consumer 成功的读到了 Producer 写入的数据。
5.2 实验二消费组实现容错性机制
这个实验将展示消费组的容错性的特点。这个实验中将创建一个有 2 个 Partition 的 Topic和 2 个 Consumer这 2 个 Consumer 共同消费同一个 Topic 中的数据。结构如下所示 Producer 部分代码和实验一相同这里不再重复。Consumer 需要指定所属的 Consumer Group代码如下
# consumer.py
from kafka import KafkaConsumerconsumer KafkaConsumer(test, bootstrap_servers[localhost:9092], group_idtestgroup)
for message in consumer:print message接下来我们创建一个 Topic名为 Test设置 Partition 数量为 2。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic test.打开三个窗口一个窗口运行 Producer还有两个窗口运行 Consumer。
运行 Consumer 的两个窗口的输出如下
# window-1
$ python consumer.py
ConsumerRecord(topicutest, partition0, offset11, timestamp1512556619298, timestamp_type0, key15, value15, checksum-1492440752, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset12, timestamp1512556621308, timestamp_type0, key17, value17, checksum-1029407634, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset13, timestamp1512556622316, timestamp_type0, key18, value18, checksum1544755853, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset14, timestamp1512556624326, timestamp_type0, key20, value20, checksum2130557725, serialized_key_size2, serialized_value_size2)
...# window-2
$ python consumer.py
ConsumerRecord(topicutest, partition1, offset6, timestamp1512556617287, timestamp_type0, key13, value13, checksum-1494513008, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset7, timestamp1512556618293, timestamp_type0, key14, value14, checksum-1499251221, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset8, timestamp1512556620303, timestamp_type0, key16, value16, checksum-783427375, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset9, timestamp1512556623321, timestamp_type0, key19, value19, checksum-1902514040, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset10, timestamp1512556626337, timestamp_type0, key22, value22, checksum782849423, serialized_key_size2, serialized_value_size2)
...可以看到两个 Consumer 同时运行的情况下它们分别消费不同 Partition 中的数据。window-1 中的 Consumer 消费 Partition 0 中的数据window-2 中的 Consumer 消费 Parition 1 中的数据。
我们尝试关闭 window-1 中的 Consumer可以看到如下结果
# window-2ConsumerRecord(topicutest, partition1, offset105, timestamp1512557514410,timestamp_type0, key46, value46, checksum-1821060627, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset106, timestamp1512557518428,timestamp_type0, key50, value50, checksum281004575, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset107, timestamp1512557521442,timestamp_type0, key53, value53, checksum1245067939, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition1, offset108, timestamp1512557525461,timestamp_type0, key57, value57, checksum-1003840299, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset98, timestamp1512557494325, timestamp_type0, key26, value26, checksum-1576244323, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset99, timestamp1512557495329, timestamp_type0, key27, value27, checksum510530536, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset100, timestamp1512557502360,timestamp_type0, key34, value34, checksum1781705793, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset101, timestamp1512557504368,timestamp_type0, key36, value36, checksum2142677730, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset102, timestamp1512557505372,timestamp_type0, key37, value37, checksum-1376259357, serialized_key_size2, serialized_value_size2)
...刚开始 window-2 中的 Consumer 只消费 Partition 1 中的数据当 window-1 中的 Consumer 退出后window-2 中的 Consumer 中也开始消费 Partition 0 中的数据了。
5.3 实验三Offset 管理
Kafka 允许 Consumer 将当前消费的消息的 Offset 提交到 Kafka中这样如果 Consumer 因异常退出后下次启动仍然可以从上次记录的 Offset 开始向后继续消费消息。
这个实验的结构和实验一的结构是一样的使用一个 Producer一个 Consumer主题 test 的 Partition 数量设为 1。
Producer 的代码和实验一中的一样这里不再重复。Consumer 的代码稍作修改这里 Consumer 中打印出下一个要被消费的消息的 Offset。Consumer 代码如下
from kafka import KafkaConsumer, TopicPartitiontp TopicPartition(test, 0)
consumer KafkaConsumer(bootstrap_servers[localhost:9092], group_idtestgroup, auto_offset_resetearliest, enable_auto_commitFalse)
consumer.assign([tp])
print start offset is, consumer.position(tp)
for message in consumer:print message在一个窗口中启动 Producer在另一个窗口并且启动 Consumer。Consumer 的输出如下
$ python consumer.py
start offset is 98
ConsumerRecord(topicutest, partition0, offset98, timestamp1512558902904, timestamp_type0, key98, value98, checksum-588818519, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset99, timestamp1512558903909, timestamp_type0, key99, value99, checksum1042712647, serialized_key_size2, serialized_value_size2)
ConsumerRecord(topicutest, partition0, offset100, timestamp1512558904915, timestamp_type0, key100, value100, checksum-838622723, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset101, timestamp1512558905920, timestamp_type0, key101, value101, checksum-2020362485, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset102, timestamp1512558906926, timestamp_type0, key102, value102, checksum-345378749, serialized_key_size3, serialized_value_size3)
...可以尝试退出 Consumer再启动 Consumer。每一次重新启动Consumer 都是从 offset98 的消息开始消费的。
修改 Consumer 的代码如下在 Consumer 消费每一条消息后将 offset 提交回 Kafka。
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadatatp TopicPartition(test2, 0)
consumer KafkaConsumer(bootstrap_servers[localhost:9092], group_idtestgroup, auto_offset_resetearliest, enable_auto_commitFalse)
consumer.assign([tp])
print start offset is , consumer.position(tp)
for message in consumer:print messageconsumer.commit(message.offset 1)启动 Consumer
$ python consumer.py
start offset is 98
ConsumerRecord(topicutest, partition0, offset98, timestamp1512559632153, timestamp_type0, key824, value824, checksum828849435, serialized_key_size3, serialized_value_size3)
...
ConsumerRecord(topicutest, partition0, offset827, timestamp1512559635164, timestamp_type0, key827, value827, checksum442222330, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset828, timestamp1512559636169, timestamp_type0, key828, value828, checksum-267344764, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset829, timestamp1512559637173, timestamp_type0, key829, value829, checksum1225853586, serialized_key_size3, serialized_value_size3)可以看到 Consumer 从 offset98 的消息开始消费到 offset829 时我们 CtrlC 退出 Consumer。
我们再次启动 Consumer
$ python consumer.py
start offset is 830
ConsumerRecord(topicutest, partition0, offset830, timestamp1512559638177, timestamp_type0, key830, value830, checksum1003305652, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset831, timestamp1512559639181, timestamp_type0, key831, value831, checksum-361607666, serialized_key_size3, serialized_value_size3)
ConsumerRecord(topicutest, partition0, offset832, timestamp1512559640185, timestamp_type0, key832, value832, checksum-345891932, serialized_key_size3, serialized_value_size3)
...可以看到重新启动后Consumer 从上一次记录的 offset 开始继续消费消息。之后每一次 Consumer 重新启动Consumer 都会从上一次停止的地方继续开始消费。
6.总结
本文主要介绍了一下 Kafka 的基本概念并结合一些实验帮助理解 Kafka 中的一些难点如多个 Consumer 的容错性机制Offset 管理。