成都人才网站建设,自己怎么做装修网站,深达网站制作深圳公司,校园网站开发背景在 RabbitMQ 中#xff0c;交换机#xff08;Exchange#xff09;是一个核心组件#xff0c;负责接收来自生产者的消息#xff0c;并根据特定的路由规则将消息分发到相应的队列。交换机的存在改变了消息发送的模式#xff0c;使得消息的路由更加灵活和高效。
交换机的类…在 RabbitMQ 中交换机Exchange是一个核心组件负责接收来自生产者的消息并根据特定的路由规则将消息分发到相应的队列。交换机的存在改变了消息发送的模式使得消息的路由更加灵活和高效。
交换机的类型
RabbitMQ 提供了四种主要类型的交换机每种交换机的路由规则不同 Direct Exchange直连交换机 功能基于路由键Routing Key将消息发送到与该路由键完全匹配的队列。应用场景适用于需要精确匹配路由键的场景。示例假设有两个队列 A 和 BA 绑定了路由键 key1B 绑定了路由键 key2。当生产者发送一条路由键为 key1 的消息时只有队列 A 会接收到这条消息。 Fanout Exchange扇出交换机 功能将消息广播到所有绑定到该交换机的队列不考虑路由键。应用场景适用于需要将消息广播到多个队列的场景。示例假设有两个队列 A 和 B 都绑定到了一个 Fanout 交换机上。当生产者发送一条消息到该交换机时A 和 B 都会接收到这条消息。 Topic Exchange主题交换机 功能基于路由键的模式匹配使用通配符将消息发送到匹配的队列。应用场景适用于需要基于模式匹配路由键的场景。示例假设有两个队列 A 和 BA 绑定了路由键模式 key.*B 绑定了路由键模式 key.#。当生产者发送一条路由键为 key.test 的消息时A 和 B 都会接收到这条消息。 Headers Exchange头交换机 功能基于消息的头部属性进行匹配将消息发送到匹配的队列。应用场景适用于需要基于消息头部属性进行路由的场景。示例这种交换机使用较少通常在特定情况下才会使用。
交换机的作用
消息路由交换机根据路由规则将消息分发到相应的队列。解耦生产者和消费者生产者只需将消息发送到交换机不需要知道消息的最终目的地队列。灵活性和扩展性通过不同类型的交换机可以实现复杂的消息路由逻辑满足各种业务需求。
示例代码
以下是如何使用 Direct Exchange 和 Fanout Exchange 的示例代码
Direct Exchange 示例
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange direct_logs;const msg Hello World!;const routingKey key1;channel.assertExchange(exchange, direct, { durable: true });channel.publish(exchange, routingKey, Buffer.from(msg));console.log( [x] Sent %s: %s, routingKey, msg);});setTimeout(function() {connection.close();process.exit(0);}, 500);
});Fanout Exchange 示例
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange logs;const msg Hello World!;channel.assertExchange(exchange, fanout, { durable: true });channel.publish(exchange, , Buffer.from(msg));console.log( [x] Sent %s, msg);});setTimeout(function() {connection.close();process.exit(0);}, 500);
});Topic Exchange 示例
Topic Exchange 允许使用通配符进行路由支持更复杂的路由规则。
发布者代码
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange topic_logs;const msg Hello World!;const routingKey quick.orange.rabbit;channel.assertExchange(exchange, topic, { durable: true });channel.publish(exchange, routingKey, Buffer.from(msg));console.log( [x] Sent %s: %s, routingKey, msg);});setTimeout(function() {connection.close();process.exit(0);}, 500);
});消费者代码
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange topic_logs;const queue topic_queue;channel.assertExchange(exchange, topic, { durable: true });channel.assertQueue(queue, { durable: true });// 绑定队列到交换机使用通配符channel.bindQueue(queue, exchange, *.orange.*);channel.consume(queue, function(msg) {if (msg.content) {console.log( [x] Received %s: %s, msg.fields.routingKey, msg.content.toString());}}, { noAck: true });});
});在这个示例中发布者将消息发送到 topic_logs 交换机使用路由键 quick.orange.rabbit。消费者绑定到 topic_logs 交换机使用通配符 *.orange.*因此会接收到所有包含 orange 的消息。
Headers Exchange 示例
Headers Exchange 基于消息头部属性进行路由适用于需要复杂路由规则的场景。
发布者代码
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange headers_logs;const msg Hello World!;channel.assertExchange(exchange, headers, { durable: true });channel.publish(exchange, , Buffer.from(msg), {headers: {format: pdf,type: report}});console.log( [x] Sent %s, msg);});setTimeout(function() {connection.close();process.exit(0);}, 500);
});消费者代码
const amqp require(amqplib/callback_api);amqp.connect(amqp://localhost, function(error0, connection) {if (error0) {throw error0;}connection.createChannel(function(error1, channel) {if (error1) {throw error1;}const exchange headers_logs;const queue headers_queue;channel.assertExchange(exchange, headers, { durable: true });channel.assertQueue(queue, { durable: true });// 绑定队列到交换机使用头部属性channel.bindQueue(queue, exchange, , {x-match: all,format: pdf,type: report});channel.consume(queue, function(msg) {if (msg.content) {console.log( [x] Received %s, msg.content.toString());}}, { noAck: true });});
});在这个示例中发布者将消息发送到 headers_logs 交换机并设置消息头部属性 format: pdf 和 type: report。消费者绑定到 headers_logs 交换机使用头部属性匹配 format: pdf 和 type: report因此会接收到符合这些头部属性的消息。