当前位置: 首页 > news >正文

angular做门户网站网站如何引入流量

angular做门户网站,网站如何引入流量,网站升级 html,企业文化标语经典今天的议题是#xff1a;如何快速处理kafka的消息积压 通常的做法有以下几种#xff1a; 增加消费者数增加 topic 的分区数#xff0c;从而进一步增加消费者数调整消费者参数#xff0c;如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使… 今天的议题是如何快速处理kafka的消息积压 通常的做法有以下几种 增加消费者数增加 topic 的分区数从而进一步增加消费者数调整消费者参数如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率本文的重点就是通过提高消费速率来解决消息积压。 经验判断消费速率低下的主要原因往往都是数据处理时间长业务逻辑复杂最终导致一次 poll 的时间被无限拉长如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点 业务逻辑对乱序数据不敏感因为并行一定会导致乱序问题kafka 的消费者是线程不安全的如何提交 offset 基于上述几点思路就是消费者 poll 下来一批数据交给多个线程去并行处理消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下 第一步创建一个MultipleConsumer类用于封装消费者和线程池 public class MultipleConsumer {private final KafkaConsumerString, String consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning true;public MultipleConsumer(Properties properties, ListString topics, int threadNum) {// 实例化消费者consumer new KafkaConsumer(properties);// 订阅主题consumer.subscribe(topics);this.threadNum threadNum;this.threadPool Executors.newFixedThreadPool(threadNum);} }理论上相较于传统的消费速率可以提升 threadNum 倍。 第二步因为需要并行处理一批 poll 数据因此需要对数据进行切分切分逻辑如下 private MapInteger, ListConsumerRecordString, String splitTask(ConsumerRecordsString, String consumerRecords) {HashMapInteger, ListConsumerRecordString, String tasks new HashMap();for (int i 0; i threadNum; i) {tasks.put(i, new ArrayList());}int recordIndex 0;for (ConsumerRecordString, String consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex;}return tasks;}这里采用轮训的方式且切分的个数与 threadNum 一致尽可能保证每个线程处理的数据数量相差不大 第三步定义一个静态内部类用来处理数据并处理同步逻辑因为需要等待所有线程执行完再提交 offset private static class InnerProcess implements Runnable {private final ListConsumerRecordString, String records;private final CountDownLatch countDownLatch;public InnerProcess(ListConsumerRecordString, String records, CountDownLatch countDownLatch) {this.records records;this.countDownLatch countDownLatch;}Overridepublic void run() {try {// 处理消息for (ConsumerRecordString, String record : records) {System.out.println(topic: record.topic() , partition: record.partition() , offset: record.offset() , key: record.key() , value: record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}使用 CountDownLatch 实现线程同步逻辑假设每条数据的业务处理时间为 1 s 第四步消费者 poll 逻辑 public void start() {while (isRunning) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务MapInteger, ListConsumerRecordString, String splitTask splitTask(consumerRecords);CountDownLatch countDownLatch new CountDownLatch(threadNum);// 提交任务for (int i 0; i threadNum; i) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) - {if (e ! null) {System.out.println(提交偏移量失败);}});}}}完整代码如下 import org.apache.kafka.clients.consumer.*;import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.*;/*** author wjun* date 2023/3/1 14:50* email wjunjobsoutlook.com* describe*/ public class MultipleConsumer {private final KafkaConsumerString, String consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning true;public MultipleConsumer(Properties properties, ListString topics, int threadNum) {// 实例化消费者consumer new KafkaConsumer(properties);// 订阅主题consumer.subscribe(topics);this.threadNum threadNum;this.threadPool Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务MapInteger, ListConsumerRecordString, String splitTask splitTask(consumerRecords);CountDownLatch countDownLatch new CountDownLatch(threadNum);// 提交任务for (int i 0; i threadNum; i) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) - {if (e ! null) {System.out.println(提交偏移量失败);}});}}}private MapInteger, ListConsumerRecordString, String splitTask(ConsumerRecordsString, String consumerRecords) {HashMapInteger, ListConsumerRecordString, String tasks new HashMap();for (int i 0; i threadNum; i) {tasks.put(i, new ArrayList());}int recordIndex 0;for (ConsumerRecordString, String consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex;}return tasks;}public void stop() {isRunning false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final ListConsumerRecordString, String records;private final CountDownLatch countDownLatch;public InnerProcess(ListConsumerRecordString, String records, CountDownLatch countDownLatch) {this.records records;this.countDownLatch countDownLatch;}Overridepublic void run() {try {// 处理消息for (ConsumerRecordString, String record : records) {System.out.println(topic: record.topic() , partition: record.partition() , offset: record.offset() , key: record.key() , value: record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}} }测试一下 import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test;import java.util.ArrayList; import java.util.List; import java.util.Properties;/*** author wjun* date 2023/3/1 16:03* email wjunjobsoutlook.com* describe*/ public class MultipleConsumerTest {private static final Properties properties new Properties();private static final ListString topics new ArrayList();public static void before() {properties.put(bootstrap.servers, localhost:9092);properties.put(group.id, test);properties.put(enable.auto.commit, false);properties.put(auto.commit.interval.ms, 1000);properties.put(session.timeout.ms, 30000);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);topics.add(multiple_demo);}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();} }20 条数据的处理事件只需要 4s(threadNume 5即缩短 5 倍) 但是此方法的缺点 只适用于业务逻辑复杂导致的处理时间长的场景对数据乱序不敏感的业务场景
http://www.w-s-a.com/news/668353/

相关文章:

  • 做app和网站怎样如何做html网站
  • php开发手机端网站开发更换网站标题
  • 提供网站建设报价延津县建设局网站
  • 江苏网站建设流程土巴兔全包装修怎么样
  • 环保网站建设方案带漂浮广告的网站
  • 淘宝客合伙人网站建设建站前端模板
  • 网站单页模板怎么安装中世纪变装小说wordpress
  • 手机免费建设网站制作宝安第一网站
  • 如何做x响应式网站asp网站出现乱码
  • 网站备案的幕布是什么来的游戏推广代理
  • 固始城乡建设局的网站怎么打不开了上海建设网站
  • 关于加强网站信息建设的通知3d网站开发成本
  • 网站建设实训过程报告成品网站1688入口的功能介绍
  • 网站定制开发需要什么资质国外设计灵感网站
  • 搜搜网站收录广告设计与制作模板图片
  • 江苏省建设监理协会网站汕头网站建设方案优化
  • 中国风网站配色方案正规少儿编程排名
  • 兼职做网站的软件wordpress赞的代码
  • 销售网站的技巧四博互联做的网站
  • 网站建设 图片问题小程序免费制作平台凡科网页版
  • 猪八戒网做网站怎么样网站建设 客户同程
  • 西安网站建设那家强网站建设方案 报价
  • 销售网站建设考核指标网站建设价格组成
  • 网站302跳转网站建设完成后 下一步做什么
  • 赣州制作网站企业硬件开发用什么语言
  • 新网站如何被网站收录百度排名优化软件
  • html网站简易模板国内买机票的网站建设
  • 百度关键词分析工具百度seo排名软
  • 自己怎样做免费网站ueditor 上传wordpress
  • 深圳高端网站开发网站建设公司销售技巧