如何查网站空间,活动网页怎么做,南昌网站建设制作公司,线上推广方案一 前言
RocketMQ 作为一个功能强大的消息队列系统#xff0c;不仅支持基本的消息发布与订阅#xff0c;还提供了顺序消息、延时消息、事务消息等高级功能#xff0c;适应了复杂的分布式系统需求。其高可用性架构、多副本机制、完善的运维管理工具#xff0c;以及安全控制…一 前言
RocketMQ 作为一个功能强大的消息队列系统不仅支持基本的消息发布与订阅还提供了顺序消息、延时消息、事务消息等高级功能适应了复杂的分布式系统需求。其高可用性架构、多副本机制、完善的运维管理工具以及安全控制功能使其成为企业级应用的首选消息中间件。 在Android应用中你可以使用RocketMQ的客户端库来发送和接收消息.
二 接入流程
1 添加依赖
在Android项目的build.gradle文件中添加RocketMQ客户端库的依赖。
dependencies {implementation org.apache.rocketmq:rocketmq-client:5.3.1
}2 添加权限
uses-permission android:nameandroid.permission.INTERNET /3 接收消息
ExecutorService executor Executors.newFixedThreadPool(20); //根据项目需要设置常用线程个数
String TAG MainActivity;
String GROUP producer;
String ADDRESS 192.168.1.84:9876;
String KEY key;executor.submit(() - {try {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(GROUP);consumer.setNamesrvAddr(ADDRESS);// 订阅 topic 下的全部 tabconsumer.subscribe(TOPIC, *);// BROADCASTING:广播模式,把消息发给了所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组, CLUSTERING:集群模式(默认值),每一条消息只会被同一个消费者组中的一个实例消费consumer.setMessageModel(MessageModel.CLUSTERING);// CONSUME_FROM_LAST_OFFSET:从最新的偏移值开始消费(默认值), CONSUME_FROM_FIRST_OFFSET:从队列最开始的偏移值开始消费, CONSUME_FROM_TIMESTAMP:从指定的时间戳处开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// yyyyMMddHHmmss: 当选择从指定的时间戳处开始消费时, 需要指定该时间戳// consumer.setConsumeTimestamp();// 使用并发方式从多个MessageQueue中取数据的方式监听consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - {System.out.println();for (MessageExt msg : msgs) {Log.e(TAG,收到消息new String(msg.getBody()));}// 返回消费成功, 还可以是 RECONSUME_LATER:稍后重新消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();TimeUnit.DAYS.sleep(1);} catch (Throwable cause) {cause.printStackTrace();}});4 发送消息 executor.submit(() - {try {DefaultMQProducer producer new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();// 同步传递消息消息会发给集群中的一个Broker节点。Message message new Message(TOPIC, TAG, KEY, android hello word ss.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult result producer.send(message);Log.e(TAG,发送消息结果:result JSON.toJSONString(result));producer.shutdown();} catch (Exception e) {Log.e(TAG,发送失败e.getCause().toString());e.printStackTrace();}});三 问题
启动项目,点击发送消息,项目报了异常信息,如下
java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory; 报错这是因为RocketMQ客户端库依赖于Java标准库中的 java.lang.management.ManagementFactory 类而Android并不完全支持Java标准库尤其是 java.lang.management 包。
RocketMQ官方没有专门为Android提供适配版本所以可以尝试使用这些版本或者自己修改RocketMQ源码移除对 ManagementFactory 的依赖。
四 修改源码
在github中,把rocketmq-client源码下载到本地 https://github.com/apache/rocketmq
导入到本地如下 然后找到前面ManagementFactory 报错的地方,将它移除或者用其他方法代替,经排查在 org.apache.rocketmq.common.UtilAll 有相关的引用 该方法则是为了通过获取jvm的进程ID,这边我们可以把它注释掉,然后用个固定值代替试下 static {HEX_ARRAY 0123456789ABCDEF.toCharArray();/* SupplierInteger supplier () - {// format: pidhostnameString currentJVM ManagementFactory.getRuntimeMXBean().getName();try {return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf()));} catch (Exception e) {return -1;}};PID supplier.get();*/PID 888888;}以及在org.apache.rocketmq.common.MixAll也有ManagementFactory相关引用,这个作用是获取当前java虚拟机(JVM)的进程ID,可以将其注释,然后返回固定的结果
public static long getPID() {String processName java.lang.management.ManagementFactory.getRuntimeMXBean().getName();if (StringUtils.isNotEmpty(processName)) {try {return Long.parseLong(processName.split()[0]);} catch (Exception e) {return 0;}}return 0;}最后还有一个地方有涉及到,在包路径org.apache.rocketmq.store.StoreUtil,其作用是为了获取当前机器的总物理内存大小(以字节为单位) public static long getTotalPhysicalMemorySize() {long physicalTotal 1024 * 1024 * 1024 * 24L;OperatingSystemMXBean osmxb ManagementFactory.getOperatingSystemMXBean();if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {physicalTotal ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();}return physicalTotal;}将相关的包修改后,然后将其重新打包,在maven工具下,选择rocketmq-common,选择Plugins下的jar组件,选中下面的jar进行打包 打包完成后,在模块的target目录下生成jar包 android需要用到的包如下:
implementation files(libs\\rocketmq-remoting-5.3.1.jar)
implementation files(libs\\rocketmq-client-5.3.1.jar)
implementation files(libs\\rocketmq-common-5.3.1.jar)implementation io.github.aliyunmq:rocketmq-logback-classic:1.0.1
implementation com.google.guava:guava:31.1-jre
implementation commons-validator:commons-validator:1.7将模块rocketmq-remoting,rocketmq-client,rocketmq-commo三个模块重新打包后导入,然后再加上下面那三个相关联的依赖包.重新用android应用进行收发信息,测试如下:
2025-03-04 14:51:22.272 13676-13795/? E/MainActivity: 收到消息android hello word ss
2025-03-04 14:51:22.279 13676-13785/? E/MainActivity: 发送消息结果:result{messageQueue:{brokerName:broker-a,queueId:0,topic:TopicTestLss},msgId:C10005FD90380CA347BF12A326F00000,offsetMsgId:C2000B5400002A9F000000000007A575,queueOffset:4,regionId:DefaultRegion,sendStatus:SEND_OK,traceOn:true,transactionId:C10005FD90380CA347BF12A326F00000}