当前位置: 首页 > news >正文

正规网站开发文案庆阳建设局网站

正规网站开发文案,庆阳建设局网站,个人养老金制度要来了,云南专业建网站消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式#xff0c;本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据#xff0c;这个汇报过程被称为提交位移#xff08;Committing Offsets#xff09;。因为 Consumer 能够同时消费…消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为 Consumer 能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即 Consumer 需要为分配给它的每个分区提交各自的位移数据。 提交位移主要是为了记录Consumer 的消费进度这样当 Consumer 发生重启之后就能够从 Kafka 中读取之前提交的位移从而继续消费避免以避免重复消费或消息丢失等。换句话说位移提交是 Kafka 提供给你的一个工具或语义保障你负责维持这个语义保障即如果你提交了位移 X那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。 因为位移提交非常灵活你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息你提交的位移值却是 20那么从理论上讲就丢失了10条数据相反地如果你提交的位移值是 5那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。 位移提交 从使用角度来说位移提交分为自动提交和手动提交从 Consumer 的角度来说位移提交分为同步提交和异步提交。 自动提交 默认情况下就是自动提交你根本无需关心位移提交的事情Consumer 端有个参数 enable.auto.commit默认值是 true即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms默认值是 5 秒即每 5 秒会为你自动提交一次位移。 这里我们用一段简单的代码来看看这两个参数怎么使用 Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, kafka_test);// 自动提交props.put(enable.auto.commit, true);// 间隔2秒 props.put(auto.commit.interval.ms, 2000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records) {// process}} 手动提交 设置 enable.auto.commit 为 false还需要调用相应的 API 手动提交位移KafkaConsumer.commitSync()。 // props.put(enable.auto.commit, false); while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常} } commitSync()有一个缺陷提交时Consumer 程序会处于阻塞状态在生产系统中因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔但这样做的后果是 Consumer 的提交频率下降在下次 Consumer 重启回来后会有更多的消息被重新消费。鉴于这个问题Kafka 提供了另一个 异步API 方法KafkaConsumer.commitAsync()。 不过commitAsync 的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此异步提交的重试其实没有意义所以 commitAsync 是不会重试的。 我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题 try {while(true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); } } catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close(); } } 同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交我们调用 commitAsync() 避免程序阻塞而在 Consumer 要关闭前我们调用 commitSync() 方法执行同步阻塞式的位移提交以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后我们既实现了异步无阻塞式的位移管理也确保了 Consumer 位移的正确性如果你自行编写代码开发一套 Kafka Consumer 应用可以尝试使用上面的代码范例来实现手动的位移提交。 其实还有一种更高级的提交方式就是分批量提交就不再这里展开留给大家查资料学习也欢迎各位同学在评论区交流讨论
http://www.w-s-a.com/news/125221/

相关文章:

  • 深圳兼职做网站涿州网站制作
  • 能找本地人做导游的网站app模板素材下载免费
  • 网站积分的作用网站开发需要看相关书籍
  • 建设银行总行网站alexa排名与什么有关系
  • 阿里云服务器发布网站收款网站怎么建设
  • 开发东莞网站制作公司做网站优化步骤
  • 网站版权信息的正确写法如何制作网络游戏
  • 郑州移动端网站建设如何在网上推广自己的公司
  • 企业建站源码系统破解网站后台
  • 石家庄网站开发报价企业注册资本代表什么
  • 招商平台公司宁波seo教程推广平台
  • 哪些网站可以做房产推广垂直门户网站都有什么
  • 不得不知道的网站金石项目管理软件
  • 怎么恢复网站数据库网站开发作业代做
  • 哪里建设网站最好用中国第五冶金建设公司医院网站
  • 雄安网建 网站建设订餐网站建设
  • 广州视频网站建站公司网站 体系
  • 青浦门户网站网站推广烟台公司电话
  • 湖北荆门建设银行网站wordpress购物模板下载
  • 学ui+wordpress模板北京推广优化
  • 建分类网站得花多少钱深圳设计网站开发
  • 网站集群建设和网站集约化百度商桥怎么绑定网站
  • 青岛模板网站建设价格网络品牌网站建设
  • 网站建设的几大要素网站的做网站的公司
  • 怎么登陆自己的公司网站垂直电商网站建设
  • 温州微网站制作哪里有许昌网站建设哪家最好
  • 中国中小企业网站官网网页制作工具按其制作方式分 可以分为
  • 做资源下载网站违法吗河南企业做网站
  • 网站开发总体功能设计网站建设 北京昌平
  • 辽宁省高等级公路建设局网站书画院网站建设方案