杭州建设企业网站的,前端性能优化,管网建设,网络推广公司运作RocketMQ 简单介绍 阿里云rockerMq 4.x和5.x集成
一、云平台创建实例
参考文档#xff1a; 阿里云api
阿里云 创建实例
二、skd集成思路
公司用的RocketMQ一般是自建开源apache的RocketMQ和上阿里云的RocketMQ#xff0c;目前阿里云支持4.x和5.x版本 项目集成思路…RocketMQ 简单介绍 阿里云rockerMq 4.x和5.x集成
一、云平台创建实例
参考文档 阿里云api
阿里云 创建实例
二、skd集成思路
公司用的RocketMQ一般是自建开源apache的RocketMQ和上阿里云的RocketMQ目前阿里云支持4.x和5.x版本 项目集成思路 1、集成阿里RocketMQ 两个版本 4.x和5.x 支持版本配置和开源apache的RocketMQ集成 2、RocketProperties 单配置适用多版本集成 3、RocketConsumer 消费者注解支持多版本集成(消费组监听器继承实现可以不一样注解一致 4、RocketMessageProducer 生产者接口支持多版本集成根据配置版本自动适配
注这个是一个持续集成优化过程需要不停磨练
三、根据RocketConsumer注解动态监听器实现
1、核心逻辑代码: 扫描包注解根据配置版本号走不同的消费组创建 String packageName properties.getPackageName(); // 扫描包路径ClassPathScanningCandidateComponentProvider scanner new ClassPathScanningCandidateComponentProvider(false);scanner.addIncludeFilter(new AnnotationTypeFilter(RocketConsumer.class));SetBeanDefinition candidates scanner.findCandidateComponents(packageName);for (BeanDefinition beanDefinition : candidates) {try {Integer versions properties.getVersions();if (Objects.equals(RockerMqVersions.ALI_4.getCode(), versions)) {onsRocketMQConsumerClient.start(beanDefinition);}if (Objects.equals(RockerMqVersions.ALI_5.getCode(), versions)) {rocketMQConsumerClient.start(beanDefinition);}} catch (Exception e) {e.printStackTrace();}}2、消费组push 用法示例 版本4.x 写法
Slf4j
Component
RocketConsumer(topic PRODUCER_TOPIC, consumerGroup PRODUCER_GROUP)
public class Push4MQConsumer implements MessageListener {Autowiredprivate UserService userService;Overridepublic Action consume(Message message, ConsumeContext consumeContext) {String body new String(message.getBody());log.info(TestMQConsumer: body user: userService.getUserId());return Action.CommitMessage;}
}版本5.x 写法
Slf4j
Component
RocketConsumer(topic PRODUCER_TOPIC, consumerGroup PRODUCER_GROUP)
public class TestMQConsumer implements MessageListener {Autowiredprivate UserService userService;Overridepublic ConsumeResult consume(MessageView messageView) {String body StandardCharsets.UTF_8.decode(messageView.getBody()).toString();log.info(TestMQConsumer: body user: userService.getUserId());return ConsumeResult.SUCCESS;}
}五、RocketMessageProducer 生产发送
目前只写5.x 写法,后面持续优化 普通消息
RocketMsg rocketMsg new RocketMsg();
rocketMsg.setBody(hxl测试发送);
rocketMessageProducer.sendMessage(PRODUCER_TOPIC, null, rocketMsg);
return ApiResult.success();延迟消息
RocketMsg rocketMsg new RocketMsg();
rocketMsg.setBody(延迟消息发送发送);
rocketMessageProducer.sendMessage(DELAY_TOPIC, null, 5 * 60L, rocketMsg);# github 地址
[https://github.com/sanxiaoshitou/tower-boot](https://github.com/sanxiaoshitou/tower-boot)