网站栏目名称,wordpress统计和谷歌不同,网站更改了资料 百度什么时侯来抓取,photoshop安卓版SpringAMQT RabbitMQ安装与部署RabbitMQ结构简单队列模型 SpringAMQP依赖引入配置RabbitMQ连接信息基本模型简单队列模型WorkQueue模型 发布订阅模型FanoutExchangeDirectExchangeTopicExchange 消息转换器 消息队列是实现异步通讯的一种方式#xff0c;我们将从RabbitMQ为例开… SpringAMQT RabbitMQ安装与部署RabbitMQ结构简单队列模型 SpringAMQP依赖引入配置RabbitMQ连接信息基本模型简单队列模型WorkQueue模型 发布订阅模型FanoutExchangeDirectExchangeTopicExchange 消息转换器 消息队列是实现异步通讯的一种方式我们将从RabbitMQ为例开始介绍SpringAMQT。
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件。
安装与部署
由于RabbitMQ运行需要安装Erlang为了方便部署我们采用docker的方式来部署RabbitMq
首先拉取RabbitMQ的镜像带有management的Tag的说明该镜像含有Web控制台
docker pull rabbitmq:3-management执行下面的命令来运行RabbitMQ容器
docker run \
-e RABBITMQ_DEFAULT_USERusername \
-e RABBITMQ_DEFAULT_PASSpassword \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-managementusername和password是进入RabbitMQ控制台时使用的账号密码15672端口是控制台所占用的端口5672是MQ服务所占用的端口。
RabbitMQ结构
channel: 操作MQ的工具exchange: 路由消息到队列中queue 缓存消息virtual host:虚拟主机是对queue、exchange等资源的逻辑分组
简单队列模型
消息发布者
SpringBootTest
class PublisherApplicationTests {Testvoid publisher() throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.111.135);factory.setPort(5672);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(/);Connection connectionfactory.newConnection();//创建通道Channel channelconnection.createChannel();//创建队列String queueNamesimple.queue;channel.queueDeclare(queueName,false,false,false,null);//发布消息String messagehello,rabbitmq!;for (int i 0; i 100; i) {channel.basicPublish(,queueName,null,message.getBytes());}channel.close();connection.close();}消息消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.111.135);factory.setPort(5672);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(/);Connection connectionfactory.newConnection();//建立通道Channel channelconnection.createChannel();//消费端也创建队列是为了防止消费端先启动找不到队列String queueNamesimple.queue;channel.queueDeclare(queueName,false,false,false,null);//为通道绑定消费者channel.basicConsume(queueName,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(消息已被处理);}});}
}其他的模型将在SpringAMPQ中进行介绍
SpringAMQP
AMQP (Adavance Message Queuing Protocol 高级消息队列协议)是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关更符合微服务中独立性的要求。
Spring AMQP 是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。包含两个部分其中spring-amqp是基础抽象spring-rabbit是底层的默认实现
SpringAMQP的特点
提供监听容器用于异步处理入站消息提供RabbitTemplate用于发送和接收消息提供RabbitAdmin来自动声明队列交换机和绑定
依赖引入
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency配置RabbitMQ连接信息
spring:rabbitmq:host: 192.168.111.135port: 5672username: usernamepassword: passwordvirtual-host: /基本模型
简单队列模型
消息发布端
SpringBootTest
class PublisherApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid publisher(){String queueNamesimple.queue;String messagehello,rabbitmq!;rabbitTemplate.convertSendAndReceive(queueName,message);}}消息消费端
Component
public class RabbitListenerTest {RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String message){System.out.println(message);}
}WorkQueue模型
模型特点多个消费者绑定到一个队列同一条消息只会被一个消费者处理。
在默认情况下消费者会进行消息预取预取的数量为无限大这会导致性能不同的消费者处理相同数量的消息可以通过设置prefetch来控制消费者预取的消息数量
spring:rabbitmq:listener:simple:prefetch: 1发布订阅模型
发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)
常见的exchange类型有:
Fanout: 广播Direct: 路由Topic: 话题
exchange只负责消息路由而不进行存储路由失败则消息丢失
发布订阅模型分三部
在consumer服务中声明Exchange、Queue、Binding在consumer服务中声明多个消费者在publisher服务发送消息到Exchange
FanoutExchange
声明Exchange、Queue、Binding。除了通过在配置类中通过Bean注解绑定队列和交换机外还可以在RabbitListener注解中绑定第二种绑定方式将在下一部分使用
Configuration
public class FanoutConfig {Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(simple.fanout);}Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}Beanpublic Binding bingingQueue1(Queue fanoutQueue1,FanoutExchange exchange){return BindingBuilder.bind(fanoutQueue1).to(exchange);}Beanpublic Binding bingingQueue2(Queue fanoutQueue2,FanoutExchange exchange){return BindingBuilder.bind(fanoutQueue2).to(exchange);}
}声明消费者与简单模型没什么差别就不赘述了。
publisher端将消息发送给交换机有一点小区别
SpringBootTest
class PublisherApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid publisher(){String exchangeNamesimple.fanout;String messagehello,rabbitmq!;//三个参数分别为交换机名、routingkey和消息rabbitTemplate.convertSendAndReceive(exchangeName,,message);}}DirectExchange
在上一种交换机中发送消息的第二个参数routingkey我们设置为了空实际上每一个Queue与Exchange间都可以设定多个bindingkey通过routingkey参数交换机将把消息路由到与其匹配的队列中。
Component
public class RabbitListenerTest {RabbitListener(bindings QueueBinding(value Queue(name simple.queue1),exchange Exchange(name simple.direct,type ExchangeTypes.DIRECT),key {red,blue}))public void listenSimpleQueueMessage(String message){System.out.println(message);}
}
TopicExchange
TopicExchange与DirectExchange类似区别是
TopicExchange的routingkey必须是多个单词的列表并且以英文句号.分隔。bindingkey支持使用通配符#匹配零或多个单词*匹配一个单词。
通过以下代码你很容易可以发现它的特点
SpringBootTest
class PublisherApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid publisher(){String exchangeNamesimple.fanout;String messagehello,rabbitmq!;rabbitTemplate.convertSendAndReceive(exchangeName,china.weather,message);}}Component
public class RabbitListenerTest {RabbitListener(bindings QueueBinding(value Queue(name simple.queue1),exchange Exchange(name simple.direct,type ExchangeTypes.Topic),key china.*))public void listenSimpleQueueMessage(String message){System.out.println(message);}
}消息转换器
我们在使用RabbitTemplate的时候可以发现它所发送的消息的类型为Object这意味着它可以发送所有对象。默认它所使用的是jdk的序列化这样的效率较低。
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的只需定义一个MessageConverter类型的Bean即可修改序列化方式。
以jackson的序列化为例
引入jackson的依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency定义MessageConverter类型的Bean
Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}