当前位置: 首页 > news >正文

织梦网站后台管理系统怎么制作钓鱼网站链接

织梦网站后台管理系统,怎么制作钓鱼网站链接,保定八大平台公司,黑龙江省建设会计协会网站首页上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的#xff0c;可以先用单点将canal事件发送到mq中#xff0c;再由mq并发处理#xff0c;另外mq还可以做到削峰的作用#xff0c;让canal数据不至于阻塞。 使用队列#xff0c;可以自己起一个单实…上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的可以先用单点将canal事件发送到mq中再由mq并发处理另外mq还可以做到削峰的作用让canal数据不至于阻塞。 使用队列可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里也可以直接使用canal server canal server原生支持几种队列Kafka, RocketMQ ,RabbitMQ, PulsarMQ 下面了解一下canal sever具体的处理过程。 canal server将消息投递到mq中 在canal server中如果检测到配置了mq 就会启动线程来读取bin log事件并投递到mq中 CanalMQStarter while (running destinationRunning.get()) {Message message;if (getTimeout ! null getTimeout 0) {message canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId message.getId();int size message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId ! -1 size ! 0) {canalMQProducer.send(canalDestination, message, new Callback() {Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}从代码可以看到首先调用getWithoutAck从实例获取事件然后调用canalMQProducer.send将消息投递到队列中如果投递成功就执行ack否则执行rollback 因为投递消息到队列是非常快的操作所以这就降低了阻塞的风险。 最终发送mq消息的代码如下(CanalRocketMQProducer) private void sendMessage(Message message, int partition) {//...SendResult sendResult this.defaultMQProducer.send(message, (mqs, msg, arg) - {if (partition mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}这里有个分区的概念对于RocketMQ来说就是队列选择这关系到顺序消费。 业务代码使用RocketMQCanalConnector消费数据 while (running) {try {connector.connect();connector.subscribe();while (running) {ListMessage messages connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning(); }可以看到这和之前ClusterCanalConnector一样的处理方法只是底层实现不一样在subscribe的时候调用了mq的subscribe: public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, *);rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}可以看到这里使用了MessageListenerOrderly来进行顺序消费 使用process来处理消息 private boolean process(ListMessageExt messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage new ConsumerBatchMessageMessage(messageList);} else {batchMessage new ConsumerBatchMessageFlatMessage(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error(Put message to queue error, e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error(Interrupted when waiting messages to be finished., e);throw new RuntimeException(e);}boolean isSuccess batchMessage.isSuccess();return isCompleted isSuccess;}这里将数据放到了messageBlockingQueue中然后等待消息执行完成 ConsumerBatchMessage内置了一个CountDownLatch batchMessage.waitFinish会阻塞在这里。 客户端使用getFlatList/getFlatListWithoutAck取数据时就是从messageBlockingQueue取出数据调用ack时会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。 Overridepublic ListMessage getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage ! null) {throw new CanalClientException(mq get/ack not support concurrent async ack);}ConsumerBatchMessage batchMessage messageBlockingQueue.poll(timeout, unit);//...}Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage ! null) {this.lastGetBatchMessage.ack();}//...}对于MessageListenerOrderly来说是一个消费线程对应一个mq队列的从而实现多线程消费而这里把不同mq队列的消息在messageBlockingQueue中排队并且使用getListWithoutAck/ack也不支持并发又变成了单线程模式这可能对性能造成影响建议生产环境对性能有要求时采用自己写代码来实现mq的消费。 配置 mq相关参数说明
http://www.w-s-a.com/news/924827/

相关文章:

  • 图片网站怎么做排名怎么分析一个网站seo
  • 伪原创对网站的影响深圳装修公司排名100强
  • 网站建设公司效果个人可以做医疗信息网站吗
  • 网站使用arial字体下载微网站 建设
  • 文化馆网站建设意义营销型国外网站
  • 公司网站定位建议wordpress怎么用模板
  • 中国十大热门网站排名计算机选什么专业最好
  • 怀化建设企业网站太原网站关键词排名
  • 空间注册网站网站制作是怎么做的
  • 数码家电商城网站源码一个网站的成本
  • 网站伪静态是什么意思麻涌东莞网站建设
  • 理县网站建设公司郑州仿站定制模板建站
  • 手机网站建设网站报价诸城人才网招聘网
  • 一起做网站怎么下单临沂网站制作
  • 公司网站案例企业网站 模版
  • 做的好的响应式网站有哪些网站界面设计案例
  • 上海创意型网站建设icp备案网站信息
  • 网站没收录中山手机网站制作哪家好
  • 代驾软件开发流程wordpress 博客主题 seo
  • 成都的教育品牌网站建设网站广告js代码添加
  • 网站找人做seo然后网站搜不到了网站建设seoppt
  • 做网站优化有用吗学做文案的网站
  • wordpress 知名网站怎么做微网站
  • 用电脑怎么做原创视频网站河南建设工程信息网一体化平台官网
  • 云服务器和网站空间郑州做招商的网站
  • 规模以上工业企业的标准北京seo结算
  • 软件开发过程模型如何做网站性能优化
  • 网站建站公司广州南京江北新区楼盘
  • 哪些做展架图的网站好开发公司2022年工作计划
  • 磨床 东莞网站建设wordpress下载类主题系统主题