网站建设与实现毕业答辩ppt,平面设计素材网站排行榜前十名,佛山网站建设no.1,自己做模板网站文章目录
一、什么是消息队列#xff1f;
二、消息队列的作用#xff08;优点#xff09;
1、解耦
2、流量削峰
3、异步
4、顺序性
三、RabbitMQ基本结构
四、RabbitMQ队列模式
1、简单队列模式
2、工作队列模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、…文章目录
一、什么是消息队列
二、消息队列的作用优点
1、解耦
2、流量削峰
3、异步
4、顺序性
三、RabbitMQ基本结构
四、RabbitMQ队列模式
1、简单队列模式
2、工作队列模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、RPC模式
7、发布者确认模式
五、RabbitMQ相关属性描述
总结 一、什么是消息队列
消息队列是一种用于在分布式系统中进行通信的技术。它是一种存储和转发消息的中间件可以用
于将应用程序之间的通信解耦从而实现高效的异步通信。消息队列允许发送者将消息发送到队列
中而接收者则可以从队列中获取消息并进行处理。这种方式可以帮助系统实现高可用性、高性
能、松耦合和可伸缩性。消息队列通常包括生产者发送消息的应用程序、消费者接收消息的
应用程序和队列存储消息的缓冲区。
RabbitMQ是由erlang语言开发基于AMQP(高级消息队列协议)协议实现的一种消息队列。市面
上还有很多消息队列比如Kafka、RocketMQ、Redis等各有优劣本文主要介绍RabbitMQ。
官方文档RabbitMQ Tutorials | RabbitMQ
二、消息队列的作用优点
1、解耦
应用程序解耦通过引入消息队列不同的应用程序之间可以通过消息队列进行通信而无需直接
调用对方的接口或方法。这样可以降低系统中各个应用程序之间的耦合度使得它们可以独立演化
和扩展而不会因为对方的变化而受到影响。
2、流量削峰
消息队列可以作为一个缓冲区暂时存储流入的消息直到系统有足够的资源来处理它们。当系统
出现流量高峰时消息队列可以暂时存储过多的消息以平滑处理流量的波动避免系统被突发的
高负载压垮。
3、异步
发送者在发送消息后可以立即继续执行其他操作而不需要等待接收者的响应。这样可以提高系统
的并发性和响应速度。也可以帮助提高系统的吞吐量特别是在面对大量请求或处理复杂计算时。
发送者可以并行地向多个接收者发送消息而不会因为等待接收者的响应而阻塞。
4、顺序性
虽然并不是所有消息队列都能保证消息的绝对顺序性但是在许多情况下消息队列可以保证消息
的相对顺序性。即按照发送顺序进行处理对某些场景要求顺序执行很适合。
三、RabbitMQ基本结构
名称
描述
Connection连接
连接是生产者和消费者与RabbitMQ之间的连接。每个生产者和消费者都需要与RabbitMQ建立一个连接以便发送和接收消息。连接通常是长连接可以重用以提高性能和效率。
Channel信道
Channel是连接Connection内的逻辑通道用于完成大部分 AMQP 操作如声明队列、发送和接收消息等。在 RabbitMQ 中引入 Channel信道的主要目的是为了提高系统的性能、灵活性和效率。使用 Channel 可以避免频繁地创建和销毁连接因为一个连接可以包含多个 Channel。这样可以减少连接的开销节省系统资源并提高性能。
Exchange交换机
交换机是消息的接收和分发中心负责接收生产者发送的消息并根据指定的路由规则发送到一个或多个队列中。
Exchange相当于Queue的代理可以设置不同的写入策略写入到对应的队列中。对于队列的写入更加灵活
交换机的类型有fanout扇出、topic主题、direct直接
Queue队列
队列是消息的缓存区用于存储交换机发送的消息。生产者发送的消息最终会被存储在队列中等待消费者进行消费。队列可以持久化到磁盘以确保消息不会在RabbitMQ宕机或重启后丢失。
Producer生产者
生产者是发送消息到RabbitMQ的应用程序。生产者负责创建消息并将其发送到RabbitMQ的消息队列中。
Consumer消费者
消费者是从RabbitMQ队列中接收消息并进行处理的应用程序。消费者可以订阅一个或多个队列并在消息到达队列时接收并处理它们。消费者负责监听队列中的消息并将其取出进行处理。
四、RabbitMQ队列模式
基于Exchange交换机RabbitMQ截至目前有七种队列模式。
1、简单队列模式
一个消息生产者一个消息消费者一个队列。也称为点对点模式。 图中P代表生产者C代表消费者Queue是队列名称。
我们看到是没有Exchange的但是RabbitMQ也会有一个默认的交换机。这个默认的交换机通常被
称为amq.default或者空字符串是RabbitMQ自动创建的用于在没有指定交换机的情况
下将消息发送到队列。
//生产者
var factory new ConnectionFactory { HostName localhost}; //初始化连接信息
using var connection factory.CreateConnection(); //创建连接
using var channel connection.CreateModel(); //创建信道//声明一个队列并将信道与队列绑定
channel.QueueDeclare(queue: hello,durable: false,exclusive: false,autoDelete: false,arguments: null);
//发送消息的内容
string message $Hello World!;
var body Encoding.UTF8.GetBytes(message);//信道绑定交换机
channel.BasicPublish(exchange: string.Empty,routingKey: string.Empty,basicProperties: null,body: body);Console.WriteLine($ [x] Sent {message});Console.WriteLine( Press [enter] to exit.);//消费者
var factory new ConnectionFactory { HostName localhost };
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.QueueDeclare(queue: hello,durable: false,exclusive: false,autoDelete: false,arguments: null);Console.WriteLine( [*] Waiting for messages.);var consumer new EventingBasicConsumer(channel);
consumer.Received (model, ea)
{var body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);Console.WriteLine($ [x] Received {message});
};
channel.BasicConsume(queue: hello,autoAck: true,consumer: consumer);Console.WriteLine( Press [enter] to exit.);此时就会生产者发送一条消息消费者就会接收一条消息。
2、工作队列模式
_工作队列又叫做任务队列正常_会按顺序把消息发送给每一个订阅的消费者平均而言每个消费
者将获得相同数量的消息。不是P发送一条消息C1和C2都会收到而是第一条C1消费第二
条C2消费。每个消息只会被一个消费者接收和处理。
这样的好处是可以提高吞吐量因为生产者发送了很多消息但是消费者只有一个消费者处理很
慢就会造成消息积压。 //生产者
var factory new ConnectionFactory { HostName localhost};
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.QueueDeclare(queue: task_queue,durable: true,exclusive: false,autoDelete: false,arguments: null);var message $work queue;
var body Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,routingKey: string.Empty,basicProperties: null,body: body);
Console.WriteLine($ [x] Sent {message});Console.WriteLine( Press [enter] to exit.);//消费者
var factory new ConnectionFactory { HostName localhost };
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.QueueDeclare(queue: task_queue,durable: true,exclusive: false,autoDelete: false,arguments: null);Console.WriteLine( [*] Waiting for messages.);var consumer new EventingBasicConsumer(channel);
consumer.Received (model, ea)
{byte[] body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);Console.WriteLine($ [x] Received {message});channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: task_queue,autoAck: false,consumer: consumer);Console.WriteLine( Press [enter] to exit.);工作队列与简单队列一致会有一个默认的交换机。
3、发布/订阅模式
发布/订阅模式是一种消息传递模式它允许发送者发布者将消息发布到多个接收者订阅
者。消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上生产者通常根本
不知道消息是否会被传递到任何队列。
所以消息传递模式发布者不需要指定队列。
发布/订阅模式交换机类型为Fanout。 //发布者
var factory new ConnectionFactory { HostName localhost};
using var connection factory.CreateConnection();
using var channel connection.CreateModel();//声明一个交换机叫做logs并且交换机的类型是Fanout
channel.ExchangeDeclare(exchange: logs, type: ExchangeType.Fanout);var message publish_subscribe;
var body Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: logs,routingKey: string.Empty,basicProperties: null,body: body);
Console.WriteLine($ [x] Sent {message});Console.WriteLine( Press [enter] to exit.);//接收者
var factory new ConnectionFactory { HostName localhost};
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.ExchangeDeclare(exchange: logs, type: ExchangeType.Fanout);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,exchange: logs,routingKey: string.Empty);Console.WriteLine( [*] Waiting for logs.);var consumer new EventingBasicConsumer(channel);
consumer.Received (model, ea)
{byte[] body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);Console.WriteLine($ [x] {message});
};
channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.WriteLine( Press [enter] to exit.);注如果发布者已经发布消息到交换机但还没有队列绑定到交换机消息将会丢失。
4、路由模式
路由模式也是一种消息传递模式是基于消息的路由键routing key来将消息从交换机
exchange发送到一个或多个队列中。相比较于发布/订阅模式路由模式多了一个routing key
的概念。
路由模式交换机类型为Direct。 //生产者
var factory new ConnectionFactory { HostName localhost};
using var connection factory.CreateConnection();
using var channel connection.CreateModel();//定义交换机名称以及类型为Direct
channel.ExchangeDeclare(exchange: direct_logs, type: ExchangeType.Direct);//定义路由键
string routingKey direct_test;//发送消息体
string message direct_message;
var body Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: direct_logs,routingKey: routingKey,basicProperties: null,body: body);
Console.WriteLine($ [x] Sent {routingKey}:{message});Console.WriteLine( Press [enter] to exit.);//消费者
var factory new ConnectionFactory { HostName localhost };
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.ExchangeDeclare(exchange: direct_logs, type: ExchangeType.Direct);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName channel.QueueDeclare().QueueName;//路由键集合
var routeKeyArr new string[] { direct_test, direct_test2 };foreach (var routeKey in routeKeyArr)
{channel.QueueBind(queue: queueName,exchange: direct_logs,routingKey: routeKey);
}Console.WriteLine( [*] Waiting for messages.);var consumer new EventingBasicConsumer(channel);
consumer.Received (model, ea)
{var body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);var routingKey ea.RoutingKey;Console.WriteLine($ [x] Received {routingKey}:{message});
};
channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine( Press [enter] to exit.);路由模式消费者可以监听多个路由键。
5、主题模式
基于路由模式仍然有局限性——它不能基于多个标准进行路由。也就是一个消费者只能接收完全
与routing key相匹配的交换机。主题模式主要解决路由模式的不足可以模糊匹配routing key。
路由模式交换机类型为Topic。 在生产者方面基于 . 作为分隔符用于routing key。比如“stock.usd.nyse”、“nyse.vmw”、
“quick.orange.rabbit”。可以是任何单词但最多只有255 个字节。
在消费者方面绑定routing key有两种重要的情况
1*(星号)匹配一个单词。
具体语法
var routeing_key info.debug.error;//匹配 info
info.*.*
//匹配debug
*.debug.*
//匹配error
*.*.error2#(散列)匹配零个或多个单词。
具体语法
var routeing_key info.debug.error;//匹配 info
info.#
//匹配debug
#.debug.#
//匹配error
*.*.error6、RPC模式
RPC模式又叫请求/回复模式。
RPCRemote Procedure Call远程过程调用是一种用于在分布式系统中进行通信的技术。它
允许一个进程或线程调用另一个进程或线程的过程函数或方法就像调用本地函数一
样而不需要开发者显式处理底层通信细节。
就是生产者发送一条消息消费者端执行某个方法获取值的同时并返回到生产者。 //生产者
var factory new ConnectionFactory { HostName localhost};
using var connection factory.CreateConnection();
using var channel connection.CreateModel();//定义接收返回结果的队列
var replyQueueName channel.QueueDeclare().QueueName;
var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea)
{var body ea.Body.ToArray();var response Encoding.UTF8.GetString(body);Console.WriteLine( [.] Got {0}, response);
};//发送消息
var correlationId Guid.NewGuid().ToString(); //消息唯一性
var props channel.CreateBasicProperties();
props.CorrelationId correlationId;
props.ReplyTo replyQueueName; //回调队列名称string message 30;
var messageBytes Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: ,routingKey: rpc_queue,basicProperties: props,body: messageBytes);channel.BasicConsume(consumer: consumer,queue: replyQueueName,autoAck: true);//消费者
var factory new ConnectionFactory { HostName localhost };
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.QueueDeclare(queue: rpc_queue,durable: false,exclusive: false,autoDelete: false,arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer new EventingBasicConsumer(channel);
channel.BasicConsume(queue: rpc_queue,autoAck: false,consumer: consumer);
Console.WriteLine( [x] Awaiting RPC requests);
consumer.Received (model, ea)
{string response string.Empty;var body ea.Body.ToArray();var props ea.BasicProperties;var replyProps channel.CreateBasicProperties();replyProps.CorrelationId props.CorrelationId;try{var message Encoding.UTF8.GetString(body);int n int.Parse(message);Console.WriteLine($ [.] Fib({message}));response FibHelper.Fib(n).ToString();}catch (Exception e){Console.WriteLine($ [.] {e.Message});response string.Empty;}finally{//回调到生产者队列var responseBytes Encoding.UTF8.GetBytes(response);channel.BasicPublish(exchange: string.Empty,routingKey: props.ReplyTo,basicProperties: replyProps,body: responseBytes);channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);}
};//执行函数并返回结果
public class FibHelper
{public static int Fib(int n){if (n 0 || n 1)return n;return Fib(n - 1) Fib(n - 2);}
}RPC模式不是消息传递模式消息只会被一个消费者消费。
7、发布者确认模式
发布者确认模式Publisher Confirmation是 RabbitMQ 提供的一种机制用于确保消息被成功
发送到交换机exchange并被接收到以及确保消息被正确地路由到队列中。在传统的消息发
布过程中发布者发送消息到交换机后并不知道消息是否已经被正确地处理。为了解决这个问
题RabbitMQ 提供了发布者确认模式允许发布者确认消息是否已经被成功接收到。
//生产者
var factory new ConnectionFactory() { HostName localhost };
using (var connection factory.CreateConnection())
using (var channel connection.CreateModel())
{// 设置信道为确认模式channel.ConfirmSelect();// 声明一个队列channel.QueueDeclare(queue: hello,durable: false,exclusive: false,autoDelete: false,arguments: null);// 消息内容string message Hello World!;var body Encoding.UTF8.GetBytes(message);try{// 发送消息channel.BasicPublish(exchange: ,routingKey: ,basicProperties: null,body: body);// 等待消息确认if (channel.WaitForConfirms()){Console.WriteLine( [x] Sent {0}, message);}else{Console.WriteLine( [x] Failed to send {0}, message);}}catch (Exception ex){Console.WriteLine($An error occurred: {ex.Message});}
}Console.WriteLine( Press [enter] to exit.);
Console.ReadLine();//消费者
var factory new ConnectionFactory() { HostName localhost };
using (var connection factory.CreateConnection())
using (var channel connection.CreateModel())
{// 声明一个队列channel.QueueDeclare(queue: hello,durable: false,exclusive: false,autoDelete: false,arguments: null);// 创建消费者var consumer new EventingBasicConsumer(channel);// 消费消息consumer.Received (model, ea) {var body ea.Body;var message Encoding.UTF8.GetString(body);Console.WriteLine( [x] Received {0}, message);};// 消费者开始接收消息channel.BasicConsume(queue: hello,autoAck: true,consumer: consumer);Console.WriteLine( Press [enter] to exit.);Console.ReadLine();
}五、RabbitMQ相关属性描述
上述代码中有很多属性的设置下面解释一下。
名称
值
描述
durable
布尔
队列是否持久化。如果将队列声明为持久化那么当 RabbitMQ 服务器重启时队列将被重新声明。持久化队列中的消息会被存储在磁盘上因此即使在服务器重启后消息也不会丢失。
exclusive
布尔
是否为排他队列。当队列被声明为排他时只有声明该队列的连接connection才能使用它。一旦连接关闭排他队列就会被删除。这种队列通常是用于临时任务只允许声明它的连接使用不会被其他连接访问。
autoDelete
布尔
队列是否自动删除。如果将队列声明为自动删除则在最后一个连接订阅它的消费者取消订阅后队列将被自动删除。这种属性通常与临时队列一起使用以确保在不再需要队列时它会被清理。
arguments
类
用于设置队列或交换机的额外参数的选项。这些参数可以用于定制化队列或交换机的行为以满足特定的需求。比如设置队列的最大长度、消息在队列中的最大存活时间。
autoAck
布尔
消息是否自动确认。它是指在消费者从队列中接收到消息后是否自动确认消息的消费。当设置为 true 时表示消费者会自动确认收到的消息此时队列中表示该消息已被消费成功了。当设置为 false 时表示消费者需要显式地调用确认方法来告知 RabbitMQ 已经成功处理了消息否则消息将被重新放回队列等待其他消费者处理。
basicProperties
类
是指在发布消息时可以携带的消息属性。这些属性包含了有关消息的元数据信息例如消息的优先级、消息的过期时间、消息的类型等等。 总结
RabbitMQ 是一个消息队列主要作用就是异步、顺序性、削峰等。
七种队列模式可以根据不同的场景具体使用。
1. 简单队列模式
最简单的消息模式。一个生产者发送消息到一个队列一个消费者从队列中接收消息并处理。适用
于单个生产者-单个消费者的简单场景。
2. 工作队列模式
多个消费者共同消费消息。消费者从队列中取出消息并处理消息会平均地分配给消费者。
是基于简单队列模式的缺点做了提升。适用于负载均衡和任务分发的场景。
3. 发布/订阅模式
生产者将消息发送到交换机交换机将消息广播到所有与之绑定的队列。多个消费者可以订阅不同
的队列从而接收消息的副本。适用于消息广播和通知的场景。
4. 路由模式
生产者发送消息到交换机并使用路由键指定消息的目标队列。交换机根据消息的路由键将消息路
由到与之匹配的队列中。适用于根据消息内容进行精确路由的场景。
5. 主题模式
类似于路由模式但是路由键可以使用通配符进行匹配。适用于消息的多样化路由和灵活的匹配需
求。
6. RPC模式
客户端RPC请求者发送请求消息到队列中并等待服务器RPC响应者返回响应消息。
服务器监听请求队列处理请求并将响应发送回客户端指定的队列。适用于需要请求-响应式通信
的场景类似于远程调用。
7. 发布者确认模式
发布者确认模式是 RabbitMQ 提供的一种机制用于确保消息在发送到交换机并被路由到队列时的
可靠性。