查看网站访问量,东莞 手机网站制作,湘潭做网站,wordpress json插件文章目录避免重复消费(保证消息幂等性)消息积压上线更多的消费者#xff0c;进行正常消费惰性队列消息缓存延时队列RabbitMQ如何保证消息的有序性#xff1f;RabbitMQ消息的可靠性、延时队列如何实现数据库与缓存数据一致#xff1f;开启消费者多线程消费避免重复消费(保证消…
文章目录避免重复消费(保证消息幂等性)消息积压上线更多的消费者进行正常消费惰性队列消息缓存延时队列RabbitMQ如何保证消息的有序性RabbitMQ消息的可靠性、延时队列如何实现数据库与缓存数据一致开启消费者多线程消费避免重复消费(保证消息幂等性) 方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) 每次消费消息之前根据消息id去判断该消息是否已消费过如果已经消费过则不处理这条消息否则正常消费消息并且进行入库操作。(消息全局ID作为数据库表的主键防止重复) 方式2: 利用Redis的setnx 命令给消息分配一个全局ID只要消费过该消息将 id,message以K-V键值对形式写入redis消费者开始消费前先去redis中查询有没消费记录即可 方式3: rabbitMQ的每一个消息都有redelivered字段可以获取是否是被重新投递过来的而不是第一次投递过来的 发送消息 Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息*/public void sendMessage() {// 创建消费对象并指定 全局唯一ID(这里使用UUID也可以根据业务规则生成只要保证全局唯一即可)MessageProperties messageProperties new MessageProperties ();messageProperties.setMessageId (UUID.randomUUID ().toString ());messageProperties.setContentType (text/plain);messageProperties.setContentEncoding (utf-8);Message message new Message (hello,message idempotent!.getBytes (), messageProperties);System.out.println (生产消息: message.toString ());rabbitTemplate.convertAndSend (EXCHANGE_NAME, ROUTE_KEY, message);}消费消息 /*** 消费消息** param message* param channel* throws IOException*/RabbitHandler//org.springframework.amqp.AmqpException: No method found for class [B 这个异常并且还无限循环抛出这个异常。//注意RabbitListener位置笔者踩坑无限报上面的错还有另外一种解决方案: 配置转换器RabbitListener(queues message_idempotent_queue)Transactionalpublic void handler(Message message, Channel channel) throws IOException {/*** 发送消息之前根据消息全局ID去数据库中查询该条消息是否已经消费过如果已经消费过则不再进行消费。*/// 获取消息IdString messageId message.getMessageProperties ().getMessageId ();if (StringUtils.isBlank (messageId)) {logger.info (获取消费ID为空);return;}MessageIdempotent messageIdempotent null;OptionalMessageIdempotent list messageIdempotentRepository.findById (messageId);if (list.isPresent ()) {messageIdempotent list.get ();}// 如果找不到则进行消费此消息if (null messageIdempotent) {//获取消费内容String msg new String (message.getBody (), StandardCharsets.UTF_8);logger.info (-----获取生产者消息-------------------- messageId: messageId ,消息内容: msg);//手动ACKchannel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);//存入到表中标识该消息已消费MessageIdempotent idempotent new MessageIdempotent ();idempotent.setMessageId (messageId);idempotent.setMessageContent (msg);messageIdempotentRepository.save (idempotent);} else {//如果根据消息ID(作为主键)查询出有已经消费过的消息那么则不进行消费logger.error (该消息已消费无须重复消费);}}消息积压 上线更多的消费者进行正常消费
线上突发问题要临时扩容增加消费端的数量 考虑到消费者的处理能力增加配置 spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息simple代表简单队列模型
惰性队列 //基于Bean声明lazy-queueBeanpublic Queue lazyQueue() {return QueueBuilder.durable(lazy.queue).lazy() //开启x-queue-mode为lazy.build();}//基于RabbitListener声明LazyQueueRabbitListener(queuesToDeclare {Queue(name lazy.queue,durable true,arguments Argument(name x-queue-mode, value lazy))})public void listenLazyQueue(String msg) {System.out.println(接收到lazy.queue的消息【 msg 】);} 惰性队列的优点有哪些 基于磁盘存储消息上限高没有间歇性的page-out性能比较稳定
消息缓存
使用Redis的List或ZSET做接收消息缓存写一个程序 按照消费者处理时间定时从Redis取消息发送到MQ
延时队列
设置消息过期时间过期后转入死信队列写一个程序 处理死信消息重新如队列或者 即使处理或记录到数据库延后处理 RabbitMQ如何保证消息的有序性
RabbitMQ是队列存储天然具备先进先出的特点只要消息的发送是有序的那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时可能出现消息轮询投递给消费者的情况而消费者的处理顺序就无法保证 因此要保证消息的有序性需要做的下面几点 保证消息发送的有序性保证一组有序的消息都发送到同一个队列保证一个队列只包含一个消费者 这样也会造成吞吐量下降可以在消费者内部采用多线程的方式消费 RabbitMQ消息的可靠性、延时队列
RabbitMQ消息可靠性、延时队列
如何实现数据库与缓存数据一致
实现方案有下面几种
本地缓存同步当前微服务的数据库数据与缓存数据同步可以直接在数据库修改时加入对Redis的修改逻辑保证一致。跨服务缓存同步服务A调用了服务B并对查询结果缓存。服务B数据库修改可以通过MQ通知服务A服务A修改Redis缓存数据通用方案使用Canal框架伪装成MySQL的salve节点监听MySQL的binLog变化然后修改Redis缓存数据
开启消费者多线程消费
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Slf4j
Component
public class SpringRabbitListener {/*** RabbitListener:加了该注解的方法表示该方法是一个消费者 concurrency并发数量。* 其他属性和注解想了解的话自己按Ctrl点进去看*/RabbitListener(bindings QueueBinding(value Queue(value Queue1),exchange Exchange(value Exchange1),key key1),concurrency 10)public void process1(Message message) throws Exception {System.out.println(Queue1: new String(message.getBody()));}}import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;Configuration
public class RabbitmqConfig {Bean(batchQueueRabbitListenerContainerFactory)public RabbitListenerContainerFactory? rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory ();factory.setConnectionFactory (connectionFactory);factory.setMessageConverter (new Jackson2JsonMessageConverter ());//确认方式,manual为手动ack.factory.setAcknowledgeMode (AcknowledgeMode.MANUAL);//每次处理数据数量提高并发量//factory.setPrefetchCount(250);//设置线程数//factory.setConcurrentConsumers(30);//最大线程数//factory.setMaxConcurrentConsumers(50);/* setConnectionFactory设置spring-amqp的ConnectionFactory。 */factory.setConnectionFactory (connectionFactory);factory.setConcurrentConsumers (1);factory.setPrefetchCount (1);//factory.setDefaultRequeueRejected(true);//使用自定义线程池来启动消费者。factory.setTaskExecutor (taskExecutor ());return factory;}Bean(correctTaskExecutor)Primarypublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor ();// 设置核心线程数executor.setCorePoolSize (100);// 设置最大线程数executor.setMaxPoolSize (100);// 设置队列容量executor.setQueueCapacity (0);// 设置线程活跃时间秒executor.setKeepAliveSeconds (300);// 设置默认线程名称executor.setThreadNamePrefix (thread-file-queue);// 设置拒绝策略rejection-policy当pool已经达到max size的时候丢弃// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown (true);return executor;}
}