课程网站建设技术,专门做二手书网站或app,wordpress页面文章,看房地产的app在哪看1、Topics交换机的介绍
Topics交换机能让消息只发送往绑定了指定routingkey的队列中去#xff0c;不同于Direct交换机的是#xff0c;Topics能把一个消息往多个不同的队列发送#xff1b;Topics交换机的routingkey不能随意写#xff0c;必须是一个单词列表#xff0c;并以…1、Topics交换机的介绍
Topics交换机能让消息只发送往绑定了指定routingkey的队列中去不同于Direct交换机的是Topics能把一个消息往多个不同的队列发送Topics交换机的routingkey不能随意写必须是一个单词列表并以点号分隔开例如“one.two.three”除此外还有两个替换符*星号能代替一个单词#井号可以代替零个或多个单词例如“*.one.*”是中间是one的3个单词“*.*.one”是最后一个是one的3个单词“one.#”是第一个单词是one的多个单词若队列绑定键是#这个队列将接收所有数据这时候类似fanout交换机若队列绑定键中没有#和*出现这时候就类似direct交换机 2、Topics交换机的实现
(1)新建一个名为topics的包用于装发布确认的代码 效果图 (2)新建一个名为Receive01的类用于编写消费者的代码 代码如下 注RabbitMqUtils工具类的实现在我的另一篇文章里有需要的同学可以查看参考
RabbitMQ系列6--RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客
package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;/*** 消息接收*/
public class Receive01 {//声明交换机的名称public static final String EXCHANGE_NAME topic_exchange;//接收消息public static void main(String[] args) throws Exception{Channel channel RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,topic);//声明队列String queueName Q1;/*** 创建队列* 第一个参数队列名称* 第二个参数服务器重启后队列是否还存在即队列是否持久化,true为是false为否默认false即消息存储在内存中而不是硬盘中* 第三个参数该队列是否只供一个消费者进行消费是否进行消息共享true为只允许一个消费者进行消费false为允许多个消费者对队列进行消费默认false* 第四个参数是否自动删除最后一个消费者断开连接后该队列是否自动删除true自动删除false不自动删除* 第五个参数其他参数*/channel.queueDeclare(queueName,false,false,false,null);//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,*.one.*);/*** 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口所以需要给DeliverCallback赋值一个函数为了方便我们这里使用Lambda表达式进行赋值)* 为什么要这样写呢是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 FunctionalInterface注解规定DeliverCallback是一个函数式接口所以要往deliverCallback参数传的值要是一个函数** 以下是DeliverCallback接口的源代码* FunctionalInterface* public interface DeliverCallback {* void handle (String consumerTag, Delivery message) throws IOException;* }*/DeliverCallback deliverCallback (consumerTag, message) - {System.out.println(new String(message.getBody(),UTF-8));System.out.println(接收队列 queueName 绑定键 message.getEnvelope().getRoutingKey());};/*** 用信道对消息进行接收* 第一个参数消费的是哪一个队列的消息* 第二个参数消费成功后是否要自动应答true代表自动应当false代表手动应答* 第三个参数消费者接收消息后的回调方法* 第四个参数消费者取消接收消息后的回调方法正常接收不调用*/channel.basicConsume(queueName,true,deliverCallback,consumerTag -{});}}
(3)复制Receive01类并粘贴重命名为Receive02 代码如下
package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** 消息接收*/
public class Receive02 {//声明交换机的名称public static final String EXCHANGE_NAME topic_exchange;//接收消息public static void main(String[] args) throws Exception{Channel channel RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,topic);//声明队列String queueName Q1;/*** 创建队列* 第一个参数队列名称* 第二个参数服务器重启后队列是否还存在即队列是否持久化,true为是false为否默认false即消息存储在内存中而不是硬盘中* 第三个参数该队列是否只供一个消费者进行消费是否进行消息共享true为只允许一个消费者进行消费false为允许多个消费者对队列进行消费默认false* 第四个参数是否自动删除最后一个消费者断开连接后该队列是否自动删除true自动删除false不自动删除* 第五个参数其他参数*/channel.queueDeclare(queueName,false,false,false,null);//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,*.*.two);//队列与交换机通过routingkey进行捆绑channel.queueBind(queueName,EXCHANGE_NAME,three.#);/*** 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口所以需要给DeliverCallback赋值一个函数为了方便我们这里使用Lambda表达式进行赋值)* 为什么要这样写呢是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 FunctionalInterface注解规定DeliverCallback是一个函数式接口所以要往deliverCallback参数传的值要是一个函数** 以下是DeliverCallback接口的源代码* FunctionalInterface* public interface DeliverCallback {* void handle (String consumerTag, Delivery message) throws IOException;* }*/DeliverCallback deliverCallback (consumerTag, message) - {System.out.println(new String(message.getBody(),UTF-8));System.out.println(接收队列 queueName 绑定键 message.getEnvelope().getRoutingKey());};/*** 用信道对消息进行接收* 第一个参数消费的是哪一个队列的消息* 第二个参数消费成功后是否要自动应答true代表自动应当false代表手动应答* 第三个参数消费者接收消息后的回调方法* 第四个参数消费者取消接收消息后的回调方法正常接收不调用*/channel.basicConsume(queueName,true,deliverCallback,consumerTag -{});}}(4)新建一个名为Emit的类用于编写生产者的代码 代码如下
package com.ken.topics;import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;/*** 发消息*/
public class Emit {//声明交换机的名称public static final String EXCHANGE_NAME topic_exchange;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();MapString,String bindingKeyMap new HashMap();bindingKeyMap.put(four.one.two,被队列Q1Q2接收);bindingKeyMap.put(three.one.five,被队列Q1Q2接收);bindingKeyMap.put(four.one.six,被队列Q1接收);bindingKeyMap.put(three.seven.six,被队列Q2接收);bindingKeyMap.put(three.eight.two,虽然满足两个绑定但只被队列Q2接收一次);bindingKeyMap.put(three.seven.six,不匹配任何绑定不会被任何队列接收到会被丢弃);bindingKeyMap.put(four.one.nine.two,四个单词不匹配任何绑定会被丢弃);bindingKeyMap.put(three.one.nine.two,四个单词但匹配Q2);for (Map.EntryString, String bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey bindingKeyEntry.getKey();String message bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(UTF-8));System.out.println(生产者发出消息 message);}}}(5)分别先运行Receive01、Receive02、Emit (6)查看Receive01和Receive02接收消息的情况
从上述结果可看出topic交换机实现成功