asp网站木马扫描,巴彦淖尔网站制作开发,网络营销典型案例有哪些,网站建设定制开发网站设计开发一、基本概念 RabbitMQ 是一个开源的AMQP实现#xff0c;服务器端用Erlang语言编写#xff0c;支持多种客户端#xff0c;如#xff1a;Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等#xff0c;支持AJAX。用于在分布式系统中存储转发消息#xf…一、基本概念 RabbitMQ 是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面访问地址 账号密码均为 guest。
rabbitmq-plugins enable rabbitmq_management
二、用户
超级管理员(administrator)可登陆管理控制台可查看所有的信息并且可以对用户策略(policy)进行操作。监控者(monitoring)可登陆管理控制台同时可以查看rabbitmq节点的相关信息(进程数内存使用情况磁盘使用情况等)策略制定者(policymaker)可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。普通管理者(management)仅可登陆管理控制台无法看到节点信息也无法对策略进行管理。其他无法登陆管理控制台通常就是普通的生产者和消费者。
三、工作模式 RabbitMQ主要有五种工作模式分别是
简单模式hello world工作队列模式work queue发布/订阅模式publish/subscribe路由模式routing主题模式topic导入依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion3.4.1/version
/dependency工具类
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(localhost);//端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(vhost);factory.setUsername(guest);factory.setPassword(guest);// 通过工厂获取连接Connection connection factory.newConnection();return connection;}
} 1.简单模式hello world
//发送信息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();// 从连接中创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(hello, false, false, false, null);// 消息内容String message Hello World!;channel.basicPublish(, hello, null, message.getBytes());//关闭通道和连接channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();// 从连接中创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(hello, false, false, false, null);// 定义队列的消费者QueueingConsumer consumer new QueueingConsumer(channel);// 监听队列channel.basicConsume(hello, true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());}
} 2.工作队列模式work queue多个消费者消费同一队列消息。
//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明队列channel.queueDeclare(hello, false, false, false, null);// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer new QueueingConsumer(channel);// 监听队列false表示手动返回完成状态true表示接收到消息马上自动确认完成channel.basicConsume(hello, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());// 返回确认状态否则表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
} 3.发布/订阅模式publish/subscribe通过交换机发送消息到多个队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 消息内容String message Hello World!;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer new QueueingConsumer(channel);// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
} 4.路由模式routing通过交换机进行路由匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, direct);// 消息内容String message Hello World!;//指定消息路由channel.basicPublish(EXCHANGE_NAME, routing, null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机并指定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routing1);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routing2);// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer new QueueingConsumer(channel);// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
} 5.主题模式topic通过交换机进行通配符匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, topic);// 消息内容String message Hello World!;//指定消息匹配关键字channel.basicPublish(EXCHANGE_NAME, topic, null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机并指定多个通配符channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, topic1.*);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, topic2.*);// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer new QueueingConsumer(channel);// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery consumer.nextDelivery();String message new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
四、Spring整合 Spring 提供了 RabbitTemplate 类执行消息发送。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
spring:rabbitmq:host: 192.168.88.88port: 5672username: guestpassword: guestvirtual-host: /
Configuration
public class MQConfig {Beanpublic Exchange exchange1(){return ExchangeBuilder.fanoutExchange(fanout).build();}Beanpublic Exchange exchange2(){return ExchangeBuilder.directExchange(direct).build();}Beanpublic Queue queue1(){return QueueBuilder.durable(hello1).build();}Beanpublic Queue queue2(){return QueueBuilder.durable(hello2).build();}Beanpublic Binding binding1(Exchange exchange1,Queue queue1){return BindingBuilder.bind(queue1).to(exchange1).with(key1).noargs();}Beanpublic Binding binding2(Exchange exchange2,Queue queue2){return BindingBuilder.bind(queue2).to(exchange2).with(key2).noargs();}
}
Component
//定义队列并绑定
RabbitListener(bindings QueueBinding(value Queue(value hello, durable true, autoDelete true),exchange Exchange(value fanout, type ExchangeTypes.FANOUT), key key), ackMode MANUAL)
public class MyListener {RabbitHandlerpublic void consume(Message message, Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {//手动返回状态if () {// RabbitMQ的ack机制中第二个参数返回true表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}}
}