榆树市住房和城乡建设局网站,上海发布公众号下载安装,集团网站品牌建设特点,快速建站哪里好一. 简介
1 什么是MQ
消息队列#xff08;Message Queue#xff0c;简称MQ#xff09;#xff0c;从字面意思上看#xff0c;本质是个队列#xff0c;FIFO先入先出#xff0c;只不过队列中存放的内容是message而已。 其主要用途#xff1a;不同进程Process/线程T…一. 简介
1 什么是MQ
消息队列Message Queue简称MQ从字面意思上看本质是个队列FIFO先入先出只不过队列中存放的内容是message而已。 其主要用途不同进程Process/线程Thread之间通信。
那么为什么会产生消息队列呢有几个原因 不同进程process之间传递消息时两个进程之间耦合程度过高改动一个进程引发必须修改另一个进程为了隔离这两个进程在两进程间抽离出一层一个模块所有两进程之间传递的消息都必须通过消息队列来传递单独修改某一个进程不会影响另一个 不同进程process之间传递消息时为了实现标准化将消息的格式规范化了并且某一个进程接受的消息太多一下子无法处理完并且也有先后顺序必须对收到的消息进行排队因此诞生了事实上的消息队列
MQ框架非常之多比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
2 什么是RabbitMQ
RabbitMQ 是一个消息代理它接受和转发消息。您可以将其视为邮局当您将要寄的邮件放入邮箱时您可以确信信使最终会将邮件发送给您的收件人。在本例中RabbitMQ 是邮箱、邮局和信使。
RabbitMQ 与邮局的主要区别在于它不处理纸张而是接受、存储和转发二进制数据块——消息。
RabbitMQ 和一般意义上的消息传递使用了一些术语。 生产_仅仅意味着发送。发送消息的程序称为_生产者 _队列_是 RabbitMQ 中邮箱的名称。虽然消息会流经 RabbitMQ 和您的应用程序但它们只能存储在_队列_中。_队列_仅受主机内存和磁盘限制的约束它本质上是一个大型消息缓冲区。许多_生产者_可以发送消息到一个队列并且许多_消费者_可以尝试从一个_队列_中接收数据。这就是我们表示队列的方式
其是实现 AMQP高级消息队列协议的消息中间件的一种最初起源于金融系统用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时消费者无法快速消费那么需要一个中间层。保存这个数据。 AMQP即 Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。AMQP 的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。 RabbitMQ 是一个开源的 AMQP 实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等支持 AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 3 相关概念
通常我们谈到队列服务会有三个概念发消息者、队列、收消息者RabbitMQ 在这个基本概念之上多做了一层抽象在发消息者和队列之间加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系转而变成发消息者把消息给交换器交换器根据调度策略再把消息给队列。那么其中比较重要的概念有4个分别为虚拟主机交换机队列和绑定。
虚拟主机一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢很简单 RabbitMQ 当中用户只能在虚拟主机的 粒度进行权限控制。 因此如果需要禁止A组访问B组的交换机/队列/绑定必须为A和B分别创建一个虚拟主机。每一个RabbitMQ 服务器 都有一个默认的虚拟主机“/”。交换机Exchange 用于转发消息但是它不会做存储 如果没有 Queue bind 到 Exchange 的话它会直接丢弃掉 Producer 发送过来的 消息。这里有一个比较重要的概念路由键。消息到交换机的时候交互机会转发到对应的队列中那么究竟转发到哪个队列就要根据 该路由键。绑定也就是交换机需要和队列相绑定这其中如上图所示是多对多的关系。
二. 实现
Spring Boot 集成 RabbitMQ
Spring Boot 集成 RabbitMQ 非常简单如果只是简单的使用配置非常少Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。
1. 简单使用
1配置 pom 包主要是添加 spring-boot-starter-amqp 的支持 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2配置文件application.yml
配置 RabbitMQ 的安装地址、端口以及账户信息
# 配置文件
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicodetruecharacterEncodingutf-8useSSLfalseusername: rootpassword: 123456# RabbitMQ配置rabbitmq:host: 192.168.146.1port: 5672username: adminpassword: 123456我这里还配置了数据库
3队列配置
package com.nianxi.mybatisplus.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Beanpublic Queue Queue() {return new Queue(hello);}
}4发送者
rabbitTemplate 是 Spring Boot 提供的默认实现
package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;Component
public class HelloSender {Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context hello new Date();System.out.println(Sender : context);this.rabbitTemplate.convertAndSend(hello, context);}
}5接收者
package com.nianxi.mybatisplus.mapper;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
RabbitListener(queues hello)
public class HelloReceiver {RabbitHandlerpublic void process(String hello) {System.out.println(Receiver : hello);}
}6 测试
package com.nianxi.mybatisplus;import com.nianxi.mybatisplus.mapper.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMqHelloTest {Autowiredprivate HelloSender helloSender;Testpublic void hello() throws Exception {helloSender.send();}
}注意发送者和接收者的 queue name 必须一致不然不能接收 2.RabbitTemplate
**RabbitTemplate**是SpringAMQP提供的一个高级消息操作模板**用于在与RabbitMQ进行交互时进行消息的发送和接收操作。**它是对底层AMQP协议的封装简化了与RabbitMQ的交互过程, 是SpringAMQP中的核心类提供声明式方式处理RabbitMQ包括发送和接收消息、消息转换、属性设置及回调机制。通过配置和正确使用简化了RabbitMQ的集成与操作。
1 发送消息
**RabbitTemplate**提供了多种发送消息的方法包括同步发送和异步发送。通过指定交换机、路由键和消息体我们可以将消息发送到 RabbitMQ 服务器上的指定位置。此外RabbitTemplate还支持消息的确认机制以确保消息被成功发送和接收。
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);2 接收消息
除了发送消息外**RabbitTemplate**还提供了接收消息的功能。通过调用相关方法我们可以从指定的队列中接收消息并进行相应的处理。这通常涉及到监听队列、处理消息和确认消息接收等步骤。
Message receivedMessage rabbitTemplate.receive(queueName);
MyMessage myMessage rabbitTemplate.receiveAndConvert(queueName, MyMessage.class);3 消息转换
**RabbitTemplate支持消息的自动转换。这意味着我们可以将 Java 对象作为消息体发送而RabbitTemplate会自动将其转换为可序列化的格式如 JSON 或 XML。同样地当从队列中接收消息时RabbitTemplate**也可以自动将消息体转换回 Java 对象。
Jackson2JsonMessageConverter messageConverter new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(messageConverter);4 消息属性设置
在发送消息时我们可以设置各种消息属性如消息的优先级、持久化标志、过期时间等。这些属性可以通过**MessageProperties对象进行设置并在发送消息时传递给RabbitTemplate**。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; Service
public class MessageSender { Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) { // 创建MessageProperties MessageProperties properties new MessageProperties(); // 设置优先级值范围0-9其中0为最低优先级9为最高优先级 properties.setPriority(priority); // 设置消息持久化 properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); // 设置消息的过期时间单位为毫秒 properties.setExpiration(String.valueOf(ttl)); // 使用MessageBuilder构建Message对象 Message msg MessageBuilder.withBody(message.getBytes()) .setContentEncoding(UTF-8) .setContentType(text/plain) .setMessageId(UUID.randomUUID().toString()) // 可选设置消息ID .setTimestamp(new Date()) // 可选设置时间戳 .setHeaders(Collections.singletonMap(x-custom-header, value)) // 可选设置自定义头 .andProperties(properties) .build(); // 发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, msg); }
}5 回调机制
**RabbitTemplate**支持发送消息时的回调机制。这意味着在发送消息后我们可以注册一个回调函数来处理发送结果或接收响应。这对于需要异步处理发送结果或接收响应的场景非常有用。
**setConfirmCallback方法是RabbitTemplate**类中的一个回调方法用于处理消息的确认acknowledgment结果。当消息成功发送到RabbitMQ的交换机时会触发确认回调你可以在该回调中处理相应的逻辑。 correlationData关联数据可以是任意类型的对象通常用于唯一标识消息。 ack布尔值表示消息是否成功发送到交换机。true表示成功false表示失败。 cause失败的原因当ack为false时此参数会提供一个可选的异常信息。 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - { if (ack) { // 消息发送成功 System.out.println(“Message sent successfully”); } else { // 消息发送失败进行处理 System.out.println(Message sent failed: cause); } });
6 异步消息处理
RabbitTemplate支持异步消息处理你可以注册ConfirmCallback和ReturnCallback来处理消息的确认和返回结果。ConfirmCallback用于确认消息是否成功发送到交换机ReturnCallback用于处理无法路由到队列的消息。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {// 消息发送成功} else {// 消息发送失败进行处理}
});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 处理无法路由到队列的消息
});3.使用 RabbitTemplate 的步骤
1 配置 RabbitTemplate
在使用**RabbitTemplate**之前我们需要对其进行配置。这通常涉及到设置连接工厂、交换机、队列和绑定等。这些配置可以通过 XML 配置或 Java 配置完成。
2 创建 RabbitTemplate 实例
一旦配置完成我们可以创建一个**RabbitTemplate**实例。这个实例将使用我们提供的配置来与 RabbitMQ 服务器进行交互。
3 发送消息
使用**RabbitTemplate**的发送方法我们可以将消息发送到指定的交换机和路由键。我们可以指定消息体、消息属性和其他发送选项。
4 接收消息
要接收消息我们可以使用**RabbitTemplate**的接收方法或结合监听器来监听指定的队列。当消息到达时我们可以处理消息并执行相应的业务逻辑。
5 处理异常和错误
在使用**RabbitTemplate**时我们还需要考虑异常和错误处理。例如当发送消息失败或接收消息时发生异常时我们需要有相应的处理机制来确保系统的稳定性和可靠性。
4.RabbitTemplate 的优势与注意事项
优势
简化操作RabbitTemplate封装了底层细节使得开发者能够专注于业务逻辑的实现而无需关心底层的消息传输细节。灵活性RabbitTemplate提供了丰富的配置选项和扩展点使得开发者能够根据实际需求进行定制和优化。性能优化RabbitTemplate内部进行了性能优化如连接池管理、消息缓存等以提高消息传输的效率和可靠性。
注意事项
配置正确性确保RabbitTemplate的配置正确无误包括连接工厂、交换机、队列和绑定等的设置。错误的配置可能导致消息无法正确发送或接收。异常处理在使用RabbitTemplate时要充分考虑异常处理机制确保在发生异常时能够及时发现并处理。资源释放在使用完RabbitTemplate后要确保释放相关资源如关闭连接、释放连接池中的连接等以避免资源泄漏和性能问题。