25个优秀个人网站设计模板,网站建设定位分析论文,叫别人做网站要注意什么,app网站开发工具下载在上一节中#xff0c;我们使用docker部署了RabbitMQ#xff0c;这一节我们将写一段生产者和消费者的代码。将用到rabbitmq的原生API来进行生产和发送消息。
一、准备工作
开始前#xff0c;我们先在RabbitMQ控制台建相好关的数据 本机的RabbitMQ部署机器是192.168.56.201…在上一节中我们使用docker部署了RabbitMQ这一节我们将写一段生产者和消费者的代码。将用到rabbitmq的原生API来进行生产和发送消息。
一、准备工作
开始前我们先在RabbitMQ控制台建相好关的数据 本机的RabbitMQ部署机器是192.168.56.201 其中控制台的地址是 http://192.168.56.201:15672/ 输入控制台的账号后可以进入 1、我们先建好一个用户 用户名hello密码world 2、再建Virtual Host:virtual01 3. 为User设置访问Virtual hosts权限 设置好后,hello用户就有virtual01的权限了
二、代码
先引入依赖由于我们后续要用springboot来写生产者消费者代码这里我们就直接引springboot的包了。如果只想用原生的客户端可以引原生的包。 dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency/dependencies生产者和消费者代码如下
public class RabbitMqSimpleTest {private static final String EXCHANGE_NAME hello_exchange;private static final String QUEUE_NAME hello_queue;private static final String ROUTING_KEY hello_routing;Testpublic void send() throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.56.201);connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号connectionFactory.setUsername(hello);connectionFactory.setPassword(world);connectionFactory.setVirtualHost(virtual01);//获取TCP长连接Connection conn connectionFactory.newConnection();//创建通信“通道”相当于TCP中的虚拟连接Channel channel conn.createChannel();//创建队列,声明并创建一个队列如果队列已存在则使用这个队列//第一个参数队列名称ID//第二个参数是否持久化false对应不持久化数据MQ停掉数据就会丢失//第三个参数是否队列私有化false则代表所有消费者都可以访问true代表只有第一次拥有它的消费者才能一直使用其他消费者不让访问//第四个是否自动删除,false代表连接停掉后不自动删除掉这个队列//其他额外的参数, null//手动创建一个队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//exchange 交换机//队列名称//额外的设置属性//最后一个参数是要传递的消息字节数组channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (System.currentTimeMillis() ,hello this is my first message!).getBytes());channel.close();conn.close();System.out.println(发送成功);}Testpublic void consumer() throws IOException, TimeoutException {ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.56.201);connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号connectionFactory.setUsername(hello);connectionFactory.setPassword(world);connectionFactory.setVirtualHost(virtual01);//获取TCP长连接Connection conn connectionFactory.newConnection();//创建通信“通道”相当于TCP中的虚拟连接Channel channel conn.createChannel();//创建队列,声明并创建一个队列如果队列已存在则使用这个队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//从MQ服务器中获取数据//创建一个消息消费者//第一个参数队列名//第二个参数代表是否自动确认收到消息false代表手动编程来确认消息这是MQ的推荐做法//第三个参数要传入DefaultConsumer的实现类channel.basicConsume(QUEUE_NAME, false, new Receiver(channel));}
}消费者回调实现
public class Receiver extends DefaultConsumer {private Channel channel;//重写构造函数,Channel通道对象需要从外层传入在handleDelivery中要用到public Receiver(Channel channel) {super(channel);this.channel channel;}Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String message new String(body);System.out.println(消费者接收到的消息message);System.out.println(消息的TagIdenvelope.getDeliveryTag());//false只确认签收当前的消息设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(), false);}
}运行一下send发送消息成功了。 去控制台后台看一下 队列成功创建好了 消息发送成功了有一条待消费的消息在队列里面 可以在这里查看到刚才发送的消息内容 在这里可以看到queue和exchange的绑定关系 控制台还有很多有意思的功能大家可以下来尝试一下。 同时启动消费者也能成功消费