电子商务网站建设林锋课本答案,网店代运营费用,帮一个公司做网站多少钱,服装定制行业的未来和趋势前言#xff1a;
前面我们分享了 Kafka 的一些基础知识#xff0c;以及 Spring Boot 集成 Kafka 完成消息发送消费#xff0c;本篇我们来分享一下 Kafka 的批量消息发送消费。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka Kafka…前言
前面我们分享了 Kafka 的一些基础知识以及 Spring Boot 集成 Kafka 完成消息发送消费本篇我们来分享一下 Kafka 的批量消息发送消费。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 消息批量发送
Kafka 没有提供批量发送消息的 APIKafka 的方式是提供一个 RecordAccumulator 消息收集器将发送给同一个 Topic 同一个 Partition 的消息先缓存起来当其达到某些条件后才会一次性的将消息提交给 Kafka Broker。
Kafka 消息的批量发送主要跟以下三个参数有关
batch.size批量发送消息的大小默认 16KB产生的消息达到这个数量后即刻触发消息批量提交到 Kafka Broker。buffer.memory生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数模式是 32 MB如果超过这个数量即刻触发消息批量提交到 Kafka Broker。linger.ms批量发送的的最大时间间隔单位是毫秒当达到配置的时间之后会立刻触发消息批量提交大 Kafka Broker。
以上三个条件满足一个就会触发消息的批量提交。
官方文档传送门
Kafka 批量消息 参数配置
上面我们分析了 Kafka 没有提供批量发送的 API而是使用了三个参数来控制批量发送的换句话说其实我们每次使用 Kafka 发送消息的时候都是批量发送Kafka 批量发送消息的代码没有什么特殊之处只需要对上面解释的三个参数进行按需配置即可本案例的配置如下
#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32M
spring.kafka.producer.buffer-memory 33554432
#批量发送的的最大时间间隔单位是毫秒
spring.kafka.producer.properties.linger.ms50000Kafka 批量消息 Producer 代码演示
Kafka 批量发送消息代码如下
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** ClassName MyKafkaBatchProducer* Author: Author* Date: 2024/10/22 19:22* Description:*/
Slf4j
Component
public class MyKafkaBatchProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String dateStr sdf.format(new Date());log.info(开始消息发送,当前时间:{}, dateStr);for (int a 0; a 1000; a) {this.kafkaTemplate.send(my-topic, 第 a 条 kafka 消息);}log.info(完成消息发送,当前时间:{}, dateStr);}}在 Kafka 发送完成消息后我们记录了当前时间这个时间是用来证明消息是被批量发送的。
Kafka 批量消息 Consumer 代码演示
Kafka 批量消息的代码也没有什么特殊之处还是使用 KafkaListener注解来监听消息只不过参数变成了 ListConsumerRecordString, String 类型然后我们在配置中配置了批量消费的模式批量消费的配置如下
#Kafka 的消费模式 single 每次单条消费消息 batch 每次批量消费消息
spring.kafka.listener.type batchConsumer 代码如下
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** ClassName MyKafkaBatchConsumer * Author: Author* Date: 2024/10/22 19:22* Description:*/
Slf4j
Component
public class MyKafkaBatchConsumer {KafkaListener(id my-kafka-consumer-01,groupId my-kafka-consumer-groupId-01,topics my-topic,containerFactory myContainerFactory,properties {max.poll.records:10})public void listen(ListConsumerRecordString, String consumerRecords) {SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String dateStr sdf.format(new Date());log.info(my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}, dateStr, consumerRecords.size());for (ConsumerRecordString, String consumerRecord : consumerRecords) {String value consumerRecord.value();log.info(消息内容:{},value);}}}这里我们使用了 properties 这个属性配置后面详细讲解。
** Kafka 批量消息验证**
触发消息发送消费结果如下
2024-10-27 15:27:17.563 INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570 INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : 消息内容:第9条 kafka 消息
2024-10-27 15:27:17 完成消息发送:2024-10-27 15:27:22 完成消息消费时间间隔是 5秒消息是 10 条符合预期。
我们修改配置再次演示将批量发送消息的时间间隔改为 10 秒同时一次性发送 1000 条消息是消息的总大小大于 1KB。
#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32M
spring.kafka.producer.buffer-memory 33554432
#批量发送的的最大时间间隔单位是毫秒
spring.kafka.producer.properties.linger.ms50000调整消息发送端代码如下
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** ClassName KafkaProducer* Author: Author* Date: 2024/10/22 19:22* Description:*/
Slf4j
Component
public class MyKafkaBatchProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String dateStr sdf.format(new Date());log.info(开始消息发送,当前时间:{}, dateStr);for (int a 0; a 1000; a) {this.kafkaTemplate.send(my-topic, 第 a 条 kafka 消息);}log.info(完成消息发送,当前时间:{}, dateStr);}}触发消息发送消费结果如下
2024-10-27 15:41:39.530 INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530 INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10可以看到消息发送和消息消费几乎是同时进行的因为这里我们打印的是时间只有秒是看不出差异的但是也可以根据这个结果看出消费者并没有等到 10秒后才开始消费是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker结果符合预期。
关于 buffer-memory 这个参数这里不做验证了有兴趣的朋友可以自己去验证哈。
spring.kafka.consumer.max-poll-records 参数讨论
spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数默认为 500 条上面的案例中我们使用了 properties {“max.poll.records:10”} 这个配置其实这个配置也是配置批量拉去消息的最大数量我们配置的是 10日志记录每次最多拉去的数量就是 10使用 properties 的配置方式可以覆盖掉项目配置文件中的配置也就是局部配置覆盖全局配置这样做的好处是显而易见的我们可以针对每个消费端按需做出灵活配置。
总结本篇简单分享了 Kafka 批量发送消息消费的一些案例希望可以帮助到有需要的朋友分享有错误的地方也欢迎大家提出纠正。
如有不正确的地方欢迎各位指出纠正。