当前位置: 首页 > news >正文

seo资源网站排名北京木马工业设计

seo资源网站排名,北京木马工业设计,网站建设模板怎么用,百度营销推广官网kafka消费积压 如果生产者发送消息的速度过快#xff0c;或者是消费者处理消息的速度太慢#xff0c;那么就会有越来越多的消息无法及时消费#xff0c;也就是消费积压。 消费积压时#xff0c; (1) 可以增加Topic的分区数#xff0c;并且增加消费组的消费者数量#…kafka消费积压 如果生产者发送消息的速度过快或者是消费者处理消息的速度太慢那么就会有越来越多的消息无法及时消费也就是消费积压。 消费积压时 (1) 可以增加Topic的分区数并且增加消费组的消费者数量让消费者数等于分区数。 (2) 还可以使用多线程消费提高消费速度。 kafka多线程消费的代码 public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static final String GROUP_ID group.demo;public static void main(String[] args) {Properties props initConfig();KafkaConsumerThread consumerThread new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* return*/public static Properties initConfig() {Properties props new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumerString, String kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer new KafkaConsumer(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber threadNumber;executorService new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000),new ThreadPoolExecutor.CallerRunsPolicy());}Overridepublic void run() {try {while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error(run error, e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecordsString, String records;public RecordsHandler(ConsumerRecordsString, String records) {this.records records;}Overridepublic void run() {//处理records.for (ConsumerRecordString, String record : records) {System.out.println(record:record.value() ,thread: Thread.currentThread().getName());}}}} 发送消息后使用多线程消息运行结果如下 record:{id:1234,name:lin},thread:pool-1-thread-1 record:{id:5678,name:chen},thread:pool-1-thread-2 record:{id:91011,name:wu},thread:pool-1-thread-3参考资料 《深入理解Kafka核心设计与实践原理》
http://www.w-s-a.com/news/844242/

相关文章:

  • 怎样开发设计网站建设博物馆网页设计案例
  • 山西建设厅网站查不了seo搜索引擎优化包邮
  • 临沂网站建设价格太原网站优化公司
  • 网页设计基础课程设计搜索引擎优化英文
  • 网站备案号怎么查楼书设计素材网站
  • 网站设计机构有哪些中国建设银行网站登录不上
  • 烟台理工学校网站罗湖建设网站
  • 卑鄙的网站开发公司郑州人才网站
  • 成都专业的网站设计公司文化建设的成就
  • 做书籍封皮的网站如何建网站教程视频
  • 唐山建站公司模板ipfs做网站
  • 贵阳做网站品牌网站模板
  • 紫网站建设我的个人博客
  • 优秀网站菜单网页上的视频怎么下载
  • 龙口建网站公司价格国内的平面设计网站
  • 电子商务网站建设与管理读后感上海市基础工程公司
  • 织梦免费企业网站做网站时,404网页如何指向
  • 摄影工作室网站源码百度为什么会k网站
  • 哪个网站有淘宝做图的素材网站分享做描点链接
  • 做哪个网站零售最好网站空间在哪里
  • 荆州网站建设多少钱南阳做网站推广
  • 网站代理打开个人网站设计源码
  • 做php网站的话要学什么语言wordpress搜索不到
  • 金华官方网站建设网络营销策划模板
  • 网站开发到上线在线生成小程序
  • 动易网站设计方案郑州营销网站建设
  • 怎么上网站做简易注销的步骤郑州大学现代远程教育《网页设计与网站建设》课程考核要求
  • 新乡网站建设新乡长沙本地论坛有哪些
  • 潍坊中企动力做的网站怎么样wordpress接入微博
  • 网站开发者所有权归属网站项目建设的必要性