帝国cms官方网站,怎么把dw做的网站传上去,云服务器 wordpress,虚拟主机搭建基于RabbitMQ实现RPC 前言什么是RPCRabbitMQ如何实现RPCRPC简单示例通过Spring AMQP实现RPC 前言
这边参考了RabbitMQ的官网#xff0c;想整理一篇关于RabbitMQ实现RPC调用的博客#xff0c;打算把两种实现RPC调用的都整理一下#xff0c;一个是使用官方提供的一个Java cli… 基于RabbitMQ实现RPC 前言什么是RPCRabbitMQ如何实现RPCRPC简单示例通过Spring AMQP实现RPC 前言
这边参考了RabbitMQ的官网想整理一篇关于RabbitMQ实现RPC调用的博客打算把两种实现RPC调用的都整理一下一个是使用官方提供的一个Java client还有一个是Spring AMQP的整合使用。 代码路径https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq
什么是RPC
RPC是远程过程调用Remote Procedure Call的缩写形式简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。 我们之前总结的工作模式都是发送消息到指定的队列再由相关的消费者进行消费如果存在这样的场景比如消费者消费完消息需给生产者一个具体的响应然后生产者再根据这个响应进行其他的业务逻辑这样就需要使用到RabbitMQ提供的RPC能力。
RabbitMQ如何实现RPC
官方有很详细的介绍文档这边贴一下地址https://www.rabbitmq.com/tutorials/tutorial-six-java.html RabbitMQ实现RPC很简单正常的流程就是请求以及响应我们只需要在请求的消息的属性里面添加一个响应队列的地址这边需要使用到一个BasicProperties这个类。具体配置如下
// 指定一个回调队列
callbackQueueName channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish(, rpc_queue, props, message.getBytes());BasicProperties这个类中提供了很多的属性有14个很多基本上很少用到常用的就是几个我这边也贴一下其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。
contentType 这个属性用来表明消息的类型默认是application/octet-stream这种流的类型还有常用的比如application/jsontext/plain等这些在我的RPC工具里面都有用到。replyTo 这个就是上面指定的回调队列。correlationId 这个id可以用来进行消息的确认将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的用于确认。
首先先贴一张官方提供的图这个是RabbitMQ实现RPC的主要工作流程 实现RPC的具体工作流程
首先客户端发送一个请求消息这个请求消息里面有两个属性一个是replyTo回调队列的地址一个是correlationId用于标识当前消息唯一的id信息。这个消息是发送到指定的rpc_queue这个队列上面。对应我们的服务端Server就会等待rpc_queue上面的请求消息当请求消息来得时候服务端会进行处理处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。客户端发送消息之后会等待replyTo队列中的消息。当有消息来得时候会检查响应消息中correlationId属性和请求消息中correlationId是否一致完成一次PRC调用。
RPC简单示例
我这边根据官网上面提供的例子简单修改整理了一下这边提供一个大小写转换的功能就是客户端发送一段小写的字符串服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释具体代码如下 首先服务端
/*** RPC服务端** author yuanzhihao* since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.1.108);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);Connection connection connectionFactory.newConnection();Channel channel connection.createChannel();// 定义一个rpc的队列String queueName test_rpc;channel.queueDeclare(queueName, false, false, false, null);Object monitor new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback (consumerTag, delivery) - {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response ;try {// 提供一个大小写转换的方法String message new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(toUpperCase( message ));response toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish(, delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听test_rpc队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag - { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}客户端
/*** RPC客户端** author yuanzhihao* since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(192.168.1.108);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);try ( Connection connection connectionFactory.newConnection();Channel channel connection.createChannel()) {// 声明一个队列String queueName test_rpc;// 请求消息中需要带一个唯一标识ID String corrId UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg hello rpc;// 发送请求消息channel.basicPublish(,queueName,properties,msg.getBytes());// 设置一个阻塞队列 等待服务端的响应final BlockingQueueString response new ArrayBlockingQueue(1);String ctag channel.basicConsume(replayQueueName, true, (consumerTag, message) - {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag - {});// 获取响应结果String take response.take();System.out.println(rpc result is take);channel.basicCancel(ctag);}}
}执行代码具体的客户端与服务端运行结果
通过Spring AMQP实现RPC
通过Spring来实现RPC也很简单主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。 首先是服务端我这边直接是使用配置类的形式具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~
/*** 主配置类** author yuanzhihao* since 2021/1/9*/
Configuration
public class RabbitMQConfig {private static final Logger log LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory new CachingConnectionFactory();connectionFactory.setAddresses(192.168.1.108:5672);connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);connectionFactory.setVirtualHost(/);return connectionFactory;}// 声明队列Beanpublic Queue rpcQueue() {return new Queue(test_rpc,false);}Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin new RabbitAdmin(connectionFactory);// 只有设置为 truespring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener message - {String receiveMsg new String(message.getBody(), StandardCharsets.UTF_8);log.info(Receive a message message is {}, receiveMsg);// 执行对应逻辑String responseMsg toUpperCase(receiveMsg);MessageProperties messageProperties MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send(, message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}客户端我采用test单元测试的形式
/*** spring amqp rpc 测试类** author yuanzhihao* since 2021/1/9*/
ContextConfiguration(classes {RabbitMQConfig.class})
RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log LoggerFactory.getLogger(RabbitMQConfig.class);Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端Testpublic void testRpcClient() {// 设置correlationIdString corrId UUID.randomUUID().toString();String msg hello rpc;MessageProperties messageProperties MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 amq.rabbitmq.reply-to 前缀Message message rabbitTemplate.sendAndReceive(, test_rpc, new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info(The response is {}, new String(message.getBody(), StandardCharsets.UTF_8));}
}具体实现可以看下代码的注释 代码执行结果