做网站 珠海,sem工作原理,昆山做网站,阿里云 wordpress 权限设置
文章目录：前言 / 1、application.yml / 2、RabbitMqConfig / 3、MqMessage / 4、MqMessageItem / 5、DirectMode / 6、StateConsumer：消费者 / 7、InfoConsumer：消费者
前言
本文是工作之余的随手记，记录在工作期间使用 RabbitMQ 的笔记。
1、application.yml
使… 文章目录：前言 / 1、application.yml / 2、RabbitMqConfig / 3、MqMessage / 4、MqMessageItem / 5、DirectMode / 6、StateConsumer：消费者 / 7、InfoConsumer：消费者
前言
本文是工作之余的随手记，记录在工作期间使用 RabbitMQ 的笔记。
1、application.yml
使用 use 属性，方便随时打开和关闭 MQ 的使用，并且可以做到细化控制。
spring:
  rabbitmq:
    # Master switch: RabbitMqConfig reads it as ${spring.rabbitmq.use:true}
    # and DirectMode returns null from every @Bean method when it is false.
    use: true
    host: 10.100.10.100
    port: 5672
    username: wen
    password: 123456
    exchangeSubPush: exWen
    queueSubPush: ha.queue.SubPush
    routeSubPush: 1000
    exchangeState: sync.ex.State
    queueState: ha.q.Server
    queueStateSync: ha.q.StateServer
    routeState: state
    # These three keys must be spelled "...OnlineInfo": RabbitMqConfig binds
    # ${spring.rabbitmq.exchangeOnlineInfo}, ${spring.rabbitmq.routeOnlineInfo}
    # and ${spring.rabbitmq.queueOnlineInfo} without a default value, so the
    # previous "...OnlineMonitor" spelling left those placeholders unresolved.
    exchangeOnlineInfo: sync.ex.State
    routeOnlineInfo: state
    queueOnlineInfo: ha.q.Online

pom.xml 文件中使用的是 SpringBoot 项目，使用 spring-boot-starter-amqp 依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wen</groupId>
    <artifactId>springboot-mybatis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.3</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.18</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
    </dependencies>
</project>

2、RabbitMqConfig
配置类：将可配置的参数使用 @Value 做好配置，与 application.yml 相互对应。
package com.wen.mq;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;Slf4j
Configuration
Data
public class RabbitMqConfig {Value(${spring.rabbitmq.use:true})private boolean use;Value(${spring.rabbitmq.host})private String host;Value(${spring.rabbitmq.port})private int port;Value(${spring.rabbitmq.username})private String username;Value(${spring.rabbitmq.password})private String password;Value(${spring.rabbitmq.virtual-host:})private String virtualHost;Value(${spring.rabbitmq.exchangeState})private String exchangeState;Value(${spring.rabbitmq.queueState})private String queueState;Value(${spring.rabbitmq.routeState})private String routeState;Value((${spring.rabbitmq.queueStateSync}))private String queueStateSync;Value(${spring.rabbitmq.exchangeOnlineInfo})private String exchangeOnlineInfo;Value(${spring.rabbitmq.routeOnlineInfo})private String routeOnlineInfo;Value(${spring.rabbitmq.queueOnlineInfo})private String queueOnlineInfo;PostConstructprivate void init() {}
}3、MqMessage
MQ 消息实体类
package com.wen.mq;

import lombok.Data;

/**
 * Generic envelope for a message exchanged over MQ.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 *
 * @param <T> type of the payload carried in {@link #data}
 */
@Data
public class MqMessage<T> {

    private String msgType;

    private String msgOrigin;

    // NOTE(review): presumably a timestamp; the unit is not visible here - confirm with the producer.
    private long time;

    /** Payload of the message. */
    private T data;
}

4、MqMessageItem
MQ 消息实体类
package com.wen.mq;

import lombok.Data;

/**
 * One user record carried inside an {@code MqMessage} payload.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class MqMessageItem {

    private long userId;

    private String userName;

    private int userAge;

    private String userSex;

    private String userPhone;

    /** Operation code; StateConsumer groups items by this value. */
    private String op;
}

5、DirectMode
配置中心：使用 SimpleMessageListenerContainer 进行配置。新加一个消费者队列，就要在这里进行配置。
package com.wen.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Slf4j
Configuration
public class DirectMode {AutowiredRabbitMqConfig rabbitMqConfig;Autowiredprivate CachingConnectionFactory connectionFactory;Autowiredprivate StateConsumer stateConsumer;Autowiredprivate InfoConsumer infoConsumer;Beanpublic SimpleMessageListenerContainer initMQ() {if (!rabbitMqConfig.isUse()) {return null;}log.info(begin!);SimpleMessageListenerContainer container new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认这里改为手动确认// 设置一个队列container.setQueueNames(rabbitMqConfig.getQueueStateSync());//如果同时设置多个队列如下 前提是队列都是必须已经创建存在的//container.setQueueNames(TestDirectQueue,TestDirectQueue2,TestDirectQueue3”);//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues//container.setQueues(new Queue(TestDirectQueue,true));//container.addQueues(new Queue(TestDirectQueue2,true));//container.addQueues(new Queue(TestDirectQueue3,true));container.setMessageListener(stateConsumer);log.info(end);return container;}Beanpublic SimpleMessageListenerContainer contactSyncContainer() {if (!rabbitMqConfig.isUse()) {return null;}log.info(contact begin);SimpleMessageListenerContainer container new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认这里改为手动确认消息//设置一个队列container.setQueueNames(rabbitMqConfig.getQueueOnlineInfo());container.setMessageListener(infoConsumer);log.info(contact end);return container;}Beanpublic Queue queueState() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueState());}Beanpublic Queue queueStateSync() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueStateSync());}BeanDirectExchange exchangeState() {if (!rabbitMqConfig.isUse()) {return null;}return new DirectExchange(rabbitMqConfig.getExchangeState());}BeanBinding bindingState() 
{if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueState()).to(exchangeState()).with(rabbitMqConfig.getRouteState());}BeanBinding bindingStateSync() {if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueStateSync()).to(exchangeState()).with(rabbitMqConfig.getRouteState());}// 新加一个消费者Beanpublic Queue queueOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return new Queue(rabbitMqConfig.getQueueOnlineInfo());}BeanDirectExchange exchangeOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return new DirectExchange(rabbitMqConfig.getExchangeOnlineInfo());}BeanBinding bindingExchangeOnlineMonitor() {if (!rabbitMqConfig.isUse()) {return null;}return BindingBuilder.bind(queueOnlineMonitor()).to(exchangeOnlineMonitor()).with(rabbitMqConfig.getRouteOnlineInfo());}
}6、StateConsumer消费者
实现 ChannelAwareMessageListener 接口，可以在这里面做相应的操作，例如存缓存、存库等。
package com.wen.mq;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;Slf4j
Component
public class StateConsumer implements ChannelAwareMessageListener {AutowiredRabbitMqConfig rabbitMqConfig;Overridepublic void onMessage(Message message, Channel channel) throws Exception {String queueName message.getMessageProperties().getConsumerQueue();long deliveryTag message.getMessageProperties().getDeliveryTag();if (!rabbitMqConfig.getQueueStateSync().equals(queueName)) {String bodyStr new String(message.getBody(), StandardCharsets.UTF_8);try {MqMessageListMqMessageItem mqMessage JSON.parseObject(bodyStr, new TypeReferenceMqMessageListMqMessageItem() {});// 这里可以对消息做其他处理例如存储到缓存中ListMqMessageItem items mqMessage.getData();if (CollectionUtil.isNotEmpty(items)) {applyToRedis(mqMessage);}log.info(consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}, queueName, deliveryTag, mqMessage);channel.basicAck(deliveryTag, false);} catch (JSONException e) {log.error(parse mq msg exception, queue:{}, deliveryTag:{}, queueName, deliveryTag, e);channel.basicReject(deliveryTag, false);} catch (Exception e) {log.error(consume mq msg exception, queue:{}, deliveryTag:{}, queueName, deliveryTag, e);channel.basicReject(deliveryTag, true); //为true会重新放回队列}}}public static final String MQ_STATE_OP_REMOVE_STATE REMOVE_STATE;public static final String MQ_STATE_OP_CHANGE_STATE CHANGE_STATE;private void applyToRedis(MqMessageListMqMessageItem mqMessage) {ListMqMessageItem data mqMessage.getData();MapString, ListMqMessageItem itemGroupByOp data.stream().collect(Collectors.groupingBy(item - item.getOp()));ListMqMessageItem stateToRemove itemGroupByOp.get(MQ_STATE_OP_REMOVE_STATE);ListMqMessageItem stateToChange itemGroupByOp.get(MQ_STATE_OP_CHANGE_STATE);if (CollectionUtil.isNotEmpty(stateToRemove)) {MapLong, SetString map new HashMap();for (MqMessageItem item : stateToRemove) {map.computeIfAbsent(item.getUserId(), u - new HashSet()).add(String.valueOf(item.getUserAge()));}// cacheService.removeUserState(map);}if (CollectionUtil.isNotEmpty(stateToChange)) {ListMqMessageItem list 
stateToChange.stream().map(u - {MqMessageItem dto new MqMessageItem();dto.setUserId(u.getUserId());dto.setUserAge(u.getUserAge());dto.setUserName(u.getUserName());dto.setUserSex(u.getUserSex());dto.setUserPhone(u.getUserPhone());return dto;}).collect(Collectors.toList());// cacheService.saveUserState(list);}}
}7、InfoConsumer消费者
实现 ChannelAwareMessageListener 接口，可以在这里面做相应的操作，例如存缓存、存库等。
package com.wen.mq;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Consumes messages from the online-info queue with manual acknowledgement.
 *
 * <p>A delivery is acked after handling, rejected without requeue when the
 * payload is not valid JSON, and rejected with requeue on any other failure.
 */
@Slf4j
@Component
public class InfoConsumer implements ChannelAwareMessageListener {

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    /**
     * Handles one delivery.
     *
     * @param message the delivered message; its body is decoded as UTF-8 JSON
     * @param channel the channel used to ack or reject the delivery
     * @throws Exception if acknowledging or rejecting on the channel fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String queueName = message.getMessageProperties().getConsumerQueue();
        log.info("queueName: {}", queueName);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String content = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            MqMessage<?> msg = JSONObject.parseObject(content, MqMessage.class);
            if (rabbitMqConfig.getQueueOnlineInfo().equals(queueName)) {
                // The received message is the change notification itself.
                // Report a failure only when the handler says the write failed; this
                // message used to be logged on every delivery, including successes.
                if (!handleOnlineInfo(msg)) {
                    log.warn("用户监控数据写入失败数据{}", msg);
                }
            }
            log.info("consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}", queueName, deliveryTag, msg);
            channel.basicAck(deliveryTag, false);
        } catch (JSONException e) {
            log.error("parse mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
            // requeue=false: a malformed payload will never parse, so do not put it back.
            channel.basicReject(deliveryTag, false);
        } catch (Exception e) {
            log.error("consume mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
            // requeue=true puts the message back on the queue.
            channel.basicReject(deliveryTag, true);
        }
    }

    /**
     * Extension point for consuming an online-info message through a service.
     *
     * @param msg the parsed message
     * @return {@code true} when the data was written; this placeholder always
     *         returns {@code true} because no service is wired in yet
     */
    private boolean handleOnlineInfo(MqMessage<?> msg) {
        return true;
    }
}