当前位置: 首页 > news >正文

做视频解析网站wordpress mdtf

做视频解析网站,wordpress mdtf,做网站公众号多少钱,浙江做网站找谁Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构#xff08;log-based#xff09;的消息引擎 消费消息时#xff0c;只是从磁盘文件上读取数据#xff0c;不会删除消息数据位移数据能由消费者控制#xff0c;能很容易修改位移的值#xff0c;实现重复消费历史数据… Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构log-based的消息引擎 消费消息时只是从磁盘文件上读取数据不会删除消息数据位移数据能由消费者控制能很容易修改位移的值实现重复消费历史数据 重设位移的两个维度 : 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值时间维度 : 将位移调整成 该时间的最小位移 7 种重设策略 : 维度策略含义位移维度Earliest位移调整当前最早位移处Latest位移调整到当前最新位移处Current位移调整到当前最新提交位移处Specified-Offset位移调整成指定位移Shift-By-N位移调整到当前位移 N 处时间维度DataTime位移调整到大于给定时间的最小位移处Duration位移调整到离当前时间指定间隔的位移处 Earliest 策略 : 将位移调整到主题当前最早位移处 不一定是 0很久的消息会被 Kafka 自动删除所以当前最早位移可能是 0 的值当要重新消费主题的所有消息就用 Earliest 策略 Latest 策略 : 把位移重设成最新末端位移 当向某个主题发送 15 条消息最新末端位移是 15当要跳过所有历史消息从最新的消息处开始消费就用 Latest 策略 Current 策略 : 将位移调整成消费者当前提交的最新位移 修改消费者代码并重启消费者结果发现代码有问题回滚前的代码变更还把位移重设为消费者重启时的位置 Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处 消费者处理某条错误消息时 , 能手动跳过此消息的处理生产中 , 出现 corrupted 消息无法被消费时消费者会抛出异常无法继续工作 Specified-Offset 策略 : 指定位移的绝对数值 Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离 把位移重设成当前位移的前 100 条位移处指定 N 为 -100 DateTime : 指定一个时间将位移重置到该时间之后的最早位移处 使用场景 : 重新消费昨天的数据并使用该策略重设位移到昨天 0 点 Duration 策略 : 相对的时间间隔将位移调整到距离当前给定时间间隔的位移处格式是 PnDTnHnMnS Java 8 引入 Duration : 以字母 P 开头后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)将位移调回到 15 分钟前就指定 PT0H15M0S 消费者 API Earliest 策略 : Properties consumerProperties new Properties(); // 禁止自动提交位移 consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 重设的消费者组的组 ID consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic test; // 要重设位移的 Kafka 主题 try (final KafkaConsumerString, String consumer new KafkaConsumer(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo -new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList())); } Latest 策略 : consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo - new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));Current 策略 : 获取当前提交的最新位移 : consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).forEach(tp - {long committedOffset consumer.committed(tp).offset();consumer.seek(tp, committedOffset); });Specified-Offset 策略 : long targetOffset 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset); }实现 Shift-By-N 策略 for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset consumer.committed(tp).offset() 123L; consumer.seek(tp, targetOffset); }DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点 long ts LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - ts));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset()); }Duration 策略 : 将位移调回 30 分钟前 MapTopicPartition, Long timeToSearch consumer.partitionsFor(topic).stream().map(info - new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp - System.currentTimeMillis() - 30 * 1000 * 60));for (Map.EntryTopicPartition, OffsetAndTimestamp entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset()); }命令行 Earliest 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-earliest –executeLatest 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-latest --executeCurrent 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-current --executeSpecified-Offset 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --all-topics --to-offset offset --executeShift-By-N 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --shift-by offset_N --executeDateTime 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --to-datetime 2019-06-20T20:00:00.000 --executeDuration 策略 : bin/kafka-consumer-groups.sh \ --bootstrap-server kafka-host:port \ --group test-group --reset-offsets \ --by-duration PT0H30M0S --execute
http://www.w-s-a.com/news/356268/

相关文章:

  • 建设银行企业网站访问不了wordpress搬到谷歌服务器
  • 网站建设与网站优化销售别墅庭院园林景观设计公司
  • 沈阳红方城网站建设专业的微网站哪家好
  • 医院网站asp东营信息发布平台
  • 网站全站建设开题报告范文南京本地网站
  • 网站漏洞扫描工具wampserver集成环境搭建了一个织梦cms网站
  • 如何在局域网上做网站宁波设计公司排行榜
  • 自己的电脑做网站服务器吗百度搜索风云榜总榜
  • 做化妆品的一些网站企业网站建设与营运计划书
  • 重庆速代网络科技seo整站优化服务教程
  • 成都比较好的装修设计公司seo3的空间构型
  • 开发商建设审批网站成都创意设计公司
  • 百度快照比网站上线时间早wordpress新建阅读量字段
  • 国家工程建设标准化协会网站网站开发工具有
  • 上海网站建设集中公关公司组织架构图
  • wordpress副标题的作用百度网站标题优化
  • 大连哪家公司做网站比较好wordpress 判断用户组
  • 网站空间1g多少钱东莞公司高端网站建设
  • 网站服务器出错是什么意思做餐饮酒店网站
  • 房地产网站建设策划方案网站建设教程简笔画
  • 3d室内设计软件wordpress本地优化加速版
  • 南京高新区规划建设局网站石家庄哪里做网站比较好
  • 免费培训课程网站优化的方式
  • 做网站要固定电话在家自己做网站
  • 招聘网站开发视频新手如何做网站维护
  • flash 网站欣赏国外做的比较好的网站有哪些
  • 推广一个网站需要什么官网首页设计
  • 淘宝建设网站的理由企业官网建设哪家好
  • 青岛网站推wordpress主题切换
  • 天元建设集团有限公司资质郑州网站seo推广