当前位置: 首页 > news >正文

北京好的建站团队创建公司主页

北京好的建站团队,创建公司主页,福州建网站 做网页,网站免费认证kafka概述 一、kafka概述 1.1 定义1.2 消息队列 1.2.1 传统消息队列的应用场景1.2.2 消息队列的两种形式1.3 Kafka 基础架构二、kafka安装部署 2.1安装部署 2.1.1.jar包下载2.1.2.解压到指定的文件夹下2.1.3.创建两个文件夹以供后续使用2.1.4. 修改配置文件 #xff08;11修改zookeeper.properties 文件2 修改server.properties 文件2.2启动 2.2.1.启动 kafka 内置的 zookeeper2.2.2.启动 kafka 服务2.2.3.创建一个名为 test1 的 topic 测试主题 kafka2.2.4.创建消息生产者生产消息2.2.5.创建消息消费者接收消息2.2.6.测试消息发送和接收三、kafka架构深入理解 3.1 Kafka 工作流程 3.1.1 写入方式3.1.2 分区Partition 1.启动zookeeper2 启动服务3 创建主题4 查看主题5 查看主题列表3.2 Kafka 存储机制 3.2.1 数据分片3.2.2 log分段3.2.3 日志的清除策略以及压缩策略3.3 Kafka 生产者 3.3.1 数据可靠性保证3.4 Kafka 消费者 3.4.1概念3.4.2 消费方式 1.消费位移确认2 以时间戳查询消息3 消费速度控制3.5 Kafka 高效读取数据四、kafka API 4.1 Producer API 4.1.1 消息发送流程4.1.2 异步发送 API4.1.3 同步发送 API4.2 Consumer API 4.2.1 自动提交 offset4.2.2 手动提交 offset4.2.3 自定义存储 offset4.3 自定义拦截器 4.3.1 拦截器原理4.3.2 拦截器案例五、kafka事务 5.1 事务场景5.2 几个关键概念和推导5.3 事务语义 5.3.1 多分区原子写入5.3.2 粉碎“僵尸实例”5.3.3 读事务消息5.4.4 事务处理Java API 5.4.4.1 api分类5.4.4.2 事务配置5.4.4.3 “只有写”应用程序示例5.4.4.4 消费-生产并存consume-Transform-Produce5.5 事务工作原理 5.5.1 事务协调器和事务日志5.5.2 事务数据流5.6 事务相关配置 5.6.1 Broker configs5.6.2 Producer configs5.6.3 Consumer configs5.7 事务性能以及如何优化 5.7.1 Producer打开事务之后的性能5.7.2 Consumer打开之后的性能六、SpringBoot集成kafka 6.1 配置Maven依赖6.2 项目具体代码 6.2.1 yml配置6.2.2 生产者6.2.3 消费者6.2.4 一、kafka概述 1.1 定义 Kafka 是一个分布式的基于发布 / 订阅模式的消息队列Message Queue主要应用于大数据实时处理领域。 1.2 消息队列 1.2.1 传统消息队列的应用场景 使用消息队列的好处 解耦 允许独立的扩展或修改两边的处理过程只要确保它们遵守同样的接口约束。 可恢复性 系统的一部分组件失效时不会影响整个系统。消息队列降低了进程间的耦合度所以即使一个处理消息的进程挂掉加入队列中的消息仍然可以在系统恢复后被处理。 缓冲 有助于控制和优化数据流经过系统的速度解决生产消息和消费消息的处理速度不一致的情况。 灵活性和峰值处理能力 使用消息队列能够使关键组件顶住突发的访问压力而不会因为突发的超负荷的请求而完全崩溃。 异步通信 很多时候用户不想也不需要立即处理消息。消息队列提供了异步处理机制允许用户把一个消息放入队列但并不立即处理它。想向队列中放入多少消息就放多少然后在需要的时候再去处理它们。 1.2.2 消息队列的两种形式 点对点模式一对一消费者主动拉取数据消息收到后消息清除。 消息生产者生产消息发送到 Queue 中然后消费者从 Queue 中取出并且消费消息。消息被消费以后Queue 中不再有存储所以消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者但对于一个消息而言只有一个消费者可以消费。 发布 / 订阅模式一对多消费者消费数据之后不会清除消息 消息生产者发布将消息发布到 topic 中同时有多个消息消费者订阅消费该消息。和点对点方式不同发布到 topic 中的消息会被所有订阅者消费。 1.3 Kafka 基础架构 Producer 消息生产者就是向 Kafka broker 发消息的客户端。 Consumer 消息消费者向 Kafka broker 取消息的客户端。 Consumer GroupCG 消费者组由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 Broker 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。 Topic 可以理解为一个队列生产者和消费者面向的都是一个 topic。 Partiton 为了实现拓展性一个非常大的 topic 可以分布到多个 broker即服务器上一个 topic 可以分为多个 Partition每个 partition 都是一个有序的队列。 Replication 副本为保证集群中某个节点发生故障时该节点上的 partition 数据不丢失且 Kafka 仍然可以继续工作Kafka 提供了副本机制一个 topic 的每个分区都有若干个副本一个 leader 和若干个 follower。 leader 每个分区多个副本的 ” 主 “生产者发送数据的对象以及消费者消费数据时的对象都是 leader。 follower 每个分区多个副本的 “从”实时从 leader 中同步数据保持和 leader 数据的同步。leader 发生故障时某个 follower 会成为新的 leader。 二、kafka安装部署 2.1安装部署 2.1.1.jar包下载 网址http://kafka.apache/downloads.html !不要下载最新版在win10下有问题下载2.8.1如下图 下载Binary版本 [这里是图片005] 2.1.2.解压到指定的文件夹下 2.1.3.创建两个文件夹以供后续使用 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aiBYooS2-1644458525646)(https://images1.tqwba/20201029/5trqdowih4q.png)] 2.1.4. 修改配置文件 1修改zookeeper.properties 文件 修改 kafka_2.12-2.8.1configzookeeper.properties 文件 大概第16行 注意文件分隔符是\ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WL7WD6Hz-1644458525651)(https://images1.tqwba/20201029/twfv43l23jb.png)] 2 修改server.properties 文件 修改 kafka_2.12-2.8.1configserver.properties 文件 大概第60行 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ql4BDbuW-1644458525655)(https://images1.tqwba/20201029/xlzkbjusutq.png)] 2.2启动 2.2.1.启动 kafka 内置的 zookeeper 运行 cmd 命令 如果报错 The input line is too long将文件路径缩小即可如直接放在C盘下 如果报错Unable to access datadir请把修改配置文件时的两个路径均修改为相对路径 .inwindowszookeeper-server-start.bat .configzookeeper.properties[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rOwCHlMa-1644458525657)(https://images1.tqwba/20201029/iksbqbox45t.png)] 不关闭当前窗口 2.2.2.启动 kafka 服务 运行 cmd 命令 .inwindowskafka-server-start.bat .configserver.properties不关闭当前窗口 2.2.3.创建一个名为 test1 的 topic 测试主题 kafka 运行 cmd 命令 .inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1该窗口可关闭 2.2.4.创建消息生产者生产消息 运行 cmd 命令 .inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test1[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0kvDgrbJ-1644458525662)(https://images1.tqwba/20201029/vnifjx1ckrw.png)] 不关闭当前窗口 2.2.5.创建消息消费者接收消息 运行 cmd 命令 .inwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lyzsgoOg-1644458525663)(https://images1.tqwba/20201029/tjyef1mgnsh.png)] 不关闭当前窗口 2.2.6.测试消息发送和接收 测试成功 三、kafka架构深入理解 3.1 Kafka 工作流程 3.1.1 写入方式 producer采用推push模式将消息发布到broker每条消息都被追加append到分区patition中属于顺序写磁盘顺序写磁盘效率比随机写内存要高保障kafka吞吐率。 3.1.2 分区Partition Kafka集群有多个消息代理服务器broker-server组成发布到Kafka集群的每条消息都有一个类别用主题topic来表示。通常不同应用产生不同类型的数据可以设置不同的主题。一个主题一般会有多个消息的订阅者当生产者发布消息到某个主题时订阅了这个主题的消费者都可以接收到生成者写入的新消息。 afka集群为每个主题维护了分布式的分区partition日志文件物理意义上可以把主题topic看作进行了分区的日志文件partition log。主题的每个分区都是一个有序的、不可变的记录序列新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号叫做偏移量offset这个偏移量能够唯一地定位当前分区中的每一条消息。 消息发送时都被发送到一个topic其本质就是一个目录而topic是由一些Partition Logs(分区日志)组成其组织结构如下图所示 下图中的topic有3个分区每个分区的偏移量都从0开始不同分区之间的偏移量都是独立的不会相互影响。 我们可以看到每个Partition中的消息都是有序的生产的消息被不断追加到Partition log上其中的每一个消息都被赋予了一个唯一的offset值。发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置这种情况下消息会均衡地分布到不同的分区。 演示 1.启动zookeeper .inwindowszookeeper-server-start.bat .configzookeeper.properties2 启动服务 .inwindowskafka-server-start.bat .configserver.properties3 创建主题 创建主题 .inwindowskafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first 这里主要解释一下–replication-factor 1 和 --partitions 1的含义 –replication-factor 1表示的意思是给主题first的副本数为1 –partition 1的意思是将主题first分为1个分区在实际运用中我们可以选择多个分区分区的好处是为了避免给kafka集群中的节点服务器造成过大的压力比如说没有分区的时候一个主题位于一个服务器上面如果该主题中的消息数量过大的话那么会增加服务器的压力通过分区的这种方式将同一个topic可以分配到不同的服务器当中来去缓解服务器端的压力。 通过上面的命令我们就可以创建一个名为first的主题 4 查看主题 .inwindowskafka-topics.bat --zookeeper localhost:2181 --describe --topic first [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1O0paKTp-1644458525669)(C:UsersdellAppDataRoamingTypora ypora-user-imagesimage-20211130095949006.png)] 5 查看主题列表 .inwindowskafka-topics.bat --zookeeper localhost:2181 --list [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h4w3xhSU-1644458525670)(C:UsersdellAppDataRoamingTypora ypora-user-imagesimage-20211130102339901.png)] 3.2 Kafka 存储机制 每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。 但每一个段segment file消息数量不一定相等这样的特性方便old segment file高速被删除。默认情况下每一个文件大小为1G 每一个partiton仅仅须要支持顺序读写即可了。segment文件生命周期由服务端配置參数决定。 这样做的优点就是能高速删除无用文件。有效提高磁盘利用率。 3.2.1 数据分片 由于生产者生产的消息不断追加到 log 文件末尾为防止 log 文件过大导致数据定位效率低下Kafka 采取了分片和索引机制将每个 partition 分为多个 segment。每个 segment 对应两个文件“.index” 文件和 “.log 文件”。这些文件位于一个文件夹下该文件夹命名规则为topic 名称 分区序号。 比如创建一个名为firstTopic的topic其中有3个partition那么在 kafka 的数据目录/tmp/kafka-log中就有 3 个目录firstTopic-0~3 多个分区在集群中多个broker上的分配方法 1.将所有 N Broker 和待分配的 i 个 Partition 排序 2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上 3.2.2 log分段 每个分片目录中kafka 通过分段的方式将 数据 分为多个 LogSegment一个 LogSegment 对应磁盘上的一个日志文件00000000000000000000.log和一个索引文件(如上00000000000000000000.index)其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes107370 (设置分段大小,默认是1gb)选项进行设置。 “.index” 文件存储大量的索引信息“.log” 文件存储大量的数据索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。 3.2.3 日志的清除策略以及压缩策略 日志的清理策略有两个 1 根据消息的保留时间当消息在 kafka 中保存的时间超过了指定的时间就会触发清理过程 2根据 topic 存储的数据大小当 topic 所占的日志文件大小大于一定的阀值则可以开始删除最旧的消息。 通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置当其中任意一个达到要求都会执行删除。默认的保留时间是7 天 kafka会启动一个后台线程定期检查是否存在可以删除的消息。 日志压缩策略 Kafka 还提供了“日志压缩Log Compaction”功能通过这个功能可以有效的减少日志文件的大小缓解磁盘紧张的情况在很多实际场景中消息的 key 和 value 的值之间的对应关系是不断变化的就像数据库中的数据会不断被修改一样消费者只关心 key 对应的最新的 value。因此我们可以开启 kafka 的日志压缩功能服务端会在后台启动Cleaner线程池定期将相同的key进行合并只保留最新的 value 值。 3.3 Kafka 生产者 在 Kafka 中我们把产生消息的那一方称为生产者比如我们经常回去淘宝购物你打开淘宝的那一刻你的登陆信息登陆次数都会作为消息传输到 Kafka 后台当你浏览购物的时候你的浏览信息你的搜索指数你的购物爱好都会作为一个个消息传递给 Kafka 后台然后淘宝会根据你的爱好做智能推荐致使你的钱包从来都禁不住诱惑那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢发送过程是怎么样的呢 尽管消息的产生非常简单但是消息的发送过程还是比较复杂的 我们从创建一个ProducerRecord 对象开始ProducerRecord 是 Kafka 中的一个核心类它代表了一组 Kafka 需要发送的 key/value 键值对它由记录要发送到的主题名称Topic Name可选的分区号Partition Number以及可选的键值对构成。 在发送 ProducerRecord 时我们需要将键值对对象由序列化器转换为字节数组这样它们才能够在网络上传输。然后消息到达了分区器。 如果发送过程中指定了有效的分区号那么在发送记录时将使用该分区。如果发送过程中未指定分区则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有则将以循环的方式分配一个分区。选好分区后生产者就知道向哪个主题和分区发送数据了。 ProducerRecord 还有关联的时间戳如果用户没有提供时间戳那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。 然后这条消息被存放在一个记录批次里这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。 Kafka Broker 在收到消息时会返回一个响应如果写入成功会返回一个 RecordMetaData 对象它包含了主题和分区信息以及记录在分区里的偏移量上面两种的时间戳类型也会返回给用户。如果写入失败会返回一个错误。生产者在收到错误之后会尝试重新发送消息几次之后如果还是失败的话就返回错误消息。 Kafka 对于数据的读写是以分区为粒度的分区可以分布在多个主机Broker中这样每个节点能够实现独立的数据写入和读取并且能够通过增加新的节点来增加 Kafka 集群的吞吐量通过分区部署在多个 Broker 来实现负载均衡的效果 分区的原因 1方便在集群中扩展每个 partition 可以通过调整以适应它们的机器而一个 topic 又可以有多个 partition 组成因此整个集群就可以适应任意大小的数据了。 2可以提高并发因为可以以 partition 为单位读写了。 分区的原则 我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。 topicstring 类型NotNullpartitionint 类型可选timestamplong 类型可选keystring类型可选valuestring 类型可选headersarray 类型Nullable1指明 partition 的情况下直接将指明的值作为 partition 值 2没有指明 partition 值但有 key 的情况下将 key 值的 hash 值与 topic 的 partition 数进行取余得到 partition 值 3既没有 partition 又没有 key 值的情况下第一次调用时随机生成一个整数后面每次调用在这个整数上自增将这个值与 topic 可用的 partition 总数取余得到 partition 值也就是常说的 round-robin 轮询算法 3.3.1 数据可靠性保证 为保证 producer 发送的数据能可靠的发送到指定的 topictopic 中的每个 partition 收到 producer 发送的数据后都需要向 producer 发送 ack acknowledgement 确认收到如果 producer 收到 ack就会进行下一轮的发送否则重新发送数据。 3.4 Kafka 消费者 3.4.1概念 Kafka消费者对象订阅主题并接收Kafka的消息然后验证消息并保存结果。Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题每个消费者接收主题一部分分区的消息。消费者组的设计是对消费者进行的一个横向伸缩用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题通过增加消费者让它们分担负载分别处理部分分区的消息。 3.4.2 消费方式 1.消费位移确认 Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时通过参数enable.automit设定true表示自动提交默认。自动提交策略由消费者协调器ConsumerCoordinator每隔${automit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。 (1)自动提交。在创建一个消费者时默认是自动提交偏移量当然我们也可以显示设置为自动。例如我们创建一个消费者该消费者自动提交偏移量 (2)手动提交。在有些场景我们可能对消费偏移量有更精确的管理以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理或者用于其他网络访问请求等等复杂的业务处理在这种场景下所有的业务处理完成后才认为消息被成功消费这种场景下我们必须手动控制偏移量的提交。 2 以时间戳查询消息 Kafka 在0.10.1.1 版本增加了时间戳索引文件因此我们除了直接根据偏移量索引文件查询消息之外还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(MapTopicPartition, Long timestampsToSearch)方法该方法入参为一个Map 对象Key 为待查询的分区Value 为待查询的时间戳该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是若待查询的分区不存在则该方法会被一直阻塞。 3 消费速度控制 提供 pause(Collection partitions)和resume(Collection partitions)方法分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制结合业务使用。 3.5 Kafka 高效读取数据 1.kafka本身是分布式集群同时采用分区技术并发度高。 2.顺序写磁盘kafka的producer生产数据要写入到log文件中写的过程是一直追加到文件末端为顺序写。官网有数据表明同样的磁盘顺序写能到600M/s而随机写只有100k/s。 3.零复制技术 零拷贝是文件只需要经过Page Cache就可以直接发送出去了这样就极大的增加了发送数据的效率。 应用Page Cachekafka将数据直接持久化到Page Cache中其实就是内存中这样有几个优点1I/O Scheduler 可以将多个小块的写组装成大块的写操作降低了I/O次数。 四、kafka API 4.1 Producer API 4.1.1 消息发送流程 Kafka 的 producer 发送信息采用的是异步发送的方式。在消息发送的过程中涉及到两个线程一个是 main 线程一个是 Sender 线程以及一个线程共享变量—— RecordAccumulator 。main 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。 4.1.2 异步发送 API 1.导入依赖。 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.2.6.RELEASE/version /dependency dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.1.0/version /dependency2.编写代码。 需要用到的类 KafkaProducer 需要一个生产者对象用来发送数据。ProducerConfig 获取所需一系类配置参数。ProducerRecord 每条数据都要封装成一个 ProducerRecord 对象。 1不带回调函数的 API public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException{String server 162.14.109.33:9092;// 1.创建kafka生产者的配置信息Properties properties new Properties();// 2.指定连接的Kafka集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);// 3.ACK应答级别//properties.put(acks, all);properties.put(ProducerConfig.ACKS_CONFIG, all);// 4.重试次数properties.put(retries, 0);// 5.批次大小properties.put(batch.size, 16384);// 6.等待时间properties.put(linger.ms, 10000);// 7.RecordAccumulator 缓冲区大小properties.put(buffer.memory, 33554432);// 8.key,value的序列化properties.put(key.serializer, org.apache.kafkamon.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafkamon.serialization.StringSerializer);// 9.创建生产者对象KafkaProducerString, String producer new KafkaProducer(properties);// 10.发送数据//异步 //这个生产者写一条消息的时候先是写到某个缓冲区 // 这个缓冲区里的数据还没写到broker集群里的某个分区的时候 // 它就返回到client去了。虽然效率快但是不能保证消息一定被发送出去了。producer.send(new ProducerRecord(test2, fmy,这是生产者异步发送的消息!));//同步 //这个生产者写一条消息的时候它就立马发送到某个分区去。 // follower还需要从leader拉取消息到本地follower再向leader发送确认 // leader再向客户端发送确认。由于这一套流程之后客户端才能得到确认所以很慢。 // FutureRecordMetadata demo producer.send(new ProducerRecord(demo, neu, 这里是生产者同步发送的消息!)); // RecordMetadata recordMetadata demo.get(); // System.out.println(得到ack);// 11. 关闭资源producer.close();} }2带回调函数的 API 回调函数会在 producer 收到 ack 时调用为异步调用该方法有两个参数分别是 RecordMetadata 和 Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 public class CallBackProducer {public static void main(String[] args) {String server 162.14.109.33:9092;// 1.创建配置信息Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);// 2.创建生产者对象KafkaProducerString, String producer new KafkaProducer(properties);// 3.发送数据producer.send(new ProducerRecord(test2, fmy,这是带回调方法的生产者发送的消息!), (metadata, exception) - {if (exception null) {System.out.println(元数据分区:metadata.partition() ,偏移量: metadata.offset());} else {exception.printStackTrace();}});// 4.关闭资源producer.close();} }3自定义分区器 public class MyPartitioner implements Partitioner {Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster){return 1;}Overridepublic void close(){}Overridepublic void configure(MapString, ? map){} }在生产者中加入自定义分区器 public class PartitionProducer {public static void main(String[] args) {String server 162.14.109.33:9092;// 1.创建配置信息Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);// 添加分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, com.fmy.kafka.config.MyPartitioner);// 2.创建生产者对象KafkaProducerString, String producer new KafkaProducer(properties);// 3.发送数据producer.send(new ProducerRecord(test2, fmy,这是带回调方法的生产者发送的消息!), (RecordMetadata metadata, Exception exception)- {if (exception null) {System.out.println(已收到ack,这里是回调方法);System.out.println(元数据分区:metadata.partition() ,偏移量: metadata.offset());} else {exception.printStackTrace();}});// 4.关闭资源producer.close();} }4.1.3 同步发送 API 同步发送的意思是一条消息发送后会阻塞当前线程直至返回 ack。由于 send 方法返回的是一个 Future 对象根据 Future 对象的特点我们也可以实现同步发送的效果只需在调用 Future 对象的 get 方法即可。 //异步 //这个生产者写一条消息的时候先是写到某个缓冲区 // 这个缓冲区里的数据还没写到broker集群里的某个分区的时候 // 它就返回到client去了。虽然效率快但是不能保证消息一定被发送出去了。 // producer.send(new ProducerRecord(test2, fmy,这是生产者异步发送的消息!));//同步 //这个生产者写一条消息的时候它就立马发送到某个分区去。 // follower还需要从leader拉取消息到本地follower再向leader发送确认 // leader再向客户端发送确认。由于这一套流程之后客户端才能得到确认所以很慢。FutureRecordMetadata demo producer.send(new ProducerRecord(demo, neu, 这里是生产者同步发送的消息!));RecordMetadata recordMetadata demo.get();4.2 Consumer API 4.2.1 自动提交 offset 编写代码。 需要用到的类 KafkaConsumer 需要创建一个消费者对象用来消费数据。ConsumerConfig 获取所需的一些列配置参数。ConsumerRecord 每条数据都要封装成一个 ConsumerRecord 对象。 public class MyConsumer { public static void main(String[] args) { String server 162.14.109.33:9092;/* 1.创建消费者配置信息 */Properties properties new Properties();/* 2.给配置信息赋值 *//* 连接的集群 */properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);// /* 开启自动提交 / properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); / 自动提交的延时 */ properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, “1000”); /* 关闭自动提交 */// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /* key,value的反序列化 */properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringDeserializer);/* 消费者组 */properties.put(ConsumerConfig.GROUP_ID_CONFIG, bigData);/* 3.创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(properties);/* 4.订阅主题 */consumer.subscribe(Collections.singletonList(test2));/* 5.获取数据 */while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(100));/* 解析并打印consumerRecords */for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(分区consumerRecord.partition()偏移量:consumerRecord.offset());System.out.println(key:consumerRecord.key() ,value: consumerRecord.value());}/* 同步提交当前线程会阻塞直到 offset 提交成功 */// consumermitSync(); /* 异步提交 */// consumermitAsync((MapTopicPartition, OffsetAndMetadata offsets, Exception exception)- { // if (exception ! null) { // System.err.println(“Commit failed for” offsets); // } // }); } } } 4.2.2 手动提交 offset 虽然自动提交 offset 十分简便但由于其是基于时间提交的开发人员难以把握 offset 提交的时机。因此 Kafka 提供了手动提交 offset 的 API。 手动提交 offset 的方法有两种分别是 commitSync同步提交 和 commitAsync异步提交。两者的相同点是都会将本次拉取的一批数据最高的偏移量提交。不同点是commitSync 阻塞当前线程一直到提交成功并且会自动失败重试而 commitAsync 则没有失败重试机制故有可能提交失败。 1.同步提交 offset /* 同步提交,当前线程会阻塞直到offset 提交成功 */consumermitSync();2.异步提交 offset /* 异步提交 */consumermitAsync((MapTopicPartition, OffsetAndMetadata offsets, Exception exception)- {if (exception ! null) {System.err.println(Commit failed for offsets);}});3.数据漏消费和重复消费分析 无论是同步提交还是异步提交 offset都有可能会造成数据漏消费或重复消费。先提交 offset 后消费有可能造成数据的漏消费先消费后提交 offset有可能造成数据的重复消费。 4.2.3 自定义存储 offset Kafka 0.9 版本以前offset 存储在 Zookeeper0.9 版本后默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外Kafka 还可以选择自定义存储 offset。 offset 的维护是相当繁琐的因为需要考虑到消费者的 Rebalance。 当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的消费者主题的分区发生变化就会触发到分区的重新分配重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 后每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区并且定位到每个分区最近提交的 offset 位置继续消费。 要实现自定义存储 offset需要借助 ConsumerRebalanceListener。其中提交和获取 offset 的方法需要根据所选的 offset 存储系统自行实现。 public class CustomerConsumer {private static MapTopicPartition, Long currentOffset new HashMap();public static void main(String[] args) {String server 162.14.109.33:9092;//创建配置信息Properties properties new Properties();//Kafka 集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);//消费者组只要 group.id 相同就属于同一个消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, bigData);//关闭自动提交 offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//Key 和 Value 的反序列化类properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringDeserializer);//创建一个消费者KafkaConsumerString, String consumer new KafkaConsumer(properties);//消费者订阅主题consumer.subscribe(Collections.singletonList(test2), new ConsumerRebalanceListener() {//该方法会在 Rebalance 之前调用Overridepublic voidonPartitionsRevoked(CollectionTopicPartition partitions) {commitOffset(currentOffset);}//该方法会在 Rebalance 之后调用Overridepublic voidonPartitionsAssigned(CollectionTopicPartition partitions) {currentOffset.clear();for (TopicPartition partition : partitions) {consumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费}}});while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));//消费者拉取数据for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s , record.offset(), record.key(), record.value());currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());}commitOffset(currentOffset);//异步提交}}//获取某分区的最新 offsetprivate static long getOffset(TopicPartition partition) {return 0;}//提交该消费者所有分区的 offsetprivate static void commitOffset(MapTopicPartition, Long currentOffset) {} }4.3 自定义拦截器 4.3.1 拦截器原理 Producer 拦截器Interceptor是在 Kafka 0.10 版本引入的主要用于实现客户端的定制化控制逻辑。拦截器使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求。同时producer 允许用户指定多个 Interceptor 按序作用于同一消息从而形成一个拦截链。 Interceptor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor其定义的方法包括 1.onsend(ProducerRecord) 该方法封装进 KafkaProducer.send 方法中即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作但最好保证不要修改消息所属的 topic 和分区否则会影响目标分区的计算。 2.onAcknowledgement(RecordMetadata,Exception) 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中因此不要在该方法中放入很重的逻辑否则会拖慢 producer 的消息发送效率。 3.close() 关闭 interceptor主要用于执行一些资源清理工作。 4.configure(configs) 获取配置信息和初始化数据时调用。 4.3.2 拦截器案例 1.需求 实现一个简单的双 Interceptor 组成的拦截器链。第一个 Interceptor 会在消息发送前将时间戳信息添加到消息 value 的最前部第二个 Interceptor 会在消息发送后更新成功发送消息和失败发送消息个数。 2.分析 3.实现流程 1编写时间戳拦截器 //时间拦截器 //在消息发送前将时间戳信息加到消息value的最前部 public class TimeInterceptor implements ProducerInterceptorString, String {Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String producerRecord){// 1.取出数据String value producerRecord.value();// 2.创建一个新的ProducerRecord对象并返回//将return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.key(),System.currentTimeMillis() , value);}Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e){}Overridepublic void close(){}Overridepublic void configure(MapString, ? map){} }2编写计数拦截器 //计数拦截器 //在消息发送后更新成功发送消息或发送失败的消息数 public class CounterInterceptor implements ProducerInterceptorString,String {int success 0;int error 0;Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String producerRecord){return producerRecord;}Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e){if (recordMetadata ! null) {success;} else {error;}}Overridepublic void close(){System.out.println(success success);System.out.println(error error);}Overridepublic void configure(MapString, ? map){} }3编写 Producer 主程序 public class InterceptorProducer {public static void main(String[] args) {String server 162.14.109.33:9092;// 1.创建配置信息Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafkamon.serialization.StringSerializer);// 添加拦截器ArrayListString interceptors new ArrayList();interceptors.add(com.fmy.kafka.interceptor.TimeInterceptor);interceptors.add(com.fmy.kafka.interceptor.CounterInterceptor);properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);// 2.创建生产者对象KafkaProducerString, String producer new KafkaProducer(properties);// 3.发送数据for (int i 0; i 5; i){producer.send(new ProducerRecord(test2, fmy, 这是带拦截器的生产者发送的消息!));}// 4.关闭资源producer.close();} }五、kafka事务 事务是一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中或者说是一个原子操作生产消息和提交偏移量同时成功或者失败。 为了实现跨分区跨会话的事务需要引入一个全局唯一的Transaction ID并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID获得原来的PID。 为了管理TransactionKafka引入了一个新的组件Transaction CoordinatorProducer就是通过和 Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic这样即使整个服务重启由于事务状态得到保存进行中的事务状态可以得到恢复从而继续进行。 上述事务机制主要是从Producer方面考虑对于Consumer而言事务的保证就会相对较弱尤其时无法保证Commit 的信息被精确消费。这是由于Consumer可以通过offset访问任意信息而且不同的Segment File生命周期不同同一事务的消息可能会出现重启后被删除的情况。 5.1 事务场景 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。producer可能会给多个topic多个partition发消息这些消息也需要能放在一个事务里面这就形成了一个典型的分布式事务。kafka的应用场景经常是应用先消费一个topic然后做处理再发到另一个topic这个consume-transform-produce过程需要放到一个事务里面比如在消息处理或者发送的过程中如果失败了消费位点也不能提交。producer或者producer所在的应用可能会挂掉新的producer启动以后需要知道怎么处理之前未完成的事务 。流式处理的拓扑可能会比较深如果下游只有等上游消息事务提交以后才能读到可能会导致rt非常长吞吐量也随之下降很多所以需要实现read committed和read uncommitted两种事务隔离级别。 5.2 几个关键概念和推导 因为producer发送消息可能是分布式事务所以引入了常用的2PC所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。 事务管理中事务日志是必不可少的kafka使用一个内部topic来保存事务日志这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化因为不需要回溯事务的历史状态所以事务日志只用保存最近的事务状态。 因为事务存在commit和abort两种操作而客户端又有read committed和read uncommitted两种隔离级别所以消息队列必须能标识事务状态这个被称作Control Message。 producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联这个就是Transactional Id一个producer挂了另一个有相同Transactional Id的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了kafka目前没有引入全局序所以也没有transaction id这个Transactional Id是用户提前配置的。 TransactionalId能关联producer也需要避免两个使用相同TransactionalId的producer同时存在所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch 5.3 事务语义 5.3.1 多分区原子写入 事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。例如处理过程中发生了异常并导致事务终止这种情况下事务中的消息都不会被Consumer读取。现在我们来看下Kafka是如何实现原子的“读取-处理-写入”过程的。 首先我们来考虑一下原子“读取-处理-写入”周期是什么意思。简而言之这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A并且在对消息A进行了一些处理如B FA之后将消息B写入topic tp1则只有当消息A和B被认为被成功地消费并一起发布或者完全不发布时整个读取过程写入操作是原子的。 现在只有当消息A的偏移量X被标记为消耗时消息A才被认为是从topic tp0消耗的消费到的数据偏移量record offset将被标记为提交偏移量Committing offset。在Kafka中我们通过写入一个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。 由于offset commit只是对Kafkatopic的另一次写入并且由于消息仅在提交偏移量时被视为成功消费所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分所以整个步骤都是原子的。 5.3.2 粉碎“僵尸实例” 我们通过为每个事务Producer分配一个称为transactional.id的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer实例。 API要求事务性Producer的第一个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候Kafka broker用给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了一个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。 一旦这个epoch被触发任何具有相同的transactional.id和更旧的epoch的Producer被视为僵尸并被围起来, Kafka会拒绝来自这些Procedure的后续事务性写入。 5.3.3 读事务消息 现在让我们把注意力转向数据读取中的事务一致性。 Kafka Consumer只有在事务实际提交时才会将事务消息传递给应用程序。也就是说Consumer不会提交作为整个事务一部分的消息也不会提交属于中止事务的消息。 值得注意的是上述保证不足以保证整个消息读取的原子性当使用Kafka consumer来消费来自topic的消息时应用程序将不知道这些消息是否被写为事务的一部分因此他们不知道事务何时开始或结束此外给定的Consumer不能保证订阅属于事务一部分的所有Partition并且无法发现这一点最终难以保证作为事务中的所有消息被单个Consumer处理。 简而言之Kafka保证Consumer最终只能提供非事务性消息或提交事务性消息。它将保留来自未完成事务的消息并过滤掉已中止事务的消息。 5.4.4 事务处理Java API producer提供了五个事务方法 1.initTransactions 方法用来初始化事务这个方法能够执行的前提是配置了transactionalId如果没有则会报出IllegalStateException 2.beginTransaction 方法用来开启事务 3.sendOffsets 方法为消费者提供在事务内的位移提交的操作 4mitTransaction 方法用来提交事务 5.abortTransaction 方法用来中止事务类似于事务回滚。5.4.4.1 api分类 在一个原子操作中根据包含的操作类型可以分为三种情况前两种情况是事务引入的场景最后一种情况没有使用价值。 1.只有Producer生产消息 2.消费消息和生产消息并存这个是事务场景中最常用的情况就是我们常说的“consume-transform-produce ”模式 3.只有consumer消费消息这种操作其实没有什么意义跟使用手动提交效果一样而且也不是事务属性引入的目的所以一般不会使用这种情况 5.4.4.2 事务配置 1、创建消费者代码需要 将配置中的自动提交属性automit进行关闭而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )设置isolation.level 2、创建生成者代码如下,需要: 配置transactional.id属性配置enable.idempotence属性 5.4.4.3 “只有写”应用程序示例 package com.kafka.demo.transaction;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties; import java.util.concurrent.Future;public class TransactionProducer {private static Properties getProps(){Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(retries, 2); // 重试次数props.put(batch.size, 100); // 批量发送大小props.put(buffer.memory, 33554432); // 缓存大小根据本机内存大小配置props.put(linger.ms, 1000); // 发送频率满足任务一个条件发送props.put(client.id, producer-syn-2); // 发送端id,便于统计props.put(key.serializer, org.apache.kafkamon.serialization.StringSerializer);props.put(value.serializer, org.apache.kafkamon.serialization.StringSerializer);props.put(transactional.id,producer-1); // 每台机器唯一props.put(enable.idempotence,true); // 设置幂等性return props;}public static void main(String[] args) {KafkaProducerString, String producer new KafkaProducer(getProps());// 初始化事务producer.initTransactions();try {Thread.sleep(2000);// 开启事务producer.beginTransaction();// 发送消息到producer-synproducer.send(new ProducerRecordString, String(producer-syn,test3));// 发送消息到producer-asynFutureRecordMetadata metadataFuture producer.send(new ProducerRecordString, String(producer-asyn,test4));// 提交事务producermitTransaction();}catch (Exception e){e.printStackTrace();// 终止事务producer.abortTransaction();}} }5.4.4.4 消费-生产并存consume-Transform-Produce 在一个事务中既有生产消息操作又有消费消息操作即常说的Consume-tansform-produce模式。如下实例代码 package com.kafka.demo.transaction;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafkamon.TopicPartition;import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future;public class consumeTransformProduce {private static Properties getProducerProps(){Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(retries, 3); // 重试次数props.put(batch.size, 100); // 批量发送大小props.put(buffer.memory, 33554432); // 缓存大小根据本机内存大小配置props.put(linger.ms, 1000); // 发送频率满足任务一个条件发送props.put(client.id, producer-syn-2); // 发送端id,便于统计props.put(key.serializer, org.apache.kafkamon.serialization.StringSerializer);props.put(value.serializer, org.apache.kafkamon.serialization.StringSerializer);props.put(transactional.id,producer-2); // 每台机器唯一props.put(enable.idempotence,true); // 设置幂等性return props;}private static Properties getConsumerProps(){Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, test_3);props.put(session.timeout.ms, 30000); // 如果其超时将会可能触发rebalance并认为已经死去重新选举Leaderprops.put(enable.automit, false); // 开启自动提交props.put(automit.interval.ms, 1000); // 自动提交时间props.put(auto.offset.reset,earliest); // 从最早的offset开始拉取latest:从最近的offset开始消费props.put(client.id, producer-syn-1); // 发送端id,便于统计props.put(max.poll.records,100); // 每次批量拉取条数props.put(max.poll.interval.ms,1000);props.put(key.deserializer, org.apache.kafkamon.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafkamon.serialization.StringDeserializer);props.put(isolation.level,read_committed); // 设置隔离级别return props;}public static void main(String[] args) {// 创建生产者KafkaProducerString, String producer new KafkaProducer(getProducerProps());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(getConsumerProps());// 初始化事务producer.initTransactions();// 订阅主题consumer.subscribe(Arrays.asList(consumer-tran));for(;;){// 开启事务producer.beginTransaction();// 接受消息ConsumerRecordsString, String records consumer.poll(500);// 处理逻辑try {MapTopicPartition, OffsetAndMetadata commits new HashMap();for(ConsumerRecord record : records){// 处理消息System.out.printf(offset %d, key %s, value %s , record.offset(), record.key(), record.value());// 记录提交的偏移量commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));// 产生新消息FutureRecordMetadata metadataFuture producer.send(new ProducerRecord(consumer-send,record.value()send));}// 提交偏移量producer.sendOffsetsToTransaction(commits,group0323);// 事务提交producermitTransaction();}catch (Exception e){e.printStackTrace();producer.abortTransaction();}}} }5.5 事务工作原理 5.5.1 事务协调器和事务日志 在Kafka 0.11.0中与事务API一起引入的组件是上图右侧的事务Coordinator和事务日志。 事务Coordinator是每个KafkaBroker内部运行的一个模块。事务日志是一个内部的Kafka Topic。每个Coordinator拥有事务日志所在分区的子集即, 这些borker中的分区都是Leader。 每个transactional.id都通过一个简单的哈希函数映射到事务日志的特定分区事务日志文件__transaction_state-0。这意味着只有一个Broker拥有给定的transactional.id。 通过这种方式我们利用Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可用并且所有事务状态都能够持久存储。 值得注意的是事务日志只保存事务的最新状态而不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务日志中。 5.5.2 事务数据流 数据流在抽象层面上有四种不同的类型。 A. producer和事务coordinator的交互   执行事务时Producer向事务协调员发出如下请求 initTransactions API向coordinator注册一个transactional.id。 此时coordinator使用该transactional.id关闭所有待处理的事务并且会避免遇到僵尸实例由具有相同的transactional.id的Producer的另一个实例启动的任何事务将被关闭和隔离。每个Producer会话只发生一次。当Producer在事务中第一次将数据发送到分区时首先向coordinator注册分区。当应用程序调用commitTransaction或abortTransaction时会向coordinator发送一个请求以开始两阶段提交协议。 B. Coordinator和事务日志交互   随着事务的进行Producer发送上面的请求来更新Coordinator上事务的状态。事务Coordinator会在内存中保存每个事务的状态并且把这个状态写到事务日志中这是以三种方式复制的因此是持久保存的。 事务Coordinator是读写事务日志的唯一组件。如果一个给定的Borker故障了一个新的Coordinator会被选为新的事务日志的Leader这个事务日志分割了这个失效的代理它从传入的分区中读取消息并在内存中重建状态。 C.Producer将数据写入目标Topic所在分区   在Coordinator的事务中注册新的分区后Producer将数据正常地发送到真实数据所在分区。这与producer.send流程完全相同但有一些额外的验证以确保Producer不被隔离。 D.Topic分区和Coordinator的交互 在Producer发起提交或中止之后协调器开始两阶段提交协议。在第一阶段Coordinator将其内部状态更新为“prepare_commit”并在事务日志中更新此状态。一旦完成了这个事务无论发生什么事都能保证事务完成。Coordinator然后开始阶段2在那里它将事务提交标记写入作为事务一部分的Topic分区。这些事务标记不会暴露给应用程序但是在read_committed模式下被Consumer使用来过滤掉被中止事务的消息并且不返回属于开放事务的消息即那些在日志中但没有事务标记与他们相关联。一旦标记被写入事务协调器将事务标记为“完成”并且Producer可以开始下一个事务。 5.6 事务相关配置 5.6.1 Broker configs (1) transactional.id.timeout.ms 在ms中事务协调器在生产者TransactionalId提前过期之前等待的最长时间并且没有从该生产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周一次的生产者作业维护它们的id (2) max.transaction.timeout.ms 事务允许的最大超时。如果客户端请求的事务时间超过此时间broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户机超时过大从而导致用户无法从事务中包含的主题读取内容。 默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。 (3) transaction.state.log.replication.factor 事务状态topic的副本数量。默认值:3 (4) transaction.state.log.num.partitions 事务状态主题的分区数。默认值:50 (5) transaction.state.log.min.isr 事务状态主题的每个分区ISR最小数量。默认值:2 (6) transaction.state.log.segment.bytes 事务状态主题的segment大小。默认值:104857600字节 5.6.2 Producer configs enable.idempotence开启幂等 transaction.timeout.ms事务超时时间 事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间。 这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout。在broke中设置ms时请求将失败并出现InvalidTransactionTimeout错误。 默认是60000。这使得交易不会阻塞下游消费超过一分钟这在实时应用程序中通常是允许的。 transactional.id 用于事务性交付的TransactionalId。这支持跨多个生产者会话的可靠性语义因为它允许客户端确保使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId则生产者仅限于幂等交付。 5.6.3 Consumer configs isolation.level read_uncommitted:以偏移顺序使用已提交和未提交的消息。read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序这个设置意味着我们必须在使用者中缓冲消息直到看到给定事务中的所有消息。 5.7 事务性能以及如何优化 5.7.1 Producer打开事务之后的性能 让我们把注意力转向事务如何执行。首先事务只造成中等的写入放大。 额外的写入在于 对于每个事务我们都有额外的RPC向Coordinator注册分区。在完成事务时必须将一个事务标记写入参与事务的每个分区。同样事务Coordinator在单个RPC中批量绑定到同一个Borker的所有标记所以我们在那里保存RPC开销。但是在事务中对每个分区进行额外的写操作是无法避免的。最后我们将状态更改写入事务日志。这包括写入添加到事务的每批分区“prepare_commit”状态和“complete_commit”状态。 我们可以看到开销与作为事务一部分写入的消息数量无关。所以拥有更高吞吐量的关键是每个事务包含更多的消息。 实际上对于Producer以最大吞吐量生产1KB记录每100ms提交消息导致吞吐量仅降低3。较小的消息或较短的事务提交间隔会导致更严重的降级。 增加事务时间的主要折衷是增加了端到端延迟。回想一下Consum阅读事务消息不会传递属于公开传输的消息。因此提交之间的时间间隔越长消耗的应用程序就越需要等待从而增加了端到端的延迟。 5.7.2 Consumer打开之后的性能 Consumer在开启事务的场景比Producer简单得多它需要做的是 过滤掉属于中止事务的消息。不返回属于公开事务一部分的事务消息。 因此当以read_committed模式读取事务消息时事务Consumer的吞吐量没有降低。这样做的主要原因是我们在读取事务消息时保持零拷贝读取。 此外Consumer不需要任何缓冲等待事务完成。相反Broker不允许提前抵消包括公开事务。 因此Consumer是非常轻巧和高效的。感兴趣的读者可以在本文档链接2中了解Consumer设计的细节。 六、SpringBoot集成kafka 6.1 配置Maven依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency6.2 项目具体代码 6.2.1 yml配置 spring:kafka:# kafka服务器地址(可以多个)bootstrap-servers: localhost:9092producer:# key/value的序列化key-serializer: org.apache.kafkamon.serialization.IntegerSerializervalue-serializer: org.apache.kafkamon.serialization.StringSerializer# 返回数据形式# acks: all# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288# 服务器地址bootstrap-servers: localhost:9092consumer:# key/value的反序列化key-deserializer: org.apache.kafkamon.serialization.IntegerDeserializervalue-deserializer: org.apache.kafkamon.serialization.StringDeserializer# 指定一个默认的组名group-id: kafka2# earliest:当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费# latest:当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时从offset后开始消费只要有一个分区不存在已提交的offset则抛出异常auto-offset-reset: earliest6.2.2 生产者 步骤 a.创建一个生产者对象kafkaProducer b.调用send反射消息ProducerRecor封装是key-value键值对 c.调用Future.get()表示获取服务器的响应 d.关闭生产者 代码 package com.kafka.demo.controller; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; RestController public class KafkaSyncProducerController { Autowired private KafkaTemplateInteger, String template;RequestMapping(send/sync/{massage}) public String send(PathVariable String massage) {final ListenableFutureSendResultInteger, String future this.template.send(test1, 0, 0, massage);try {final SendResultInteger, String sendResult future.get();final RecordMetadata metadata sendResult.getRecordMetadata();System.out.println(metadata.topic() metadata.partition() metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return success; }} 6.2.3 消费者 package com.kafka.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;Component public class KafkaConsumer {KafkaListener(topics test1)public void onMassage(ConsumerRecordInteger, String record) {System.out.println(收到的消息 record.topic() record.partition() record.offset() record.key() record.value());} }6.2.4 package com.kafka.demo;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}}最后
http://www.w-s-a.com/news/457468/

相关文章:

  • 挪威网站后缀如何外贸网络推广
  • 外汇交易网站开发仟亿家设计软件好吗亿家
  • 专门教做甜品的网站郑州高新区建设环保局网站
  • 建站公司怎么获客网站建设全网营销
  • 黄石做网站的公司html免费网站模板
  • 做个商城网站怎么做便宜优酷视频网站源码
  • 网站侧边栏导航代码泰兴市住房和建设局网站
  • html网站登录界面模板确定建设电子商务网站目的
  • wordpress 多站点迁移三台网站seo
  • 工信部网站备案文件好网站建设公司地址
  • 怎么做app和网站购物网站单页面怎么做的
  • 西宁专业做网站教育网站建设策划书
  • 个人网站域名怎么起网站建设业务好跑吗
  • 网页设计的网网页设计的网站企业网站怎样做优化
  • 论文中小企业的网站建设域名网站空间
  • 宿迁网站建设联系电话现在出入邯郸最新规定
  • 男女做羞羞的事情网站30岁转行做网站编辑
  • 做企业网站的轻量级cmswordpress 越来越慢
  • 无锡中英文网站建设莱芜网络公司
  • ps软件下载官方网站相关搜索优化软件
  • 世界杯网站源码下载做网站推广代理
  • 用股票代码做网站的wordpress通过标签调用文章
  • iis添加网站ip地址树莓派运行wordpress
  • 网站空间域名多少钱宿迁做网站公司
  • 福州建设企业网站网站交互主要做什么的
  • 英文网站建设方法门户网站特点
  • 腾讯云备案 网站名称萧山城市建设网站
  • 漳浦网站建设网络营销推广策略
  • 龙岗商城网站建设教程百度关键词排名突然没了
  • 深圳网站建设服务哪家有织梦网站模板安装