当前位置: 首页 > news >正文

凡科建站弊端微网站栏目

凡科建站弊端,微网站栏目,一般通过什么来进行知识点挖掘,网络营销相关的岗位有哪些Kafka#xff1a;分布式消息系统的核心原理与安装部署-CSDN博客 自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客 Kafka 生产者全面解析#xff1a;从基础原理到高级实践-CSDN博客 Kafka 生产者优化与数据处理经验-CSDN博客 Kafka 工作流程解析#xff1a… Kafka分布式消息系统的核心原理与安装部署-CSDN博客 自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客 Kafka 生产者全面解析从基础原理到高级实践-CSDN博客 Kafka 生产者优化与数据处理经验-CSDN博客 Kafka 工作流程解析从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客 Kafka 消费者全面解析原理、消费者 API 与Offset 位移-CSDN博客 Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客 Kafka 数据倾斜原因、影响与解决方案-CSDN博客 Kafka 核心要点解析_kafka mirrok-CSDN博客 Kafka 核心问题深度解析全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客 目录 一、提高生产者吞吐量 一相关参数设置 二代码示例与测试 二、数据可靠性 一ACK 机制分析 二代码示例与可靠性总结 三、数据去重 一数据传递语义 二幂等性原理与使用 三生产者事务 四、数据有序 五、数据乱序 六、总结 在大数据处理领域Kafka 作为一款高性能的分布式消息队列系统被广泛应用于数据的传输、存储与处理。对于生产者而言如何高效地将数据发送到 Kafka 集群同时保证数据的可靠性、去重、有序性等是至关重要的问题。本文将深入探讨 Kafka 生产者在提高吞吐量、保证数据可靠性、去重、有序性等方面的生产经验并结合代码示例进行详细分析。 一、提高生产者吞吐量 一相关参数设置 batch.size批次大小默认 16k。适当增大批次大小可以减少网络请求次数提高吞吐量。例如将其设置为 18000可以在一次网络请求中发送更多的数据。 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);linger.ms等待时间修改为 5 - 100ms。该参数控制消息在缓冲区的等待时间适当增加等待时间可以让更多的消息进入同一批次。比如设置为 1ms会使消息更快地被发送减少等待时间带来的延迟但可能会导致批次较小。 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);compression.type压缩 snappy。启用压缩可以减少网络传输的数据量提高传输效率。Snappy 是一种高效的压缩算法能够在不显著增加 CPU 开销的情况下大幅降低数据大小。 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);RecordAccumulator缓冲区大小修改为 64m。增大缓冲区大小可以容纳更多的消息避免因缓冲区满而频繁触发发送操作提高整体性能。 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);二代码示例与测试 以下是一个简单的 Kafka 生产者代码示例展示了如何设置上述参数 package com.bigdata.producter;import org.apache.kafka.clients.producer.*;import java.util.Properties;/*** 测试自定义分区器的使用*/ public class CustomProducer07 {public static void main(String[] args) {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);/*** 此处是提高效率的代码*/// batch.size批次大小默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);// linger.ms等待时间默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator缓冲区大小默认 32Mbuffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type压缩默认 none可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,snappy);// 创建了一个消息生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 调用这个里面的send方法// ctrl p/*ProducerRecordString, String producerRecord new ProducerRecordString, String(first,告诉你个秘密);kafkaProducer.send(producerRecord);*/for (int i 0; i 5; i) {// 发送消息的时候指定key值但是没有分区号会根据 hash(key) % 3 [0,1,2]ProducerRecordString, String producerRecord new ProducerRecordString, String(first,c,告诉你个找bigdata的好办法:i);// 回调-- 调用之前先商量好回扣多少。kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 获取很多信息exception null 说明成功不为null,说明失败if(exception null){System.out.println(消息发送成功);System.out.println(metadata.partition());// 三个分区我什么每次都是2分区粘性分区System.out.println(metadata.offset());// 13 14 15 16 17System.out.println(metadata.topic());}else{System.out.println(消息发送失败失败原因exception.getMessage());}}});}kafkaProducer.close();} } 测试时可以在 hadoop102 上开启 Kafka 消费者 bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic first然后在 IDEA 中执行上述代码观察 hadoop102 控制台中是否接收到消息。 二、数据可靠性 一ACK 机制分析 acks 0生产者发送过来的数据不需要等数据落盘应答。这种方式效率最高但可靠性最差。例如发送了 Hello 和 World 两个信息若 Leader 直接挂掉数据就会丢失因为生产者不会等待任何应答数据一发送就认为成功无法保证数据真正被 Kafka 集群接收和持久化。acks 1生产者发送过来的数据Leader 收到数据后应答。此时如果 Leader 保存成功并应答但在 Follower 未同步数据时 Leader 挂掉且该 Follower 成为新的 Leader那么之前的数据就会丢失可靠性中等效率中等。acks -1all生产者发送过来的数据Leader 和 ISR 队列里面的所有节点收齐数据后应答。这提供了最高的可靠性但效率相对较低。不过如果分区副本设置为 1 个只有一个 leader或者 ISR 里应答的最小副本数量min.insync.replicas 默认为 1设置为 1和 ack 1 的效果是一样的仍然有丢数的风险。数据完全可靠条件是 ACK 级别设置为 -1 分区副本大于等于 2 ISR 里应答的最小副本数量大于等于 2。 Leader收到数据所有Follower都开始同步数据 但有一个Follower因为某种故障迟迟不能与Leader进行同步那这个问题怎么解决呢 解决方案 Leader维护了一个动态的in-sync replica setISR意为和 Leader保持同步的FollowerLeader集合(leader0isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定默认30s。例如2超时(leader:0, isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点。 ISR: 可用的存活的LeaderFollower 数据可靠性分析 如果分区副本设置为1个只有一个leader或者ISR里应答的最小副本数量 min.insync.replicas 默认为1设置为1和ack1的效果是一样的仍然有丢数的风险leader0isr:0。 • 数据完全可靠条件 ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2 副本数是2但是ISR中不一定有两个因为会挂掉。 可靠性总结 acks0生产者发送过来数据就不管了可靠性差效率高 acks1生产者发送过来数据Leader应答可靠性中等效率中等 acks-1生产者发送过来数据Leader和ISR队列里面所有Follwer应答可靠性高效率低 在生产环境中acks0很少使用acks1一般用于传输普通日志允许丢个别数据 acks-1一般用于传输和钱相关的数据对可靠性要求比较高的场景。 开发环境你的本地 测试环境你们公司搭建的测试平台跟生产环境基本相似 正式环境生产环境公司正在使用的对外提供服务的 数据重复分析 acks -1all生产者发送过来的数据Leader和ISR队列里面的所有节点收齐数据后应答 记录acks -1 有可能出现数据重复问题 数据发送给了LeaderFollower 也同步成功了此时准备应答为-1的时候Leader挂了Follower顶上由于发送者不知道数据已经发送成功会给新的Leader再发消息此时数据重复。 二代码示例与可靠性总结 以下是设置 ACK 机制的代码示例 package com.bigdata.kafka.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerACKs {public static void main(String[] args) {// 这个里面放置配置相关信息Properties properties new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop11:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 设置应答机制是哪个级别的默认是all 等同于 -1properties.put(ProducerConfig.ACKS_CONFIG,all);// 重试次数 retries默认是 int 最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);for (int i0;i5;i) {// 回调函数 本次发送并没有指定分区和Key值仅仅发送的是value// 本地发送到底发给哪个分区呢 1随机 2黏住它 使用的是粘性分区kafkaProducer.send(new ProducerRecord(first, nihao: i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception null ){// 通过回调函数可以获取本次发送的内容和分区int partition metadata.partition();String topic metadata.topic();System.out.println(本次发送的主题是topic,发给了哪个分区 partition);}else{exception.printStackTrace();}}});}// 此处如果不close() 发送不了数据kafkaProducer.close();} } 在生产环境中acks 0 很少使用acks 1一般用于传输普通日志允许丢个别数据acks -1一般用于传输和钱相关的数据等对可靠性要求比较高的场景。 三、数据去重 一数据传递语义 至少一次At Least Once等于 ACK 级别设置为 -1 分区副本大于等于 2 ISR 里应答的最小副本数量大于等于 2可以保障数据可靠但不能保证数据不重复。最多一次At Most Once等于 ACK 级别设置为 0能保证数据不重复但不能保证数据不丢失。精确一次Exactly Once对于一些非常重要的信息如和钱相关的数据要求数据既不能重复也不丢失。Kafka 0.11 版本以后引入了幂等性和事务来保障数据精确一次。 二幂等性原理与使用 幂等性原理幂等性就是指 Producer 不论向 Broker 发送多少次重复数据Broker 端都只会持久化一条保证了不重复。其判断标准是具有 PID, Partition, SeqNumber 相同主键的消息提交时Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的Partition 表示分区号Sequence Number 是单调自增的。所以幂等性只能保证在单分区单会话重启会话就是下一次了内不重复。如果 kafka 集群挂了重启后以前的数据可能会再次发送导致数据重复。如何使用幂等性开启参数 enable.idempotence 默认为 truefalse 关闭。 三生产者事务 Kafka 事务原理 每一个 broker 都有一个事务协调器有特定算法确定本次事务对应的事务协调器。 Kafka 的事务 API 包括 initTransactions初始化事务、beginTransaction开启事务、sendOffsetsToTransaction在事务内提交已经消费的偏移量主要用于消费者、commitTransaction提交事务、abortTransaction放弃事务类似于回滚事务的操作。 // 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量主要用于消费者 void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets,String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务类似于回滚事务的操作 void abortTransaction() throws ProducerFencedException; 单个 Producer 使用事务保证消息的仅一次发送代码示例 package com.bigdata.producter;import org.apache.kafka.clients.producer.*;import java.util.Properties;/*** 使用事务幂等性保证数据唯一*/ public class CustomProducer09 {public static void main(String[] args) {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 设置应答机制是哪个级别的默认是all 等同于 -1properties.put(ProducerConfig.ACKS_CONFIG,all);// 重试次数 retries默认是 int 最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 默认幂等性是开启的所以不用设置// 必须设置事务的ID// 设置事务 id必须事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction_id_0);// 创建了一个消息生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 初始化事务kafkaProducer.initTransactions();// 调用这个里面的send方法// ctrl p/*ProducerRecordString, String producerRecord new ProducerRecordString, String(first,告诉你个秘密);kafkaProducer.send(producerRecord);*/// 开启事务kafkaProducer.beginTransaction();try {for (int i 0; i 5; i) {// 发送消息的时候指定key值但是没有分区号会根据 hash(key) % 3 [0,1,2]ProducerRecordString, String producerRecord new ProducerRecordString, String(first, c, 告诉你个找bigdata的好办法: i);// 回调-- 调用之前先商量好回扣多少。kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 获取很多信息exception null 说明成功不为null,说明失败if (exception null) {System.out.println(消息发送成功);System.out.println(metadata.partition());// 三个分区我什么每次都是2分区粘性分区System.out.println(metadata.offset());// 13 14 15 16 17System.out.println(metadata.topic());} else {System.out.println(消息发送失败失败原因 exception.getMessage());}}});if(i3){int a 10 /0 ;}}// 提交事务kafkaProducer.commitTransaction();}catch (Exception e){// 如果出错了回滚事务kafkaProducer.abortTransaction();}finally {kafkaProducer.close();}} }四、数据有序 生产者发送的数据单分区内可以做到有序多分区则无法保证除非把多个分区的数据拉到消费者端进行排序但这样做效率很低还不如直接设置一个分区。 五、数据乱序 kafka在1.x版本之前保证数据单分区有序条件如下 max.in.flight.requests.per.connection1不需要考虑是否开启幂等性。 kafka在1.x及以后版本保证数据单分区有序条件如下 开启幂等性 max.in.flight.requests.per.connection需要设置小于等于5。 未开启幂等性 max.in.flight.requests.per.connection需要设置为1。 原因说明因为在kafka1.x以后启用幂等后kafka服务端会缓存producer发来的最近5个request的元数据 故无论如何都可以保证最近5个request的数据都是有序的。 六、总结 在 Kafka 生产者的实际应用中需要根据不同的业务场景和需求合理设置各种参数以平衡数据的吞吐量、可靠性、去重、有序性等方面的要求。例如对于普通日志数据可以适当牺牲一些可靠性采用 acks 1 的设置提高吞吐量而对于金融交易等对数据准确性要求极高的场景则需要开启幂等性和事务确保数据的精确一次处理。同时要深入理解各个参数的含义和相互关系以及不同版本 Kafka 的特性差异才能更好地优化生产者的性能构建高效稳定的大数据处理管道。
http://www.w-s-a.com/news/974974/

相关文章:

  • 可以登录国外网站吗如何用家用电脑做网站
  • 吉安建站公司wordpress企业
  • 河北住房和城乡建设厅网站6thinkphp做视频网站
  • 遵义网站制作一般需要多少钱深圳全国网站制作哪个好
  • 公众平台网站价格哪个网站做餐饮推广最好
  • 深圳 公司网站设计重庆的网站设计公司价格
  • 网站开发市场分析餐饮平台app有哪些
  • 制作一个收费网站要多少钱开发网站需要什么技术
  • 网站流量统计平台二手域名做网站不收录
  • 蒙古网站后缀mysql8.0 wordpress
  • 免费建立一个网站互联网推广培训
  • WordPress多站点绑定域名深圳住房建设部官方网站
  • 网站建设公司zgkr上海网页网络技术有限公司
  • wordpress附件扩展格式徐州seo关键词
  • wordpress博客站模板织梦网站 联系方式修改
  • 北京城乡建设厅网站重庆网站建设解决方案
  • 网站建设和维护工作内容网站的空间与域名
  • 济南做门户网站开发公司网页发布的步骤
  • 江苏省交通厅门户网站建设管理办法做的网站怎么让百度收录
  • 关于怎么做网站网站site的收录数量要多远索引量
  • 传世网站建设阳光创信-网站建设首选品牌
  • 周口建设网站中国装修公司十大排名
  • wordpress自助发卡青浦网站优化
  • 南京建设银行公积金查询网站wordpress加载插件下载
  • 做网站怎么那么难网站的建设与管理的心得体会
  • 黄冈网站建设哪家快些网站规划与建设评分标准
  • 建站平台 绑定域名怎么在手机上做网站
  • 做电影网站违法吗莱芜 网站
  • 品牌咨询公司泉州seo不到首页不扣费
  • 做网站做一个什么主题的怎样搭建一个企业网站