网站设计心得体会,Wordpress带商城的主题,wordpress自动轮播图插件,一个企业的网站建设前言#xff1a;
在分布式消息系统中#xff0c;消息的顺序性是一个重要的问题#xff0c;也是一个常见的业务场景#xff0c;那 Kafka 作为一个高性能的分布式消息中间件#xff0c;又是如何实现顺序消息的呢#xff1f;本篇我们将对 Kafka 的顺序消息展开讨论。
Kafk…前言
在分布式消息系统中消息的顺序性是一个重要的问题也是一个常见的业务场景那 Kafka 作为一个高性能的分布式消息中间件又是如何实现顺序消息的呢本篇我们将对 Kafka 的顺序消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
顺序消息的使用场景
顺序消息的使用场景众多这里我简单列举几个如下
即时消息中的单对单聊天和群聊保证发送方消息发送顺序与接收方的顺序一致。电商中下单后订单创建、支付、订单发货和物流更新的顺序性。手机充值过程中的扣款短信和重置成功的短信应该有顺序性。。。。。等等等场景。
Kafka 如何保证消息的顺序性
讨论 Kafka 消息的顺序性需要分单分区和多分区来讨论具体如下
单分区单分区的消息顺序性相对简单因为消息在单分区中是相对有序的只需要保证消息发送顺序和消费顺序即可。多分区多分区要保证消息有序就需要额外的设计来保证消息全局有序了。
根据上面的简单分析我们知道 Kafka 单分区的消息有序相对简单接下来我们分析一下 Kafka 如何保证单分区消息有序。
Kafka 如何保证单分区消息有序
Kafka 保证单分区消息有序需要从两个方面来讲一个是消息生产者一个是消息消费者具体如下
消息生产者
使用相同的分区键Partition Key生产者发送消息时指定相同的分区键使得所有消息都发送到同一个分区。指定消息 key如果没有指定分区我们指定一个相同的消息 KeyKafka 会根据 Key 进行 Hash 计算出一个分区号如果消息的 Key 相同那么也会计算一个相同的分区号消息也会发送到同一个分区了。自定义分区器如果想要实现更复杂的分区逻辑可以实现自定义分区器来达到消息最终到达同一个分区。
消息消费者
生产这已经保证了消费的发送有序因此消息消费者使用单线程消费即可。
Kafka 顺序消息实现案例
上面我们对 Kafka 顺序消息的实现做了基本分析下面我们就使用代码来实现 Kafka 的顺序消息。
Kafka 顺序消息 Producer
在 Producer 中分别实现了两种顺序消息的方式分别是指定分区和指定 Key具体代码如下
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.ExecutionException;/*** ClassName MyKafkaOrderlyProducer* Author: Author* Date: 2024/10/22 19:22* Description: 顺序消息发送者*/
Slf4j
Component
public class MyKafkaOrderlyProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate;//指定分区public void sendOrderlyByPartitionMessage() {try {this.kafkaTemplate.send(my-topic, 1, null, Partition--订单666创建).get();this.kafkaTemplate.send(my-topic, 1, null, Partition--订单666支付).get();this.kafkaTemplate.send(my-topic, 1, null, Partition--订单666发货).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}//指定 keypublic void sendOrderlyByKeyMessage() {try {this.kafkaTemplate.send(my-topic, 666, Key--订单666创建).get();this.kafkaTemplate.send(my-topic, 666, Key--订单666支付).get();this.kafkaTemplate.send(my-topic, 666, Key--订单666发货).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}在 Producer 代码中我们使用了 Kafka 的同步发送消息。
Kafka 顺序消息 Consumer
顺序消息的消费者代码十分简单还是使用 KafkaListener 完成消息消费注意是单线程消费即可。
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** ClassName MyKafkaConsumer* Author: zhangyong* Date: 2024/10/22 19:22* Description: MyKafkaOrderlyConsumer*/
Slf4j
Component
public class MyKafkaOrderlyConsumer {KafkaListener(id my-kafka-order-consumer,groupId my-kafka-consumer-groupId,topics my-topic,containerFactory myContainerFactory)public void listen(String message) {log.info(消息消费成功消息内容:{}, message);}}Kafka 顺序消息发送消费验证
验证指定分区情况下的顺序消息
2024-10-28 20:55:18.495 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704 INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Partition--订单666发货消息是按照发送顺序来消费的结果符合预期。
验证指定 Key 情况下的顺序消息
2024-10-28 20:56:13.238 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443 INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666发货消息是按照发送顺序来消费的结果符合预期。
Kafka 自定义分区器
自定义分区器就是按自己的规则来指定消息最终要发送的分区可以根据自己的需求灵活实现案例代码中先获取分区数量然后使用的是 key 的 Hash 值进行 Hash 取模的方式获取分区具体代码如下
package com.order.service.kafka;import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;/*** ClassName CustomPartitioner* Author: Author* Date: 2024/10/28 20:57* Description:*/
public class CustomPartitioner implements Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取 分区数量ListPartitionInfo partitionInfos cluster.partitionsForTopic(topic);if (key null || keyBytes null !(key instanceof String)) {throw new BusinessException(key 不能为空且需要是字符串类型);}String keyStr key.toString();int partition keyStr.hashCode() % partitionInfos.size();return partition;}Overridepublic void close() {}Overridepublic void configure(MapString, ? map) {}
}配置自定义分区器
自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器关键配置如下
//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);完整的配置 KafkaProducerConfig 配置如下
package com.order.service.config;import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** author author* description* modified By* version: V1.0*/
Configuration
EnableKafka
public class KafkaProducerConfig {Value(${spring.kafka.bootstrap-servers})private String servers;Value(${spring.kafka.producer.batch-size})private String batchSize;Value(${spring.kafka.producer.buffer-memory})private String bufferMemory;Value(${spring.kafka.producer.properties.linger.ms})private String lingerMs;Bean(myProducerKafkaProps)public MapString, Object getMyKafkaProps() {MapString, Object props new HashMap(10);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送消息的大小 默认 16KBprops.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);//生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32Mprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);//批量发送的的最大时间间隔单位是毫秒props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);//自定义分区器配置props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}Beanpublic ProducerFactoryString, String newProducerFactory() {return new DefaultKafkaProducerFactory(getMyKafkaProps());}Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(newProducerFactory());}}
自定义分区 Consumer 代码案例
自定义分区 Consumer 代码没有什么特殊之处指定一个 key 即可key 一致就可以保证消息发送到同一个 Partition 中保证消息的顺序具体代码如下
//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {try {this.kafkaTemplate.send(my-topic, 666, Key--订单666创建).get();this.kafkaTemplate.send(my-topic, 666, Key--订单666支付).get();this.kafkaTemplate.send(my-topic, 666, Key--订单666发货).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}自定义分区顺序消息验证
触发消息发送后 debugger 如下 控制台记录消费日志如下
2024-10-30 17:24:52.716 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921 INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer : 消息消费成功消息内容:Key--订单666发货消息是按顺序消费的结果符合预期。
总结Kafka 只能在单个 Partition 中保持消息的顺序存储要想保证消息的顺序性就必须让需要保持顺序的消息发送到同一个 Partition对于消费端消费消息的顺序性只需要保证使用单线程进行消费即可一般来说比较少用到 Kafka 的顺序消息这里分享一下还是希望可以帮助到有需要的朋友。
如有不正确的地方欢迎各位指出纠正。