南京百度网站排名,彩票网站制作商,网站的建设部署与发布,如何做360购物网站前言
RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。
特点#xff1a;它通过发布/订阅模型#xff0c;实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。
作用#xff1a;服务间异步通信#xff1b;顺序消费#xff1b;定时任务#xff1b;请求削…前言
RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。
特点它通过发布/订阅模型实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。
作用服务间异步通信顺序消费定时任务请求削峰
1、AMQP协议定义
AMQPAdvanced Message Queuing Protocol高级消息队列协议是一个高效的、跨平台的应用层协议 MQTTMessage Queuing Telemetry Transport消息队列遥测传输
特性AMQPMQTT适用场景大型企业级应用、金融交易、云服务物联网、移动应用、智能家居通信模式生产者-消费者发布-订阅消息大小较大适合复杂的消息结构小型适合简单的消息QoS 级别支持但不如 MQTT 精细详细的 QoS 级别特别是针对 IoT 场景性能要求对性能有一定要求但更注重可靠性和安全性极低的带宽消耗和资源占用安全性强调端到端的安全性支持基本的安全特性适用于资源受限环境
2、AMQP机制
1AMQP生产者、消费者工作机制 AMQP高级消息队列协议基于生产者消费者模式消息基于交换器Exchange、队列Queue、绑定Binding进行路由。
生产者发送消息到Broker消息代理服务交换器接收生产者发送的消息根据预定义规则分发给一个或多个队列队列存储消息直到消费者取走消息消费者读取队列中的消息
AMQP定义了严格的消息结构使用了类型化数据表示法描述消息内容来兼容不同的系统。
类型化数据表示法Typed Representation of Data是指在计算机编程语言中数据和其相关联的类型信息一起被表示的方法。
2AMQP消息传递方式
特性点对点模式 (P2P)发布/订阅模式 (Pub/Sub)消息传递方式每条消息仅被一个消费者处理一条消息可以被多个消费者同时接收队列数量单个队列每个消费者有自己的队列生产者行为直接发送到队列发送到交换器由交换器负责路由消费者行为从同一队列中竞争消费各自独立消费自己的队列中的消息适用场景任务分配、工作流管理广播通知、日志记录、事件驱动架构扩展性受限于单个队列的吞吐量可以通过增加更多的消费者来提高整体吞吐量复杂度较低易于理解和实现需要考虑交换器类型、路由规则等因素稍微复杂 1、点对点 生产者将消息发送到一个特定的队列中而消费者则从该队列中获取消息 每个消息只会被一个消费者处理即使有多个消费者监听同一个队列。
竞争消费多个实例尝试处理同一个消息时可能出现重复消费或消息未及时得到处理的情况。 1竞争消费问题 在k8s部署多实例场景下虽然提升了系统的吞吐量通过调度器实现了负载均衡多个实例从一个队列中读取消息但是并发场景客观存在竞争消费的情况导致重复消费消息。 2解决建议 合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。
// 生产者代码片段
Channel channel connection.createChannel();
channel.queueDeclare(task_queue, true, false, false, null);
String message Task to be processed;
channel.basicPublish(, task_queue, null, message.getBytes());// 消费者代码片段
DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );// 执行任务...channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(task_queue, false, deliverCallback, consumerTag - {});2、发布订阅 生产者将消息发送到一个交换器Exchange而不是直接发送到队列。交换器根据预定义的路由规则Binding Key将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅所有订阅者都能收到相同的消息副本
1主题分区 为不同类型的时间创建不同的主题或分区来减少不必要的复制。实例只订阅感兴趣的主题降低资源开销 2限流 避免过载限制单位之间内消费的最大消息
// 生产者代码片段
Channel channel connection.createChannel();
channel.exchangeDeclare(logs_exchange, fanout);
String message Info log message;
channel.basicPublish(logs_exchange, , null, message.getBytes());// 消费者代码片段
String queueName channel.queueDeclare().getQueue();
channel.queueBind(queueName, logs_exchange, );
DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );// 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag - {});3、AMQP消息只被消费一次
1、合理配置消息队列ACK机制 大多数消息队列都提供了手动确认ACK的功能允许消费者成功处理后主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag - {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);2、合理配置消息队列预取数量 防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);3、消费者幂等性设计 针对消息全局唯一的ID入库每次收到消息时先检查是否已入库 确保同一条消息多次处理的结果是一致的避免重复的消息执行两次结果不一致 增加补偿机制比如退款退积分等概念的操作4、分布式锁 借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理只有获取到锁的消息可以处理其他的放弃或等待。5、监控告警机制 监控消息队列服务健康情况针对可能重复消费的消息及时告警到服务负责人介入处理。6、事务性消息 指的是消息和业务操作一起成功或一起失败的机制。 1本地事务补偿机制 2二阶段提交 引入协调者和参与者的概念。 客户端向协调者发起事务请求协调者询问各参与者是否准备好。全部准备好则发出提交事务命令否则全部回滚。每个参与者返回ack结果协调者汇总执行结果释放占用的资源。 3三阶段提交 针对二阶段提交完善事务性消息机制。 首先客户端向协调者发起事务请求协调者询问各参与者是否准备好。全部准备好则发出预执行事务命令。各参与者收到命令执行事务但不提交。并返回ack等待最终命令。协调者收到全部准备好则发出提交事务命令。
4、AMQP 消息顺序消费
单实例独占队列可以保证顺序消费但是分布式高可用场景一般都是多实例部署独占队列无法解决消息顺序消费的问题。为了保证顺序消费通常建议针对预取消息数量Prefetch Count设置为1channel.basicQos(1);可以使用分布式锁确保消息消费是同步操作并发安全在成功处理消息后手动发送ack确认到消息代理。另外使用幂等性设计来避免重复消费。增加补偿机制来处理幂等性设计无法保证的场景比如退款等操作增加监控告警到服务负责人。可以对消息根据业务类型或特定的前缀规则将不同的消息分到不同的分区或队列中每个队列和分区内部是遵循先进先出规则来保证顺序消费的。
5、AMQP消息可靠性
事务支持 允许一组操作作为一个整体提交或回滚。ACK确认机制 当消息成功投递后接收方会向发送方发送 ACK 确认如果发生错误则发送 NACK 拒绝。持久化选项 可以选择是否将消息保存到磁盘上以防服务中断时丢失重要数据。
6、RabbitMQ配置ACK
1rabbitmq.conf或rabbitmq.ini开启配置
# 启用自动恢复功能确保在网络中断后能够自动重连
connection.cached true
# 设置心跳检测间隔防止长时间无通信导致连接断开
heartbeat 60
# 启用 Publisher Confirms允许生产者收到消息确认
publisher_confirms on2消费者手动确认 声明队列确保队列存在 设置预取计数限制每次从队列中拉取的消息数量为 1以避免过载 开启手动确认模式通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认改为手动确认 发送 ACK 确认在成功处理完消息后调用 channel.basicAck 方法发送确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME task_queue;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明队列确保它存在channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置预取计数为 1确保每次只处理一条消息channel.basicQos(1);// 开启手动确认模式DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );try {// 模拟任务处理时间Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();} finally {System.out.println( [x] Done);// 手动发送 ACK 确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {});}}
}3配置 Publisher Confirms和Transaction 允许生产者在发送消息后等待消息代理的确认
// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}Channel channel connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}// 开启事务模式
channel.txSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());channel.txCommit();
} catch (Exception e) {channel.txRollback();
}7、RabbitMQ配置协议
1rabbitmq.conf RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。 支持启用SSL认证提高安全性。 支持设置心跳保证客户端和服务端连接保持活跃。
# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default 5672
# 确保 AMQP 插件已启用AMQP 0-9-1 是默认启用的
enabled_plugins [rabbitmq_amqp1_0]# 启用 SSL/TLS 支持
ssl_options.cacertfile /path/to/ca_certificate.pem
ssl_options.certfile /path/to/server_certificate.pem
ssl_options.keyfile /path/to/private_key.pem
ssl_options.verify verify_peer
ssl_options.fail_if_no_peer_cert true
# 设置 SSL/TLS 监听端口
listeners.ssl.default 5671# 设置心跳间隔时间为 60 秒
heartbeat 608、RabbitMQ消息持久化
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentExample {private final static String QUEUE_NAME persistent_queue;private final static String EXCHANGE_NAME persistent_exchange;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 创建持久化的交换器channel.exchangeDeclare(EXCHANGE_NAME, direct, true);// 创建持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routing_key);// 发送持久化的消息String message Persistent message!;AMQP.BasicProperties props new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化.build();channel.basicPublish(EXCHANGE_NAME, routing_key, props, message.getBytes());System.out.println( [x] Sent message );}}
}1、持久化队列 channel.queueDeclare(durable_queue, true, false, false, null);2、交换器持久化 确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定以避免消息丢失 channel.exchangeDeclare(durable_exchange, direct, true);3、消息持久化 delivery_mode 参数设置为 2 表示持久化消息设置为 1默认则表示非持久化消息 channel.basicPublish(exchange_name, routing_key, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());
9、RabbitMQ自动重连
网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态
ConnectionFactory factory new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);10、RabbitMQ组件
组件名称说明Producer生产者负责生成并发送消息的应用程序。Consumer消费者负责接收并处理消息的应用程序。Message消息承载业务数据的基本单元包含消息体Body、属性Properties等信息。Exchange交换机用于接收来自生产者的消息并根据路由规则将其分发到一个或多个队列中。Queue队列存储待处理消息的地方消费者从中拉取消息进行处理。Binding绑定定义了交换机和队列之间的关系包括路由键等参数。Virtual Host虚拟主机类似于命名空间的概念用于隔离不同的应用环境每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。
11、RabbitMQ核心组件交换器和路由键
交换器Exchange和路由键Routing Key是消息传递系统的核心组件它们共同决定了消息如何从生产者传递到正确的队列。
消息提供方生产消息根据预定规则路由至匹配的一个或多个队列。
消息创建时设定路由键消息发布到交换器时通过队列路由键把队列绑定到交换器上。消息到达交换器后RabbitMQ会将消息的路由键与队列的路由键进行匹配。
若队列至少有一个消费者订阅消息将以轮询方式发给消费者。
交换器说明应用场景Direct精确匹配路由键只有当路由键完全匹配时消息才会被发送到对应的队列。适用于一对一的消息分发。Topic基于通配符模式匹配路由键适用于灵活的消息过滤和多条件匹配。Fanout广播所有消息给所有绑定的队列适用于需要将相同消息发送给多个消费者的场景。Headers根据消息头属性进行路由适用于复杂的消息路由需求例如根据多个字段组合来决定消息去向。
1Direct Exchange 精准匹配路由键交换器 根据路由键完全匹配队列如果找不到匹配的队列则消息会被丢弃。
生产者
// 创建 Direct Exchange
channel.exchangeDeclare(direct_logs, direct);// 绑定队列到 Exchange并指定 Binding Key
String queueName channel.queueDeclare().getQueue();
channel.queueBind(queueName, direct_logs, info);
channel.queueBind(queueName, direct_logs, warning);
channel.queueBind(queueName, direct_logs, error);// 发送消息时指定 Routing Key
channel.basicPublish(direct_logs, info, null, Info log message.getBytes());消费者
import com.rabbitmq.client.*;public class DirectConsumer {private final static String EXCHANGE_NAME direct_logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, direct);// 创建临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列到 Exchange并指定 Binding Keyif (argv.length 1) {System.err.println(Usage: DirectConsumer [info] [warning] [error]);System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}}
}2Fanout Exchange广播交换器 广播所有消息给所有绑定的队列
生产者
// 创建 Fanout Exchange
channel.exchangeDeclare(logs, fanout);// 绑定队列到 Exchange
String queueName channel.queueDeclare().getQueue();
channel.queueBind(queueName, logs, );// 发送消息时不指定 Routing Key
channel.basicPublish(logs, , null, Broadcast log message.getBytes());消费者
import com.rabbitmq.client.*;public class FanoutConsumer {private final static String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 创建临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列到 Exchangechannel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}}
}3Topic Exchange 通配符路由器 *匹配一个单词#匹配零个或多个单词
生产者
// 创建 Topic Exchange
channel.exchangeDeclare(topic_logs, topic);// 绑定队列到 Exchange并指定 Binding Key 模式
String queueName channel.queueDeclare().getQueue();
channel.queueBind(queueName, topic_logs, *.orange.*);
channel.queueBind(queueName, topic_logs, *.*.rabbit);
channel.queueBind(queueName, topic_logs, lazy.#);// 发送消息时指定符合模式的 Routing Key
channel.basicPublish(topic_logs, quick.orange.rabbit, null, Quick orange rabbit.getBytes());消费者
import com.rabbitmq.client.*;public class TopicConsumer {private final static String EXCHANGE_NAME topic_logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, topic);// 创建临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列到 Exchange并指定 Binding Key 模式if (argv.length 1) {System.err.println(Usage: TopicConsumer [binding_key_pattern]);System.exit(1);}for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}}
}4Headers Exchange 根据消息头属性进行路由 不依赖路由键当消息的 headers 完全匹配时才会将消息发送到对应的队列。
生产者
// 创建 Headers Exchange
channel.exchangeDeclare(headers_exchange, headers);// 绑定队列到 Exchange并指定 Headers 匹配规则
MapString, Object headers new HashMap();
headers.put(user_id, 12345);
headers.put(order_status, pending);
AMQP.Queue.BindOk bindOk channel.queueBind(queueName, headers_exchange, , headers);// 发送带有 Headers 的消息
AMQP.BasicProperties props new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(headers_exchange, , props, Message with specific headers.getBytes());消费者
import com.rabbitmq.client.*;public class HeadersConsumer {private final static String EXCHANGE_NAME headers_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, headers);// 创建临时队列String queueName channel.queueDeclare().getQueue();// 绑定队列到 Exchange并指定 Headers 匹配规则MapString, Object headers new HashMap();headers.put(user_id, 12345);headers.put(order_status, pending);AMQP.Queue.BindOk bindOk channel.queueBind(queueName, EXCHANGE_NAME, , headers);System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}}
}12、RabbitMQ核心方法及参数说明
1newConnection 创建连接工程并开启连接
ConnectionFactory factory new ConnectionFactory();
factory.setHost(localhost);
factory.setPort(5672);
factory.setUsername(guest);
factory.setPassword(guest);
Connection connection factory.newConnection();2createChannel 创建信道 RabbitMQ 使用信道的方式来传输数据
信道是建立在真实的 TCP 连接内的虚拟连接且每条 TCP 连接可以创建多个信道每个信道都是独立的通信线路可以并发地发送和接收消息。
Channel channel connection.createChannel();3exchangeDeclare 交换器声明
channel.exchangeDeclare(my_exchange, direct, true, false, null);exchange: 交换器名称。 type: 交换器类型如 “direct”, “fanout”, “topic”, “headers”。 durable: 持久化标志true 表示持久化false 表示非持久化。 autoDelete: 自动删除标志true 表示当最后一个队列断开时自动删除交换器。 internal: 内部交换器标志true 表示该交换器只能被其他交换器使用不能直接由生产者发布消息。 arguments: 其他可选参数例如死信交换器、过期时间等。
4queueDeclare 队列声明
// 创建临时队列
String queueName channel.queueDeclare().getQueue();queue: 队列名称为空字符串时表示创建临时队列。 durable: 持久化标志true 表示持久化false 表示非持久化。 exclusive: 排他性标志true 表示仅当前连接可用连接关闭后自动删除。 autoDelete: 自动删除标志true 表示当最后一个消费者断开时自动删除队列。 arguments: 其他可选参数例如死信队列、过期时间等。
5queueBind 队列绑定 将队列绑定到指定的交换器上并提供路由键或匹配规则
channel.queueBind(queueName, my_exchange, routing_key);queue: 队列名称。 exchange: 交换器名称。 routingKey: 路由键对于某些类型的交换器如 Direct 或 Topic这个值是必须的对于 Fanout 类型通常为空字符串。 arguments: 可选参数主要用于 Headers Exchange 的匹配规则
6basicPublish 发布消息 向指定的交换器发布一条消息
AMQP.BasicProperties props new AMQP.BasicProperties.Builder().contentType(text/plain).deliveryMode(2) // 2 表示持久化.build();
channel.basicPublish(my_exchange, routing_key, props, message.getBytes());exchange: 交换器名称。 routingKey: 路由键。 props: 消息属性包括内容类型、编码、持久化模式等。 body: 消息体即要发送的数据。
7basicConsume 消费消息 费来自指定队列的消息
DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });queue: 队列名称。 autoAck: 自动确认标志true 表示收到消息后立即确认false 表示手动确认。 deliverCallback: 回调函数用于处理接收到的消息。 cancelCallback: 取消回调函数当消费者的取消通知到达时调用
8basicAck 消息确认 手动确认模式下当消费者成功处理完消息后需要调用此方法来确认消息已被消费
channel.basicAck(envelope.getDeliveryTag(), false);9basicNack 消息丢弃 当消费者无法处理某条消息时可以拒绝这条消息并决定是否重新入队或者丢弃
// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);13、RabbitMQ镜像集群模式
搭建RabbitMQ保证消息队列的高可用。 创建的 queue无论元数据还是 queue 里的消息都会存在于多个实例上写消息到 queue 的时候都会自动把消息同步到多个实例的 queue。 优点高可用单个节点挂掉其他节点仍可用 缺点高负载如果某个队列消息很重则镜像复制的实例下也会很重性能开销大。 参考博客消息队列中点对点与发布订阅区别 Powered by niaonao