当前位置: 首页 > news >正文

长沙网站建设 网站设计织梦cms做网站怎么样

长沙网站建设 网站设计,织梦cms做网站怎么样,吉林省 网站建设,建设直播网站需要哪些许可证为什么要使用MQ#xff1f; 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动#xff0c;已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢#xff1f; 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…为什么要使用MQ 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中在多实例/分布式环境下其他的服务在订阅同一事件Topic时可以在各自的服务中进行消费最大化空闲服务的利用。 源码地址Gitee 整合RocketMQ 依赖版本 JDK 17 Spring Boot 3.2.0 RocketMQ-Client 5.0.4 RocketMQ-Starter 2.2.0 可以参考这篇进行RocketMQ安装 Spring Boot 3.0 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。 引入RocketMQ依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client-java/artifactIdversion5.0.4/version /dependency dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.0/version /dependency解决Spring Boot3不兼容 spring.factories rocketmq-spring-boot-starter:2.2.2版本中 参考配置文件 # RocketMQ 配置 rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group保证唯一group: event-mq-group# 发送消息超时时间默认3000sendMessageTimeout: 10000# 发送消息失败重试次数默认2retryTimesWhenSendFailed: 2# 异步消息重试此处默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker默认falseretryNextServer: false参考Issue 方法一 通过Import(RocketMQAutoConfiguration.class)在配置类中引入 方法二在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports。 文件内容为RocketMQ自动配置类路径org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration RocketMQ 使用 解决Spring Boot3不支持spring.factories的问题 import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Import;/*** 启动类*/ Import(RocketMQAutoConfiguration.class) SpringBootApplication public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);} }RocketMQ操作工具 RocketMQ Message实体 import cn.hutool.core.util.IdUtil; import jakarta.validation.constraints.NotBlank; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ObjectUtils; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable; import java.util.List;/*** RocketMQ 消息*/ Data Builder AllArgsConstructor NoArgsConstructor public class RocketMQMessageT implements Serializable {/*** 消息队列主题*/NotBlank(message MQ Topic 不能为空)private String topic;/*** 延迟级别*/Builder.Defaultprivate DelayLevel delayLevel DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private ListT messages;/*** 使用有序消息发送时指定发送到队列*/private String hashKey;/*** 任务Id用于日志打印相关信息*/Builder.Defaultprivate String taskId IdUtil.fastSimpleUUID(); } RocketMQTemplate 二次封装 import com.yiyan.study.domain.RocketMQMessage; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/ Slf4j Component public class RocketMQService {Resourceprivate RocketMQTemplate rocketMQTemplate;Value(${rocketmq.producer.sendMessageTimeout})private int sendMessageTimeout;/*** 异步发送消息回调** param taskId 任务Id* param topic 消息主题* return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}], taskId, topic, sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {log.error(ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}], taskId, topic, throwable.getMessage());}};}/*** 发送同步消息使用有序发送请设置HashKey** param message 消息参数*/public T void syncSend(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info(ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** param message 消息参数*/public T void syncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info(ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}],message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息异步返回消息结果** param message 消息参数*/public T void asyncSend(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}], message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** param message 消息参数*/public T void asyncSendBatch(RocketMQMessageT message) {log.info(ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}],message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息不关心返回结果容易消息丢失适合日志收集、不精确统计等消息发送;** param message 消息参数*/public T void sendOneWay(RocketMQMessageT message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** param message 消息体* param batch 是否为批量操作*/public T void sendOneWay(RocketMQMessageT message, boolean batch) {log.info((batch ? ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]: ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg - rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}} }定义RocketMQ消费者 import com.yiyan.study.constants.MQConfig; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;/*** MQ消息监听*/ Component Slf4j RocketMQMessageListener(topic MQConfig.EVENT_TOPIC,consumerGroup MQConfig.EVENT_CONSUMER_GROUP) public class MQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(MQListener 接收消息 {}, message);} } 定义测试类发送消息 import cn.hutool.core.thread.ThreadUtil; import com.yiyan.study.constants.MQConfig; import com.yiyan.study.domain.RocketMQMessage; import com.yiyan.study.utils.RocketMQService; import jakarta.annotation.Resource; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/ SpringBootTest public class MQTest {Resourceprivate RocketMQService rocketMQService;Testpublic void sendMessage() {int count 1;while (count 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);} }测试
http://www.w-s-a.com/news/19688/

相关文章:

  • 万维网网站个人申请网站
  • 我想做网站怎么做昆山网站建设 全是乱码
  • 单位做网站怎么做圣诞树html网页代码
  • 网页开发与网站开发企业网站托管服务常用指南
  • 一站式服务图片临沂做进销存网站
  • 鸣蝉智能建站标准物质网站建设模板
  • 电商网站建设技术员的工作职责商业网站制作价格
  • 网站html模板免费下载公司的网站建设费用入什么科目
  • 高中生做网站网页网页制作教程零基础学会
  • 做金融网站有哪些要求WordPress站内搜索代码
  • 济南网站怎么做seowordpress注册发邮件
  • 珠海网站设计平台东莞市手机网站建设平台
  • 网站开发文档合同怎么在wordpress导航条下方加入文字广告链接
  • 网站建设需怎么做有网站怎么做企业邮箱
  • 网站制作流程视频教程小程序多少钱一年
  • 暗网是什么网站花都网站建设哪家好
  • 贵州网站开发流程晋江论坛手机版
  • 网站建设丿金手指谷哥14阿里巴巴官网电脑版
  • 网站开发招聘信息匿名ip访问网站受限
  • 网站转app工具网站规划建设与管理维护大作业
  • flash是怎么做网站的.net购物网站开发
  • 烟台网站建设求职简历品质商城网站建设
  • 做百度外链哪些网站权重高点做网站具备的条件
  • 怎么样用ppt做网站红番茄 网站点评
  • 建设银行河北分行招聘网站哪里能找到网站
  • 兰州营销型网站网站建设收费标准
  • 网站首页动图怎么做自己做网站很难
  • 自建网站如何盈利推广引流最快的方法
  • 网页设计网站结构图怎么弄网站用户 分析
  • 企业手机网站建设策划天津网页设计工作