当前位置: 首页 > news >正文

佛山市和城乡建设局网站国外中文网站排行

佛山市和城乡建设局网站,国外中文网站排行,网站文字排版,百度广告服务商kafka消费积压 如果生产者发送消息的速度过快#xff0c;或者是消费者处理消息的速度太慢#xff0c;那么就会有越来越多的消息无法及时消费#xff0c;也就是消费积压。 消费积压时#xff0c; (1) 可以增加Topic的分区数#xff0c;并且增加消费组的消费者数量#…kafka消费积压 如果生产者发送消息的速度过快或者是消费者处理消息的速度太慢那么就会有越来越多的消息无法及时消费也就是消费积压。 消费积压时 (1) 可以增加Topic的分区数并且增加消费组的消费者数量让消费者数等于分区数。 (2) 还可以使用多线程消费提高消费速度。 kafka多线程消费的代码 public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static final String GROUP_ID group.demo;public static void main(String[] args) {Properties props initConfig();KafkaConsumerThread consumerThread new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* return*/public static Properties initConfig() {Properties props new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumerString, String kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer new KafkaConsumer(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber threadNumber;executorService new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000),new ThreadPoolExecutor.CallerRunsPolicy());}Overridepublic void run() {try {while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error(run error, e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecordsString, String records;public RecordsHandler(ConsumerRecordsString, String records) {this.records records;}Overridepublic void run() {//处理records.for (ConsumerRecordString, String record : records) {System.out.println(record:record.value() ,thread: Thread.currentThread().getName());}}}} 发送消息后使用多线程消息运行结果如下 record:{id:1234,name:lin},thread:pool-1-thread-1 record:{id:5678,name:chen},thread:pool-1-thread-2 record:{id:91011,name:wu},thread:pool-1-thread-3参考资料 《深入理解Kafka核心设计与实践原理》
http://www.w-s-a.com/news/213430/

相关文章:

  • 网站建设需求规格说明书中山模板建站公司
  • wordpress get值网站建设 seo sem
  • 网站建设微信开发工厂代加工平台
  • 厦门 网站建设 公司哪家好asp.net 创建网站
  • 专业北京网站建设凡科网做网站怎么样
  • 金富通青岛建设工程有限公司网站浙江省住建厅四库一平台
  • 有搜索引擎作弊的网站企业建设H5响应式网站的5大好处6
  • 是做网站编辑还是做平面设计seo外包公司接单
  • 做性的网站有哪些苏州专业网站设计制作公司
  • 陵水网站建设友创科技十大优品店排名
  • 想换掉做网站的公司简要说明网站制作的基本步骤
  • 国企公司网站制作wordpress 浮动定位
  • 网站网页直播怎么做的企业网站建设推荐兴田德润
  • 网站建设熊猫建站厦门seo全网营销
  • 扁平网站设计seo是什么岗位的缩写
  • 工商企业网站群晖配置wordpress 80端口
  • 企业网站建设流程步骤镇江东翔网络科技有限公司
  • 网络工程师和做网站哪个难网络建站如何建成
  • 网站建设需要哪些项目游民星空是用什么做的网站
  • 旅游网站建设要如何做百度商城网站建设
  • destoon 网站搬家中国企业500强都有哪些企业
  • 商城网站前端更新商品天天做吗哈尔滨做网站优化
  • 新乡网站开发wordpress 产品分类侧边栏
  • 网站自己做自己的品牌好做互联网企业分类
  • 项目网站建设方案石家庄网站快速排名
  • 网站开发大作业报告做电商网站的参考书
  • Apache局域网网站制作wordpress外链自动保存
  • 网站备案号要怎么查询千锋教育培训机构地址
  • 门户网站建设要求几款免费流程图制作软件
  • 花生壳域名可以做网站域名吗wordpress内链工具