学校网站建设的风险分析,二维码图片制作,搜索引擎案例分析结论,字体设计素材网Kafka简介 Kafka概念关键功能应用场景 Kafka的原理Kafka 的消息模型早期的队列模型发布-订阅模型Producer、Consumer、Broker、Topic、PartitionPartitionoffsetISR Consumer Groupleader选举Controller leaderPartition leader producer 的写入流程 多副本机制replicas的同步时… Kafka简介 Kafka概念关键功能应用场景 Kafka的原理Kafka 的消息模型早期的队列模型发布-订阅模型Producer、Consumer、Broker、Topic、PartitionPartitionoffsetISR Consumer Groupleader选举Controller leaderPartition leader producer 的写入流程 多副本机制replicas的同步时机好处 kafka的优化吞吐量 zookeeper在kafka中的作用Broker注册Topic注册生产者负载均衡消费者负载均衡分区 与 消费者 的关系消息 消费进度Offset 记录消费者注册 kafka对消息的保证Kafka 如何保证消息的消费顺序Kafka 如何保证消息不丢失生产者丢失消息的情况消费者丢失消息的情况Kafka 弄丢了消息 Kafka 如何保证消息不重复消费 kafka和rabbitMq的对比 Kafka实战在Spring Boot 程序中使用 Kafka 作为消息队列1.创建项目2.配置kafka3.创建要发送的消息实体类4.创建发送消息的生产者5.创建消费消息的消费者6.创建一个 Rest Controller7.测试 主要内容是kafka的原理和使用 参考https://www.cnblogs.com/answerThe/p/11267454.html
Kafka概念
Kafka 是一个分布式流式处理平台
关键功能
消息队列发布和订阅消息流这个功能类似于消息队列这也是 Kafka 也被归类为消息队列的原因。容错的持久方式存储记录消息流Kafka 会把消息持久化到磁盘有效避免了消息丢失的风险。流式处理平台 在消息发布的时候进行处理Kafka 提供了一个完整的流式处理类库。
应用场景
消息队列建立实时流数据管道以可靠地在系统或应用程序之间获取数据。数据处理 构建实时的流数据处理程序来转换或处理数据流。
Kafka的原理
Kafka 的消息模型
早期的队列模型 使用队列Queue作为消息通信载体满足生产者与消费者模式一条消息只能被一个消费者使用未被消费的消息在队列中保留直到被消费或超时。 比如我们生产者发送 100 条消息的话两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半也就是你一个我一个的消费。
假如我们存在这样一种情况我们需要将生产者产生的消息分发给多个消费者并且每个消费者都能接收到完整的消息内容。这种情况队列模型就不好解决了。
发布-订阅模型 发布订阅模型Pub-Sub 使用主题Topic 作为消息通信载体类似于广播模式发布者发布一条消息该消息通过主题传递给所有的订阅者在一条消息广播之后才订阅的用户则是收不到该条消息的。
在发布 - 订阅模型中如果只有一个订阅者那它和队列模型就基本是一样的了。所以说发布 - 订阅模型在功能层面上是可以兼容队列模型的。
Producer、Consumer、Broker、Topic、Partition Producer生产者 : 产生消息的一方。Consumer消费者 : 消费消息的一方每个 Consumer 实例归属于一个 Consumer GroupBroker代理 : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。Topic主题 : Producer 将消息发送到特定的主题Consumer 通过订阅特定的 Topic(主题) 来消费消息。Partition分区 : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition 并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上这也就表明一个 Topic 可以横跨多个 Broker 。
Partition
partition可以看作一个有序的队列里面的数据是储存在硬盘中的追加式的。partition的作用就是提供分布式的扩展一个topic可以有许多partions多个partition可以并行处理数据所以可以处理相当量的数据。只有partition的leader才会进行读写操作folower仅进行复制客户端是感知不到的。 offset
每一条数据都有一个offset是每一条数据在该partition中的唯一标识。各个consumer控制和设置其在该partition下消费到offset位置这样下次可以以该offset位置开始进行消费。 各个consumer的offset位置默认是在某一个broker当中的topic中保存的为防止该broker宕掉无法获取offset信息可以配置在每个broker中都进行保存配置文件中配置
offsets.topic.replication.factor3
transaction.state.log.replication.factor3
transaction.state.log.min.isr3ISR
先来看几个概念
1、ARAssigned Repllicas一个partition的所有副本就是replica不区分leader或follower
2、ISRIn-Sync Replicas能够和 leader 保持同步的 follower leader本身 组成的集合。
3、OSROut-Sync Relipcas不能和 leader 保持同步的 follower 集合
4、公式AR ISR OSR
ISR 的核心就是动态调整
总结Kafka采用的就是一种完全同步的方案而ISR是基于完全同步的一种优化机制。
Consumer Group
Kafka中的消费组Consumer Group是一种机制用于管理多个消费者之间的关系。消费组允许多个消费者同时消费Kafka主题中的消息并且每个消费者可以负责消费一个或多个分区。
引入消费组有以下优点
提升整体消费能力通过增加消费者数量可以提升整体消费能力。在分区数固定的前提下当消费者数量大于分区数时部分消费者将无法分配到分区但仍然可以加入消费组从其他消费者那里获取消息从而提高整体消费效率。支持点对点模式和发布订阅模式通过消费组Kafka可以同时支持点对点模式和发布订阅模式。在点对点模式下生产者将消息发送到队列消费者从队列中获取消息。在发布订阅模式下主题可以看作是消息传递的中介生产者将消息发布到主题上而消费者从主题中订阅消息。实现伸缩性通过增减消费者数量可以提升或降低整体消费的能力。当需要处理更多消息时可以增加更多的消费者而当处理消息的需求减少时可以减少消费者数量。实现负载均衡在消费组内Kafka会自动实现负载均衡。Kafka会根据每个消费者的处理能力将消息分配给不同的消费者确保每个消费者都能充分利用其处理能力从而提高整体处理效率。
leader选举
kafka集群中有2个种leader一种是broker的leader即controller leader还有一种就是partition的leader下面介绍一下2种leader的选举大致流程。
Controller leader
当broker启动的时候都会创建KafkaController对象但是集群中只能有一个leader对外提供服务这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点只有第一个成功创建的节点的KafkaController才可以成为leader其余的都是follower。当leader故障后所有的follower会收到通知再次竞争在该路径下创建节点从而选举新的leader
Partition leader
由controller leader执行
从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合调用配置的分区选择算法选择分区的leader
如何处理所有Replica都不工作
在ISR中至少有一个follower时Kafka可以确保已经commit的数据不丢失但如果某个Partition的所有Replica都宕机了就无法保证数据不丢失了。这种情况下有两种可行的方案
等待ISR中的任一个Replica“活”过来并且选它作为Leader等待时间短选择第一个“活”过来的Replica不一定是ISR中的作为Leader不是ISR中的Replica不能保证一致性不保证已经包含了所有已commit的消息
producer 的写入流程
producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leaderproducer 将消息发送给该 leaderleader 将消息写入本地 logfollowers 从 leader pull 消息写入本地 log 后 leader 发送 ACKleader 收到所有 ISR 中的 replica 的 ACK 后增加 HWhigh watermark最后 commit 的 offset 并向 producer 发送 ACK
多副本机制
Kafka 为分区Partition引入了多副本Replica机制。分区Partition中的多个副本之间会有一个叫做 leader 的家伙其他副本称为 follower。我们发送的消息会被发送到 leader 副本然后 follower 副本才能从 leader 副本中拉取消息进行同步。
replicas的同步时机
假如有N个replicas其中一个replica为leader其他都为followerleader处理partition的所有读写请求于此同时follower会被动定期的去复制leader上的数据。 好处
Kafka 的多分区Partition以及多副本Replica机制的好处 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力负载均衡。Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力不过也相应的增加了所需要的存储空间。
kafka的优化
吞吐量
因为kafka的数据都是存储在硬盘中甚至有的公司将kafka其作为数据库使用既然数据是基于硬盘的那么为何kafka还是能够拥有如此高的吞吐量呢
1硬盘的索引功能。二分查找法。
分区找到响应的分区 分段根据文件segment的命名可以确认要查找的offset或timestamp在哪个文件中。
稀疏索引快速确定要找的offset在哪个内存地址的附近。 2I/O优化
普通程序I/O需要把Disk中的信息复制到系统环境内存步骤1再复制到kafka应用环境内存步骤2然后步骤3步骤4到Socket通过网络发出重复复制文本I/O消耗大。 kafka的I/O
zookeeper在kafka中的作用
kafka默认在zk中的节点层级结构
参考https://www.jianshu.com/p/a036405f989c
Broker注册
Broker是分布式部署并且相互之间相互独立但是需要有一个注册系统能够将整个集群中的Broker管理起来此时就使用到了Zookeeper。 每个Broker就会将自己的IP地址和端口信息记录到该节点中去。
每个 Broker 在启动时都会到 Zookeeper 上进行注册即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
Topic注册
在 Kafka 中同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区对应到 zookeeper 中会创建这些文件夹/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1
生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上因此生产者需要将消息合理地发送到这些分布式的Broker上那么如何实现生产者的负载均衡Kafka支持传统的四层负载均衡也支持Zookeeper方式实现负载均衡。
上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 PartitionKafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
消费者负载均衡
与生产者类似Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息每个消费者分组包含若干消费者每条消息都只会发送给分组中的一个消费者不同的消费者分组消费自己特定的Topic下面的消息互不干扰。
分区 与 消费者 的关系
在Kafka中规定了每个消息分区 只能被同组的一个消费者进行消费因此需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系
消息 消费进度Offset 记录
定时地将分区消息的消费进度Offset记录到Zookeeper上
消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下 注册到消费者分组。每个消费者服务器启动时都会到Zookeeper的指定节点下创建一个属于自己的消费者节点例如/consumers/[group_id]/ids/[consumer_id]完成节点创建后消费者就会将自己订阅的Topic信息写入该临时节点。 对 消费者分组 中的 消费者 的变化注册监听。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听一旦发现消费者新增或减少就触发消费者的负载均衡。 对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听如果发现Broker服务器列表发生变化那么就根据具体情况来决定是否需要进行消费者负载均衡。 进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程通常对于一个消费者分组如果组内的消费者服务器发生变更或Broker服务器发生变更会发出消费者负载均衡。
kafka对消息的保证
Kafka 如何保证消息的消费顺序
我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序比如我们同时发了 2 个消息这 2 个消息对应的操作分别对应的数据库操作是
更改用户会员等级。根据会员等级计算订单价格。
假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
我们知道 Kafka 中 Partition(分区)是真正保存消息的地方我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中并且我们可以给特定 Topic 指定多个 Partition。 每次添加消息到 Partition(分区) 的时候都会采用尾加法如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量offset。
Kafka 通过偏移量offset来保证消息在分区内的顺序性。
所以我们就有一种很简单的保证消息消费顺序的方法1 个 Topic 只对应一个 Partition。这样当然可以解决问题但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候可以指定 topic, partition, key,data数据 4 个参数。如果你发送消息的时候指定了 Partition 的话所有消息都会被发送到指定的 Partition。并且同一个 key 的消息可以保证只发送到同一个 partition这个我们可以采用表/对象的 id 来作为 key 。
总结一下对于如何保证 Kafka 中消息消费的顺序有了下面两种方法
1 个 Topic 只对应一个 Partition。推荐发送消息的时候指定 key/Partition。
Kafka 如何保证消息不丢失
生产者丢失消息的情况
生产者(Producer) 调用send方法发送消息之后消息可能因为网络问题并没有发送过去。
所以我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作我们可以通过 get()方法获取调用结果但是这样也让它变为了同步操作示例代码如下
SendResultString, Object sendResult kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() ! null) {logger.info(生产者成功发送消息到 sendResult.getProducerRecord().topic() - sendResult.getProducerRecord().value().toString());
}但是一般不推荐这么做可以采用为其添加回调函数的形式示例代码如下
ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o);future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息, result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));如果消息发送失败的话我们检查失败的原因之后重新发送即可另外这里推荐为 Producer 的retries 重试次数设置一个比较合理的值一般是 3 但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后当出现网络问题之后能够自动重试消息发送避免消息丢失。另外建议还要设置重试间隔因为间隔太小的话重试的效果就不明显了网络波动一次你 3 次一下子就重试完了
消费者丢失消息的情况
我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量offset。偏移量offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量offset可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后消费者会自动提交了 offset。自动提交的话会有一个问题试想一下当消费者刚拿到这个消息准备进行真正消费的时候突然挂掉了消息实际上并没有被消费但是 offset 却被自动提交了。
解决办法也比较粗暴我们手动关闭自动提交 offset每次在真正消费完消息之后再自己手动提交 offset 。 但是细心的朋友一定会发现这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后还没提交 offset结果自己挂掉了那么这个消息理论上就会被消费两次。
Kafka 弄丢了消息
暂略
Kafka 如何保证消息不重复消费
kafka 出现消息重复消费的原因
服务端侧已经消费的数据没有成功提交 offset根本原因。Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死触发了分区 rebalance。
解决方案
消费消息服务做幂等校验比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。将 enable.auto.commit 参数设置为 false关闭自动提交开发者在代码中手动提交 offset。那么这里会有个问题什么时候提交 offset 合适 处理完消息再提交依旧有消息重复消费的风险和自动提交一样拉取到消息即提交会有消息丢失的风险。允许消息延时的场景一般会采用这种方式。然后通过定时任务在业务不繁忙比如凌晨的时候做数据兜底。
kafka和rabbitMq的对比 Kafka实战在Spring Boot 程序中使用 Kafka 作为消息队列
1.创建项目
直接通过Spring 官方提供的 Spring Initializr 创建或者直接使用 IDEA 创建皆可。
2.配置kafka
通过 application.yml 配置文件配置 Kafka 基本信息
server:port:9090spring:kafka:consumer:bootstrap-servers:localhost:9092# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset:earliestproducer:bootstrap-servers:localhost:9092# 发送的对象信息变为json格式value-serializer:org.springframework.kafka.support.serializer.JsonSerializer
kafka:topic:my-topic:my-topicmy-topic2:my-topic2Kafka 额外配置类KafkaConfig.java
package cn.javaguide.springbootkafka01sendobjects.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;/*** author shuang.kou*/
Configuration
publicclass KafkaConfig {Value(${kafka.topic.my-topic})String myTopic;Value(${kafka.topic.my-topic2})String myTopic2;/*** JSON消息转换器*/Beanpublic RecordMessageConverter jsonConverter() {returnnew StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic如果 topic 已存在则会忽略。*/Beanpublic NewTopic myTopic() {returnnew NewTopic(myTopic, 2, (short) 1);}Beanpublic NewTopic myTopic2() {returnnew NewTopic(myTopic2, 1, (short) 1);}
}当我们到了这一步之后你就可以试着运行项目了运行成功后你会发现 Spring Boot 会为你创建两个topic:
my-topic: partition 数为 2, replica 数为 1 my-topic2:partition 数为 1, replica 数为 1
命令 kafka-topics --describe --zookeeper zoo1:2181
或者直接通过IDEA 提供的 Kafka 可视化管理插件-Kafkalytic 来查看
3.创建要发送的消息实体类
package cn.javaguide.springbootkafka01sendobjects.entity;publicclass Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id id;this.name name;}省略 getter/setter以及 toString方法
}
4.创建发送消息的生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;Service
publicclass BookProducerService {privatestaticfinal Logger logger LoggerFactory.getLogger(BookProducerService.class);privatefinal KafkaTemplateString, Object kafkaTemplate;public BookProducerService(KafkaTemplateString, Object kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}public void sendMessage(String topic, Object o) {kafkaTemplate.send(topic, o);}
}我们使用Kafka 提供的 KafkaTemplate 调用 send()方法出入要发往的topic和消息内容即可很方便的完成消息的发送: kafkaTemplate.send(topic, o);如果我们想要知道消息发送的结果的话sendMessage方法这样写
public void sendMessage(String topic, Object o) {try {SendResultString, Object sendResult kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() ! null) {logger.info(生产者成功发送消息到 sendResult.getProducerRecord().topic() - sendResult.getProducerRecord().value().toString());}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}但是这种属于同步的发送方式并不推荐没有利用到 Future对象的特性。
KafkaTemplate 调用 send()方法实际上返回的是ListenableFuture 对象。
send()方法源码如下
Overridepublic ListenableFutureSendResultK, V send(String topic, Nullable V data) {ProducerRecordK, V producerRecord new ProducerRecord(topic, data);return doSend(producerRecord);}ListenableFuture 是Spring提供了继承自Future 的接口。
ListenableFuture方法源码如下
publicinterface ListenableFutureT extends FutureT {void addCallback(ListenableFutureCallback? super T var1);void addCallback(SuccessCallback? super T var1, FailureCallback var2);default CompletableFutureT completable() {CompletableFutureT completable new DelegatingCompletableFuture(this);this.addCallback(completable::complete, completable::completeExceptionally);return completable;}
}继续优化sendMessage方法
public void sendMessage(String topic, Object o) {ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o);future.addCallback(new ListenableFutureCallbackSendResultString, Object() {Overridepublic void onSuccess(SendResultString, Object sendResult) {logger.info(生产者成功发送消息到 topic - sendResult.getProducerRecord().value().toString());}Overridepublic void onFailure(Throwable throwable) {logger.error(生产者发送消息{} 失败原因{}, o.toString(), throwable.getMessage());}});}使用lambda表达式再继续优化
public void sendMessage(String topic, Object o) {ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o);future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息, result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));}我们使用send(String topic, Nullable V data)方法的时候实际会new 一个ProducerRecord对象发送
Overridepublic ListenableFutureSendResultK, V send(String topic, Nullable V data) {ProducerRecordK, V producerRecord new ProducerRecord(topic, data);return doSend(producerRecord);}ProducerRecord类中有多个构造方法:
public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V......}如果我们想在发送的时候带上timestamp时间戳、key等信息的话sendMessage()方法可以这样写
public void sendMessage(String topic, Object o) {// 分区编号最好为 null交给 kafka 自己去分配ProducerRecordString, Object producerRecord new ProducerRecord(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);ListenableFutureSendResultString, Object future kafkaTemplate.send(producerRecord);future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息, result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));}5.创建消费消息的消费者
import cn.javaguide.springbootkafka01sendobjects.entity.Book;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;Service
publicclass BookConsumerService {Value(${kafka.topic.my-topic})private String myTopic;Value(${kafka.topic.my-topic2})private String myTopic2;privatefinal Logger logger LoggerFactory.getLogger(BookProducerService.class);privatefinal ObjectMapper objectMapper new ObjectMapper();KafkaListener(topics {${kafka.topic.my-topic}}, groupId group1)public void consumeMessage(ConsumerRecordString, String bookConsumerRecord) {try {Book book objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info(消费者消费topic:{} partition:{}的消息 - {}, bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}KafkaListener(topics {${kafka.topic.my-topic2}}, groupId group2)public void consumeMessage2(Book book) {logger.info(消费者消费{}的消息 - {}, myTopic2, book.toString());}
}
6.创建一个 Rest Controller
import cn.javaguide.springbootkafka01sendobjects.entity.Book;
import cn.javaguide.springbootkafka01sendobjects.service.BookProducerService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.atomic.AtomicLong;/*** author shuang.kou*/
RestController
RequestMapping(value /book)
publicclass BookController {Value(${kafka.topic.my-topic})String myTopic;Value(${kafka.topic.my-topic2})String myTopic2;privatefinal BookProducerService producer;private AtomicLong atomicLong new AtomicLong();BookController(BookProducerService producer) {this.producer producer;}PostMappingpublic void sendMessageToKafkaTopic(RequestParam(name) String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));}
}7.测试
输入命令
curl -X POST -F nameJava http://localhost:9090/bookmy-topic 有2个partition分区 当你尝试发送多条消息的时候你会发现消息会被比较均匀地发送到每个 partion 中