个人网站命名,wordpress 系统安装教程 pdf,360网页版登录入口,视频链接提取下载RabbitMQ 是什么#xff1f;
RabbitMQ是一个遵循AMQP协议的消息中间件#xff0c;它从生产者接收消息并传递给消费者#xff0c;在这个过程中#xff0c;根据路由规则进行消息的路由、缓存和持久化。
AMQP#xff0c;高级消息队列协议#xff0c;是应用层协议的一个开放…RabbitMQ 是什么
RabbitMQ是一个遵循AMQP协议的消息中间件它从生产者接收消息并传递给消费者在这个过程中根据路由规则进行消息的路由、缓存和持久化。
AMQP高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件而设计的。基于此协议的客户端与消息中间件可以传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。RabbitMQ就是通过Erlang语言实现的一种消息中间件。
具备下面的核心功能
异步通信允许应用程序通过消息队列解耦生产者发送消息后无需等待消费者处理。消息路由通过灵活的交换器Exchange机制支持多种消息分发模式。可靠性保障提供消息持久化、确认机制ACK和重试策略确保消息不丢失。负载均衡通过轮询或权重分配方式将消息分发给多个消费者提升系统吞吐量。
为什么需要消息队列
在分布式系统中直接调用如 HTTP 请求可能导致以下问题
耦合性高服务之间依赖性强一个服务故障可能引发雪崩效应。性能瓶颈同步调用会阻塞线程影响系统响应速度。扩展困难高并发场景下难以动态调整消费者数量。
消息队列通过异步通信和缓冲机制解决了这些问题生产者发送消息到队列后即可返回消费者按自身能力处理消息。即使消费者暂时不可用消息仍能存储在队列中避免数据丢失。
核心概念 名称 说明 Producer 生产者发送消息的一方 Consumer 消费者接收消息的一方 Queue 队列存储消息的缓冲区 Exchange 交换机负责转发消息到队列 Routing Key 路由键决定消息如何路由 Binding 绑定连接交换机与队列的规则 Message 消息最终传输的数据
工作模型 模型类型英文 中文名称 简介 Simple 简单队列模型 一个生产者对应一个队列和一个消费者最基础的模型适合入门学习或简单通信。 Work Queue 工作队列模型 一个生产者将任务发送到队列由多个消费者竞争消费常用于任务分发和后台处理。 Publish/Subscribe 发布/订阅模型 通过 fanout 类型交换机生产者发送的消息会广播到所有绑定的队列适合通知、广播类场景。 Routing 路由模型 使用 direct 类型交换机生产者根据路由键将消息精确投递到指定队列适合日志分级处理等场景。 Topics 主题通配路由模型 使用 topic 类型交换机支持模糊匹配路由键如 user.*.email适合复杂业务订阅场景。 RPC 模式 远程调用模型 实现远程服务调用生产者发送请求并等待消费者返回响应适合系统之间的异步调用场景。 Docker安装RabbitMQ
1、拉取RabbitMQ镜像
命令
docker pull rabbitmq 2、启动
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq 3、进入容器内部
docker exec -it rabbit /bin/bash 4、安装插件
rabbitmq-plugins enable rabbitmq_management 5、查看插件情况
rabbitmq-plugins list 6、访问RabbitMQ
http://192.168.142.3:15672/
账号guest
密码guest RabbitMQ安装方式解压安装
1、下载RabbitMQ
Installing RabbitMQ | RabbitMQ 2、下载Erlang
RabbitMQ是采用 Erlang语言开发的所以系统环境必须提供 Erlang环境需要是安装 Erlang
Erlang和RabbitMQ版本对照https://www.rabbitmq.com/which-erlang.html 这里安装最新版本3.8.14的RabbitMQ对应的Erlang版本推荐23.x
下载地址el/7/erlang-23.2.7-2.el7.x86_64.rpm - rabbitmq/erlang · packagecloud 3、将下载好的文件上传到服务器
# 创建文件
mkdir -p /opt/rabbitmq
将安装包上传到/opt/rabbitmq 4、安装Erlang
cd /opt/rabbitmq
# 解压
rpm -Uvh erlang-23.2.7-2.el7.x86_64.rpm# 安装
yum install -y erlang 如果yum无法使用
使用国内镜像源适用于中国用户
备份原有 repo 文件
sudo mkdir /etc/yum.repos.d/backup
sudo mv /etc/yum.repos.d/CentOS-* /etc/yum.repos.d/backup/
下载阿里云镜像源
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
清理并重建缓存
sudo yum clean all
sudo yum makecache 安装完成后输入如下指令查看版本号输入两次ctrlc退出 5、安装RabbitMQ
在RabiitMQ安装过程中需要依赖socat插件首先安装该插件
yum install -y socat 解压安装RabbitMQ的安装包
# 解压
rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm
# 安装
yum install -y rabbitmq-server 6、启动RabbitMQ服务
# 启动rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq状态
systemctl status rabbitmq-server# 其他命令
# 设置rabbitmq服务开机自启动
systemctl enable rabbitmq-server
# 关闭rabbitmq服务
systemctl stop rabbitmq-server
# 重启rabbitmq服务
systemctl restart rabbitmq-server 7、RabbitMQWeb管理界面及授权操作
systemctl stop firewalld
# 打开RabbitMQWeb管理界面插件
rabbitmq-plugins enable rabbitmq_management 打开浏览器访问服务器公网ip:15672 http://192.168.142.131:15672/
rabbitmq有一个默认的账号密码guest 添加远程用户
# 添加用户
rabbitmqctl add_user 用户名 密码
rabbitmqctl add_user admin 123456
# 设置用户角色,分配操作权限
rabbitmqctl set_user_tags 用户名 角色
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
rabbitmqctl set_permissions -p / 用户名 .* .* .*
rabbitmqctl set_permissions -p / admin .* .* .*#其他指令
# 修改密码
rabbitmqctl change_ password 用户名 新密码
# 删除用户
rabbitmqctl delete_user 用户名
# 查看用户清单
rabbitmqctl list_users
角色有四种
administrator可以登录控制台、查看所有信息、并对rabbitmq进行管理monToring监控者登录控制台查看所有信息policymaker策略制定者登录控制台指定策略managment普通管理员登录控制
创建用户admin密码123456设置administrator角色赋予所有权限 然后访问 http://192.168.142.131:15672/ 用户名admin 密码123456 8、延时队列插件安装
Community Plugins | RabbitMQRabbitMQ是什么版本的下载的插件就得是什么版本的 将插件上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange 然后重启rabbitMQ
SpringBoot如何使用RabbitMQ
1、创建SpringBoot项目引入依赖
!-- Spring Boot Starter for RabbitMQ --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
2、在 application.yml 中添加 RabbitMQ 配置
spring:# RabbitMQ 相关配置rabbitmq:# RabbitMQ 服务器地址host: 192.168.142.131# RabbitMQ 服务器端口 客户端应用程序(生产者/消费者)通过这个端口与 RabbitMQ 服务器交互port: 5672# RabbitMQ 用户名username: admin# RabbitMQ 密码password: 123456# RabbitMQ 虚拟主机virtual-host: /# 开启发送方确认机制publisher-confirm-type: correlated# 开启发送方退回机制publisher-returns: true# RabbitMQ 模板配置template:# 设置为 true 时RabbitMQ 将确认消息是否成功投递到队列mandatory: true# 消息监听器配置listener:simple:# 消费者最小数量concurrency: 5# 消费者最大数量max-concurrency: 10# 每次从队列中获取的消息数量prefetch: 1# 消费者手动 ackacknowledge-mode: manual
3、创建 RabbitMQ 配置类
package com.lw.mqdemo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ配置类用于声明队列、交换机、绑定关系以及配置消息转换器和RabbitTemplate*/
Slf4j
Configuration
public class RabbitMQConfig {// 定义队列、交换机和路由键名称// 定义直连交换机相关的常量// 队列名称public static final String DIRECT_QUEUE test.direct.queue;// 交换机名称public static final String DIRECT_EXCHANGE test.direct.exchange;// 路由键名称public static final String DIRECT_ROUTING_KEY test.direct.routingkey;// 定义主题交换机相关的常量// 队列1名称public static final String TOPIC_QUEUE_1 test.topic.queue1;// 队列2名称public static final String TOPIC_QUEUE_2 test.topic.queue2;// 交换机名称public static final String TOPIC_EXCHANGE test.topic.exchange;// 路由键名称public static final String TOPIC_ROUTING_KEY_1 test.topic.routingkey1;// 路由键名称public static final String TOPIC_ROUTING_KEY_2 test.topic.#;// 定义扇形交换机相关的常量// 队列1名称public static final String FANOUT_QUEUE_1 test.fanout.queue1;// 队列2名称public static final String FANOUT_QUEUE_2 test.fanout.queue2;// 交换机名称public static final String FANOUT_EXCHANGE test.fanout.exchange;// 1. 直连型交换机队列/*** 声明直连型交换机的队列** return 队列对象*/Beanpublic Queue directQueue() {log.info(创建队列: DIRECT_QUEUE);return new Queue(DIRECT_QUEUE, true);}// 2. 主题型交换机队列/*** 声明主题型交换机的第一个队列** return 队列对象*/Beanpublic Queue topicQueue1() {log.info(创建队列: TOPIC_QUEUE_1);return new Queue(TOPIC_QUEUE_1, true);}/*** 声明主题型交换机的第二个队列** return 队列对象*/Beanpublic Queue topicQueue2() {log.info(创建队列: TOPIC_QUEUE_2);return new Queue(TOPIC_QUEUE_2, true);}// 3. 扇形交换机队列/*** 声明扇形交换机的第一个队列** return 队列对象*/Beanpublic Queue fanoutQueue1() {log.info(创建队列: FANOUT_QUEUE_1);return new Queue(FANOUT_QUEUE_1, true);}/*** 声明扇形交换机的第二个队列** return 队列对象*/Beanpublic Queue fanoutQueue2() {log.info(创建队列: FANOUT_QUEUE_2);return new Queue(FANOUT_QUEUE_2, true);}// 1. 直连型交换机/*** 声明直连型交换机** return 交换机对象*/Beanpublic DirectExchange directExchange() {log.info(创建交换机: DIRECT_EXCHANGE);return new DirectExchange(DIRECT_EXCHANGE, true, false);}// 2. 主题型交换机/*** 声明主题型交换机** return 交换机对象*/Beanpublic TopicExchange topicExchange() {log.info(创建交换机: TOPIC_EXCHANGE);return new TopicExchange(TOPIC_EXCHANGE, true, false);}// 3. 扇形交换机/*** 声明扇形交换机** return 交换机对象*/Beanpublic FanoutExchange fanoutExchange() {log.info(创建交换机: FANOUT_EXCHANGE);return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 绑定直连型交换机和队列/*** 绑定直连型交换机和队列** return 绑定对象*/Beanpublic Binding bindingDirect() {log.info(绑定队列: DIRECT_QUEUE 到交换机: DIRECT_EXCHANGE 路由键: DIRECT_ROUTING_KEY);return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);}// 绑定主题型交换机和队列/*** 绑定主题型交换机的第一个队列** return 绑定对象*/Beanpublic Binding bindingTopic1() {log.info(绑定队列: TOPIC_QUEUE_1 到交换机: TOPIC_EXCHANGE 路由键: TOPIC_ROUTING_KEY_1);return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);}/*** 绑定主题型交换机的第二个队列** return 绑定对象*/Beanpublic Binding bindingTopic2() {log.info(绑定队列: TOPIC_QUEUE_2 到交换机: TOPIC_EXCHANGE 路由键: TOPIC_ROUTING_KEY_2);return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);}// 绑定扇形交换机和队列/*** 绑定扇形交换机的第一个队列** return 绑定对象*/Beanpublic Binding bindingFanout1() {log.info(绑定队列: FANOUT_QUEUE_1 到交换机: FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 绑定扇形交换机的第二个队列** return 绑定对象*/Beanpublic Binding bindingFanout2() {log.info(绑定队列: FANOUT_QUEUE_2 到交换机: FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}/*** 创建延迟交换机*/Beanpublic CustomExchange delayedExchange() {MapString, Object args new HashMap();args.put(x-delayed-type, direct);return new CustomExchange(delayed.exchange,x-delayed-message, true, false, args);}/*** 绑定延迟交换机* return*/Beanpublic Binding bindingDelayed() {return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(DIRECT_ROUTING_KEY).noargs();}// 使用JSON序列化消息/*** 配置JSON消息转换器** return 消息转换器对象*/Beanpublic MessageConverter jsonMessageConverter() {log.info(配置JSON消息转换器);return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate/*** 配置RabbitTemplate设置消息转换器以及消息发送确认和返回回调** param connectionFactory 连接工厂* return 配置好的RabbitTemplate对象*/Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jsonMessageConverter());// 消息发送到交换器后确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {log.info(消息成功发送到Exchange: correlationData);} else {log.error(消息发送到Exchange失败: correlationData);}});// 消息从交换器发送到队列失败回调rabbitTemplate.setReturnsCallback(returned - {log.info(消息从Exchange路由到Queue失败: returned.getMessage());log.info(交换机: returned.getExchange());log.info(路由键: returned.getRoutingKey());log.info(返回码: returned.getReplyCode());log.info(返回信息: returned.getReplyText());});return rabbitTemplate;}
}4、创建消息生产者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** RabbitMQ生产者类用于发送不同类型的交换机消息*/
Component
public class RabbitMQProducer {/*** 注入RabbitTemplate模板用于发送消息*/Autowiredprivate RabbitTemplate rabbitTemplate;/*** 注入MessageConverter转换器用于将对象转换为消息*/Autowiredprivate MessageConverter messageConverter;/*** 发送直连型交换机消息** param message 要发送的消息对象*/public void sendDirectMessage(Object message) {CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, // 交换机名称RabbitMQConfig.DIRECT_ROUTING_KEY, // 路由键 用于消息路由message, // 要发送的消息对象correlationData // 消息的唯一标识);}/*** 发送主题型交换机消息1** param message 要发送的消息对象*/public void sendTopicMessage1(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交换机名称RabbitMQConfig.TOPIC_ROUTING_KEY_1, // 路由键 用于消息路由message // 要发送的消息对象);}/*** 发送主题型交换机消息2** param message 要发送的消息对象*/public void sendTopicMessage2(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交换机名称test.topic.routingkey2.test, // 路由键 用于消息路由message);}/*** 发送扇形交换机消息** param message 要发送的消息对象*/public void sendFanoutMessage(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, // 交换机名称, // 扇形交换机不需要路由键message);}/*** 发送延迟消息** param message 要发送的消息对象* param delayMillis 消息延迟的时间毫秒*/public void sendDelayedMessage(Object message, int delayMillis) {MessageProperties props new MessageProperties();props.setDelay(delayMillis);Message msg messageConverter.toMessage(message, props);rabbitTemplate.convertAndSend(delayed.exchange, // 关键修改点RabbitMQConfig.DIRECT_ROUTING_KEY,msg);}
}5、创建消息消费者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** RabbitMQ消费者类* 该类包含了对不同类型队列直连型、主题型、扇形的消息消费方法*/
Component
Slf4j
public class RabbitMQConsumer {/*** 直连型队列消费者* 监听直连型队列并处理收到的消息** param message 消息内容* param channel 消息通道* throws IOException 当消息处理失败时抛出异常*/RabbitListener(queues RabbitMQConfig.DIRECT_QUEUE)public void processDirectMessage(Message message, Channel channel) throws IOException {try {// 打印收到的消息内容log.info(直连型队列收到消息: new String(message.getBody()));// 手动ACK确认 如果不确认会消息Unacked状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主题型队列1消费者* 监听主题型队列1并处理收到的消息** param message 消息内容* param channel 消息通道* param msg 消息对象* throws IOException 当消息处理失败时抛出异常*/RabbitListener(queues RabbitMQConfig.TOPIC_QUEUE_1)public void processTopicMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info(主题型队列1收到消息: message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主题型队列2消费者* 监听主题型队列2并处理收到的消息** param message 消息内容* param channel 消息通道* param msg 消息对象* throws IOException 当消息处理失败时抛出异常*/RabbitListener(queues RabbitMQConfig.TOPIC_QUEUE_2)public void processTopicMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info(主题型队列2收到消息: message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形队列1消费者* 监听扇形队列1并处理收到的消息** param message 消息内容* param channel 消息通道* param msg 消息对象* throws IOException 当消息处理失败时抛出异1常*/RabbitListener(queues RabbitMQConfig.FANOUT_QUEUE_1)public void processFanoutMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info(扇形队列1收到消息: message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形队列2消费者* 监听扇形队列2并处理收到的消息** param message 消息内容* param channel 消息通道* param msg 消息对象* throws IOException 当消息处理失败时抛出异常*/RabbitListener(queues RabbitMQConfig.FANOUT_QUEUE_2)public void processFanoutMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息内容log.info(扇形队列2收到消息: message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败拒绝消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}
}6、创建测试控制器
package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.RabbitMQProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** RabitMQ控制器* author lw*/
Slf4j
RestController
RequestMapping(/rabbitmq)
public class RabbitMQController {// 注入RabbitMQ生产者Autowiredprivate RabbitMQProducer rabbitMQProducer;/*** 发送直连型消息* return*/GetMapping(/direct)public String sendDirectMessage() {log.info(发送直连型消息);rabbitMQProducer.sendDirectMessage(这是一条直连型交换机消息);return 直连型消息发送成功;}/*** 发送主题型消息1* return*/GetMapping(/topic1)public String sendTopicMessage1() {log.info(发送主题型消息1);rabbitMQProducer.sendTopicMessage1(这是一条主题型交换机消息1);return 主题型消息1发送成功;}/*** 发送主题型消息2* return*/GetMapping(/topic2)public String sendTopicMessage2() {log.info(发送主题型消息2);rabbitMQProducer.sendTopicMessage2(这是一条主题型交换机消息2);return 主题型消息2发送成功;}GetMapping(/fanout)public String sendFanoutMessage() {log.info(发送扇形消息);rabbitMQProducer.sendFanoutMessage(这是一条扇形交换机消息);return 扇形消息发送成功;}/*** 发送延迟消息* return*/GetMapping(/delay)public String sendDelayedMessage() {log.info(发送延迟消息);// 延迟5秒rabbitMQProducer.sendDelayedMessage(这是一条延迟消息, 5000);return 延迟消息发送成功5秒后消费;}
}