网站未备案 打不开,保定软件开发公司,营销技巧电影,hao123浏览器官方下载✨✨谢谢大家捧场#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右#xff0c;一定要天天开心哦#xff01;✨✨ #x1f388;#x1f388;作者主页#xff1a; 喔的嘛呀#x1f388;#x1f388; ✨✨ 帅哥美女们#xff0c;我们共同加油#xff01;一起进步祝屏幕前的小伙伴们每天都有好运相伴左右一定要天天开心哦✨✨ 作者主页 喔的嘛呀 ✨✨ 帅哥美女们我们共同加油一起进步✨✨ 目录
引言
一. 选择合适的消息中间件
二. 定义消息格式和通信协议
1. 定义消息格式
消息头
消息体
2. 定义通信协议
发送消息
接收消息
消息处理
3. 示例代码
定义消息格式
发送消息
接收消息
三、发布-订阅模式
1. 定义发布-订阅模式
2. 示例代码
发布消息
订阅消息
3. 运行示例
4. 异步处理消息
5. 解耦系统
6. 实现步骤
7. 实例场景
实例场景电商系统订单处理
场景描述
实现步骤
示例代码
订单服务发送消息
库存服务接收消息
物流服务接收消息 引言
在现代分布式系统中异步通信和解耦是非常重要的设计原则。通过使用消息中间件可以实现系统间的异步通信和解耦提高系统的可扩展性和可靠性。本文将介绍如何使用消息中间件来实现系统间的异步通信和解耦并通过一个实际场景来演示。
一. 选择合适的消息中间件
选择合适的消息中间件需要考虑多个因素包括项目需求、性能要求、可靠性、社区支持等。常见的消息中间件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等下面针对不同的需求给出一些选择建议 消息传递模式 点对点适合使用 RabbitMQ、ActiveMQ 等传统消息中间件。发布-订阅适合使用 RabbitMQ、Kafka 等支持广播消息的中间件。 可靠性 如果对消息的可靠性要求较高需要确保消息不会丢失可以考虑使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中间件。 性能 如果需要处理大量的消息并且需要低延迟可以考虑使用 Kafka它是一个高吞吐量的消息中间件适合大数据场景。如果对延迟要求较低可以选择 RabbitMQ、ActiveMQ 等传统消息中间件。 社区支持和生态系统 考虑选择一个有活跃社区支持和完善生态系统的消息中间件这样可以更容易地解决问题和扩展功能。 技术栈兼容性 考虑选择一个与你的技术栈兼容的消息中间件避免出现集成上的问题。
综合考虑以上因素可以选择最适合项目需求的消息中间件。
二. 定义消息格式和通信协议
定义消息格式和通信协议是使用消息中间件的关键步骤之一它涉及到消息的结构、内容和交互方式。下面以 RabbitMQ 为例演示如何定义消息格式和通信协议。
1. 定义消息格式
在 RabbitMQ 中消息通常由两部分组成消息头和消息体。消息头包含一些元数据信息如消息的类型、路由键等消息体包含实际的业务数据。
消息头
Content-Type消息体的类型如 application/json、text/plain 等。DeliveryMode消息持久性标志标识消息是否需要持久化存储可选值为 1持久化和 2非持久化。CorrelationId消息关联标识用于关联一组相关消息。其他自定义的消息头字段根据业务需求定义。
消息体
消息体可以是任意格式的数据如 JSON、XML、文本等根据业务需求定义。
2. 定义通信协议
通信协议定义了消息的交互方式包括消息的发送、接收和处理流程。通信协议可以包括以下几个方面
发送消息
客户端向消息队列发送消息包括指定交换机Exchange、路由键Routing Key和消息体。
接收消息
服务端从消息队列接收消息根据消息的交换机和路由键接收对应的消息。
消息处理
客户端接收到消息后根据消息的内容执行相应的业务逻辑。
3. 示例代码
定义消息格式
public class Message {private String content;private String contentType;private int deliveryMode;private String correlationId;// 省略getter和setter方法
}发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class SendMessage {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Message message new Message();message.setContent(Hello, RabbitMQ!);message.setContentType(text/plain);message.setDeliveryMode(1); // 持久化message.setCorrelationId(123456);String messageJson toJson(message);channel.basicPublish(, QUEUE_NAME, null, messageJson.getBytes());System.out.println( [x] Sent messageJson );}}private static String toJson(Message message) {// 将 message 对象转换成 JSON 格式的字符串return { \content\: \ message.getContent() \, \contentType\: \ message.getContentType() \, \deliveryMode\: message.getDeliveryMode() , \correlationId\: \ message.getCorrelationId() \ };}
}接收消息
import com.rabbitmq.client.*;public class ReceiveMessage {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CtrlC);DeliverCallback deliverCallback (consumerTag, delivery) - {String messageJson new String(delivery.getBody(), UTF-8);Message message fromJson(messageJson, Message.class);System.out.println( [x] Received messageJson );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}}private static T T fromJson(String json, ClassT clazz) {// 将 JSON 格式的字符串转换成指定类型的对象// 这里可以使用 JSON 框架如 Jackson、Gson来实现return null;}
}通过以上步骤可以定义消息格式和通信协议并使用 RabbitMQ 实现消息的发送和接收。
三、发布-订阅模式
发布-订阅模式是一种常见的消息传递模式用于实现消息的广播和订阅。在发布-订阅模式中消息发布者将消息发布到一个主题Topic而消息订阅者可以订阅感兴趣的主题从而接收到相关消息。下面以 RabbitMQ 为例演示如何使用发布-订阅模式。
1. 定义发布-订阅模式
在发布-订阅模式中有一个交换机Exchange用来接收发布者发布的消息并根据订阅者的绑定关系将消息路由到对应的队列。订阅者可以创建自己的队列并将队列绑定到交换机上从而接收到发布者发布的消息。
2. 示例代码
发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);String message Hello, subscribers!;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [x] Sent message );}}
}订阅消息
import com.rabbitmq.client.*;public class Subscriber {private final static String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages. To exit press CtrlC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - {});}}
}3. 运行示例
先运行订阅者 Subscriber它会创建一个队列并绑定到交换机上开始监听消息。然后运行发布者 Publisher它会向交换机发布一条消息。订阅者会接收到发布者发布的消息并输出到控制台。
通过以上步骤可以实现基于 RabbitMQ 的发布-订阅模式。
4. 异步处理消息
通过消息中间件实现异步处理消息即发送消息后不需要立即等待结果而是继续执行其他任务。这样可以提高系统的响应速度和吞吐量。
5. 解耦系统
通过消息中间件系统之间的通信变成了基于消息的方式系统不再直接依赖于对方的接口和实现细节从而实现了系统之间的解耦。
6. 实现步骤
定义消息格式和通信协议确定消息的格式和通信协议包括消息的内容结构、消息的生命周期等。配置消息中间件在系统中配置和启动消息中间件确保消息中间件正常运行。消息的发布和订阅编写代码实现消息的发布和订阅逻辑将消息发布到指定的主题并订阅感兴趣的主题。处理接收到的消息编写代码处理接收到的消息根据消息的内容执行相应的业务逻辑。测试和验证对系统进行测试和验证确保消息的发布、订阅和处理功能正常运行。
7. 实例场景
实例场景电商系统订单处理
场景描述
假设有一个电商系统包含订单服务、库存服务和物流服务。当用户下单时订单服务需要通知库存服务减少库存通知物流服务发货。为了提高系统的可扩展性和可靠性我们可以使用消息中间件来实现订单处理的异步通信和解耦。
实现步骤 定义消息格式和通信协议定义订单消息的格式包括订单号、商品信息等并确定消息的交换机和队列名称。 配置消息中间件在消息中间件中配置交换机和队列并确保消息的持久化。 订单服务发送消息订单服务在用户下单后将订单消息发送到消息队列中。 库存服务订阅消息库存服务订阅订单消息队列接收并处理订单消息减少库存。 物流服务订阅消息物流服务也订阅订单消息队列接收并处理订单消息进行发货。
示例代码
订单服务发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class OrderService {private static final String EXCHANGE_NAME orders;private static final String QUEUE_NAME order_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message New order placed;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [x] Sent message );}}
}库存服务接收消息
import com.rabbitmq.client.*;public class InventoryService {private static final String EXCHANGE_NAME orders;private static final String QUEUE_NAME order_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );System.out.println( [*] Waiting for orders. To exit press CtrlC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );// 处理订单消息减少库存};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}}
}物流服务接收消息
import com.rabbitmq.client.*;public class LogisticsService {private static final String EXCHANGE_NAME orders;private static final String QUEUE_NAME order_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection(); Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );System.out.println( [*] Waiting for orders. To exit press CtrlC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );// 处理订单消息发货};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}}
}通过以上步骤的简单演示订单服务可以异步发送订单消息库存服务和物流服务可以订阅订单消息并处理实现了订单处理的异步通信和解耦。 通过以上步骤可以使用消息中间件实现系统间的异步通信和解耦提高系统的可扩展性和可维护性。