重庆渝网站建设,百度网盘app官网下载,logo设计一键生成,旅游最新资讯发布订阅模式
一个消息可以由多个消费者消费同一个消息 消费者1和2同时消费了该消息
举例 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq …发布订阅模式
一个消息可以由多个消费者消费同一个消息 消费者1和2同时消费了该消息
举例 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);/*** 发布订阅模式需要指定交换机和类型,不能用上面的模式* 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定* 或者没有符合路由规则的队列,那么消息会丢失* 第一个参数:交换机名字* 第二个参数:交换机类型* fanout:广播,将消息交给所有绑定到交换机的队列* direct:定向,把消息交给符合指定routing key的队列* topic:通配符,把消息交给符合routing pattern(路由模式)的队列*/channel.exchangeDeclare(03-pubsub1, fanout);//6 发送消息/*** 第一个参数:交换机名称 没有交换机就设置* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish(03-pubsub1,, MessageProperties.PERSISTENT_TEXT_PLAIN, hello rabbitMQ.getBytes());//7 关闭消息//channel.close();connection.close();
} 消费者1和2同时消费了该消息,比如说消息是发短信,发邮件, 那么1和发短息 2可以发邮件 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);//channel.basicQos(1);//6 使用chanel 去 rabbitmq 获取消息进行消费/*** 第一个参数:队列的名称* 第二个参数:是否自动签收* 第三个参数:消息属性* 第四个参数:消息内容*/channel.exchangeDeclare(03-pubsub1, fanout);//绑定String queue channel.queueDeclare().getQueue();channel.queueBind(queue, 03-pubsub1, );channel.basicConsume(queue, false,new DeliverCallback(){/*** 当消息从mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中进行处理*/Overridepublic void handle(String s, Delivery delivery){System.out.println(消费者 1 消息中的内容为:new String(delivery.getBody()));}},new CancelCallback(){/*** 当消息取消了会回调这个方法*/Overridepublic void handle(String s) throws IOException {System.out.println(111);}});//7 关闭消息 注意消费者 需要持续监听,不要关闭//channel.close();//connection.close();
} routing路由模式
就是说哪些让谁干
哪些让谁干区分出来
也可以让所有消费者都消费
选择性的让某个消费者消费,或者都消费 生产者
public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);/*** 发布订阅模式需要指定交换机和类型,不能用上面的模式* 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定* 或者没有符合路由规则的队列,那么消息会丢失* 第一个参数:交换机名字* 第二个参数:交换机类型* fanout:广播,将消息交给所有绑定到交换机的队列* direct:定向,把消息交给符合指定routing key的队列* topic:通配符,把消息交给符合routing pattern(路由模式)的队列*/channel.exchangeDeclare(04-routing1, direct);//6 发送消息/*** 第一个参数:交换机名称 没有交换机就设置* 第二个参数:路由key routing模式需要路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish(04-routing1,info, MessageProperties.PERSISTENT_TEXT_PLAIN, hello rabbitMQ.getBytes());//7 关闭消息//channel.close();connection.close();
} 消费者1 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);//channel.basicQos(1);//6 使用chanel 去 rabbitmq 获取消息进行消费/*** 第一个参数:队列的名称* 第二个参数:是否自动签收* 第三个参数:消息属性* 第四个参数:消息内容*/channel.exchangeDeclare(04-routing1, direct);//绑定String queue channel.queueDeclare().getQueue();//可与绑定多个channel.queueBind(queue, 04-routing1, info);channel.queueBind(queue, 04-routing1, error);channel.queueBind(queue, 04-routing1, waring);channel.basicConsume(queue, true,new DeliverCallback(){/*** 当消息从mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中进行处理*/Overridepublic void handle(String s, Delivery delivery){System.out.println(消费者 1 消息中的内容为:new String(delivery.getBody()));}},new CancelCallback(){/*** 当消息取消了会回调这个方法*/Overridepublic void handle(String s) throws IOException {System.out.println(111);}});//7 关闭消息 注意消费者 需要持续监听,不要关闭//channel.close();//connection.close();
} 消费者2 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);//channel.basicQos(1);//6 使用chanel 去 rabbitmq 获取消息进行消费/*** 第一个参数:队列的名称* 第二个参数:是否自动签收* 第三个参数:消息属性* 第四个参数:消息内容*/channel.exchangeDeclare(04-routing1, direct);//绑定String queue channel.queueDeclare().getQueue();//可与绑定多个channel.queueBind(queue, 04-routing1, trace);channel.basicConsume(queue, true,new DeliverCallback(){/*** 当消息从mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中进行处理*/Overridepublic void handle(String s, Delivery delivery){System.out.println(消费者 2 消息中的内容为:new String(delivery.getBody()));}},new CancelCallback(){/*** 当消息取消了会回调这个方法*/Overridepublic void handle(String s) throws IOException {System.out.println(111);}});//7 关闭消息 注意消费者 需要持续监听,不要关闭//channel.close();//connection.close();
} 上面的只有消费者1消费了消息
可以根据channel.queueBind(queue, 04-routing1, trace); 绑定消息 也可以让1和2都消费, topic模式和Routing模式高度相识,用通配符的形式指定让谁消费,或者都消费 生产者 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);/*** 发布订阅模式需要指定交换机和类型,不能用上面的模式* 交换机 Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定* 或者没有符合路由规则的队列,那么消息会丢失* 第一个参数:交换机名字* 第二个参数:交换机类型* fanout:广播,将消息交给所有绑定到交换机的队列* direct:定向,把消息交给符合指定routing key的队列* topic:通配符,把消息交给符合routing pattern(路由模式)的队列*/channel.exchangeDeclare(05-topic1, topic);//6 发送消息/*** 第一个参数:交换机名称 没有交换机就设置* 第二个参数:路由key routing模式需要路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish(05-topic1,employee.save, MessageProperties.PERSISTENT_TEXT_PLAIN, hello rabbitMQ.getBytes());//7 关闭消息//channel.close();connection.close();
}
消费者1 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);//channel.basicQos(1);//6 使用chanel 去 rabbitmq 获取消息进行消费/*** 第一个参数:队列的名称* 第二个参数:是否自动签收* 第三个参数:消息属性* 第四个参数:消息内容*/channel.exchangeDeclare(05-topic1, topic);//绑定String queue channel.queueDeclare().getQueue();//可与绑定多个channel.queueBind(queue, 05-topic1, employee.*);channel.basicConsume(queue, true,new DeliverCallback(){/*** 当消息从mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中进行处理*/Overridepublic void handle(String s, Delivery delivery){System.out.println(消费者 1 消息中的内容为:new String(delivery.getBody()));}},new CancelCallback(){/*** 当消息取消了会回调这个方法*/Overridepublic void handle(String s) throws IOException {System.out.println(111);}});//7 关闭消息 注意消费者 需要持续监听,不要关闭//channel.close();//connection.close();
} 消费者2 public static void main(String[] args) throws IOException, TimeoutException {//1 创建连接工厂ConnectionFactory connectionFactorynew ConnectionFactory();//2 设置rabbitmq ip地址connectionFactory.setHost(localhost);//3 创建连接对象 Conection对象Connection connectionconnectionFactory.newConnection();//4 创建管道 ChanelChannel channelconnection.createChannel();//5 设置队列属性/*** 第一个参数:队列的名称* 第二个参数:队列是否要持久化* 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的)* 第四个参数:是否自动删除消息* 第五个参数:是否要设置一些额外的参数*///channel.queueDeclare(02-work,false,false,true,null);//channel.basicQos(1);//6 使用chanel 去 rabbitmq 获取消息进行消费/*** 第一个参数:队列的名称* 第二个参数:是否自动签收* 第三个参数:消息属性* 第四个参数:消息内容*/channel.exchangeDeclare(05-topic1, topic);//绑定String queue channel.queueDeclare().getQueue();//可与绑定多个channel.queueBind(queue, 05-topic1, user.*);channel.basicConsume(queue, true,new DeliverCallback(){/*** 当消息从mq 中取出来了会回调这个方法* 消费者消费消息就在这个 handle中进行处理*/Overridepublic void handle(String s, Delivery delivery){System.out.println(消费者 2 消息中的内容为:new String(delivery.getBody()));}},new CancelCallback(){/*** 当消息取消了会回调这个方法*/Overridepublic void handle(String s) throws IOException {System.out.println(111);}});//7 关闭消息 注意消费者 需要持续监听,不要关闭//channel.close();//connection.close();
} 结果就是消费者1消费了消息 所有工作模式总结