产品 网站建设,企业征信查询官网入口,杜桥做网站哪家好,学网站建设要学什么在现代分布式系统中#xff0c;管理跨多个服务的长事务至关重要。传统的分布式事务解决方案往往面临性能瓶颈和复杂性问题#xff0c;而 Saga 模式 作为一种灵活高效的解决方案#xff0c;逐渐受到开发者的青睐。本文将探讨如何利用 Spring Boot 和 Kafka 实现 Saga 模式管理跨多个服务的长事务至关重要。传统的分布式事务解决方案往往面临性能瓶颈和复杂性问题而 Saga 模式 作为一种灵活高效的解决方案逐渐受到开发者的青睐。本文将探讨如何利用 Spring Boot 和 Kafka 实现 Saga 模式并详细介绍事务补偿机制帮助你构建稳定可靠的分布式系统。
什么是 Saga 模式
原理介绍
在微服务架构中一个业务流程通常涉及多个独立的服务。这些服务必须协同工作以完成完整的业务操作。例如用户下单可能需要订单服务、支付服务和库存服务的合作。然而跨服务操作通常涉及复杂的事务管理传统的分布式事务如两阶段提交不仅效率低下还难以扩展和维护。
Saga 模式 提供了一种替代方案通过将一个长事务分解为一系列的本地事务并通过事件或命令进行协调从而实现最终一致性。这种方法不仅提高了系统的可扩展性还简化了事务管理。
解决的问题及其重要性
Saga 模式解决了以下问题
分布式事务管理通过拆分事务避免了传统分布式事务的性能和复杂性问题。系统可扩展性各服务独立运行易于扩展和维护。错误恢复通过补偿机制确保在步骤失败时系统能恢复到一致状态。
在现代微服务架构中确保跨服务操作的可靠性和一致性至关重要。Saga 模式提供了一个高效且灵活的解决方案使系统在面对复杂业务流程和潜在错误时能够稳定运行。
Saga 模式的组成部分与实现方法
Saga 模式主要有两种实现方式Choreography编排 和 Orchestration指挥。下面将详细介绍这两种模式并展示如何使用 Spring Boot 和 Kafka 实现它们包括事务补偿机制。
架构图
编排模式
---------------- ---------------- ----------------
| OrderService | | PaymentService | | InventoryService|
---------------- ---------------- ----------------| | || CreateOrderCommand | ||-------------------------| || | || OrderCreatedEvent | ||-------------------------| || | || | PaymentCommand || |------------------------|| | || | PaymentProcessedEvent || |------------------------|| | || InventoryCommand | ||-------------------------| || | || | InventoryUpdatedEvent||-------------------------| || | |指挥模式
---------------- ---------------- ----------------
| SagaOrchestrator| | OrderService | | PaymentService |
---------------- ---------------- ----------------| | || CreateOrderCommand | ||-----------------------| || | || OrderCreatedEvent | ||-----------------------| || | || PaymentCommand | ||---------------------------------------------------|| | || PaymentApprovedEvent | ||---------------------------------------------------|| | || | || InventoryCommand | ||-----------------------| || | |1. 编排模式
在 编排模式 下各服务通过事件进行通信和协调没有中央控制器。每个服务独立地监听和发布事件以完成整个业务流程。
1.1 组成部分
事件定义服务之间传递的消息如 OrderCreatedEvent、PaymentProcessedEvent 等。Kafka 生产者与消费者用于事件的发布和订阅。各服务逻辑根据收到的事件执行相应的操作并发布下一个事件。
1.2 实现步骤与代码
1.2.1 定义事件
// OrderCreatedEvent.java
public class OrderCreatedEvent {private String orderId;private String product;private int quantity;// getters and setters
}// PaymentProcessedEvent.java
public class PaymentProcessedEvent {private String orderId;private boolean success;// getters and setters
}// PaymentFailedEvent.java
public class PaymentFailedEvent {private String orderId;private String reason;// getters and setters
}// OrderCancelledEvent.java
public class OrderCancelledEvent {private String orderId;// getters and setters
}1.2.2 配置 Kafka
# application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: saga-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer1.2.3 实现 OrderService
Service
public class OrderService {Autowiredprivate KafkaTemplateString, String kafkaTemplate;private ObjectMapper mapper new ObjectMapper();KafkaListener(topics create-order-command, groupId order-service-group)public void handleCreateOrder(String message) throws JsonProcessingException {CreateOrderCommand command mapper.readValue(message, CreateOrderCommand.class);// 创建订单逻辑// TODO: 保存订单到数据库// 发布 OrderCreatedEventOrderCreatedEvent event new OrderCreatedEvent();event.setOrderId(command.getOrderId());event.setProduct(command.getProduct());event.setQuantity(command.getQuantity());String eventMsg mapper.writeValueAsString(event);kafkaTemplate.send(order-created, eventMsg);}KafkaListener(topics payment-failed, groupId saga-group)public void handlePaymentFailed(String message) throws JsonProcessingException {PaymentFailedEvent failedEvent mapper.readValue(message, PaymentFailedEvent.class);// 取消订单逻辑cancelOrder(failedEvent.getOrderId());// 发布 OrderCancelledEventOrderCancelledEvent cancelledEvent new OrderCancelledEvent();cancelledEvent.setOrderId(failedEvent.getOrderId());String cancelledMsg mapper.writeValueAsString(cancelledEvent);kafkaTemplate.send(order-cancelled, cancelledMsg);}private void cancelOrder(String orderId) {// TODO: 取消订单逻辑}
}1.2.4 实现 PaymentService
Service
public class PaymentService {Autowiredprivate KafkaTemplateString, String kafkaTemplate;private ObjectMapper mapper new ObjectMapper();KafkaListener(topics order-created, groupId saga-group)public void handleOrderCreated(String message) throws JsonProcessingException {OrderCreatedEvent event mapper.readValue(message, OrderCreatedEvent.class);// 处理支付逻辑boolean success processPayment(event.getOrderId(), event.getQuantity());if (success) {// 发布 PaymentProcessedEventPaymentProcessedEvent paymentEvent new PaymentProcessedEvent(event.getOrderId(), true);String paymentMsg mapper.writeValueAsString(paymentEvent);kafkaTemplate.send(payment-processed, paymentMsg);} else {// 发布 PaymentFailedEventPaymentFailedEvent failedEvent new PaymentFailedEvent();failedEvent.setOrderId(event.getOrderId());failedEvent.setReason(Payment processing failed.);String failedMsg mapper.writeValueAsString(failedEvent);kafkaTemplate.send(payment-failed, failedMsg);}}private boolean processPayment(String orderId, int quantity) {// TODO: 实现支付逻辑模拟支付失败return false;}
}1.2.5 实现 InventoryService
Service
public class InventoryService {KafkaListener(topics order-cancelled, groupId saga-group)public void handleOrderCancelled(String message) throws JsonProcessingException {OrderCancelledEvent cancelledEvent new ObjectMapper().readValue(message, OrderCancelledEvent.class);// 回滚库存逻辑rollbackInventory(cancelledEvent.getOrderId());}private void rollbackInventory(String orderId) {// TODO: 实现库存回滚逻辑}
}1.3 事务补偿机制
在 编排模式 中当某个服务的操作失败时需要通过发布补偿事件来反向撤销之前的操作。上述代码中PaymentService 在支付失败时发布 PaymentFailedEventOrderService 监听该事件并执行订单取消逻辑随后发布 OrderCancelledEvent最后 InventoryService 监听并回滚库存。
2. 指挥模式
在 指挥模式 下存在一个中央的 Saga 管理器Orchestrator负责调度和协调各个服务的操作并在需要时触发补偿机制。
2.1 组成部分
Saga Orchestrator负责整个事务流程的控制和协调。命令和事件定义如 CreateOrderCommand、PaymentCommand 等。Kafka 生产者与消费者用于命令和事件的发布与订阅。各服务逻辑根据接收到的命令执行操作并发布相应的事件。
2.2 实现步骤与代码
2.2.1 定义命令和事件
// CreateOrderCommand.java
public class CreateOrderCommand {private String orderId;private String product;private int quantity;// getters and setters
}// OrderCreatedEvent.java
public class OrderCreatedEvent {private String orderId;// getters and setters
}// PaymentCommand.java
public class PaymentCommand {private String orderId;private double amount;// getters and setters
}// PaymentApprovedEvent.java
public class PaymentApprovedEvent {private String orderId;// getters and setters
}// PaymentRejectedEvent.java
public class PaymentRejectedEvent {private String orderId;private String reason;// getters and setters
}// CancelOrderCommand.java
public class CancelOrderCommand {private String orderId;// getters and setters
}// OrderCancelledEvent.java
public class OrderCancelledEvent {private String orderId;// getters and setters
}2.2.2 实现 SagaOrchestrator
Service
public class SagaOrchestrator {Autowiredprivate KafkaTemplateString, String kafkaTemplate;private ObjectMapper mapper new ObjectMapper();KafkaListener(topics order-created, groupId saga-orchestrator-group)public void handleOrderCreated(String message) throws JsonProcessingException {OrderCreatedEvent event mapper.readValue(message, OrderCreatedEvent.class);try {// 发送 PaymentCommandPaymentCommand paymentCommand new PaymentCommand();paymentCommand.setOrderId(event.getOrderId());paymentCommand.setAmount(calculateAmount(event.getOrderId()));String paymentCmd mapper.writeValueAsString(paymentCommand);kafkaTemplate.send(payment-command, paymentCmd);} catch (Exception e) {// 发送补偿命令sendCancelOrderCommand(event.getOrderId());}}KafkaListener(topics payment-approved, groupId saga-orchestrator-group)public void handlePaymentApproved(String message) throws JsonProcessingException {PaymentApprovedEvent event mapper.readValue(message, PaymentApprovedEvent.class);// 继续后续操作如库存更新// TODO: 发送其他命令或处理逻辑}KafkaListener(topics payment-rejected, groupId saga-orchestrator-group)public void handlePaymentRejected(String message) throws JsonProcessingException {PaymentRejectedEvent event mapper.readValue(message, PaymentRejectedEvent.class);// 发送补偿命令sendCancelOrderCommand(event.getOrderId());}public void startSaga(Order order) throws JsonProcessingException {// 发送 CreateOrderCommandCreateOrderCommand createCommand new CreateOrderCommand();createCommand.setOrderId(order.getId());createCommand.setProduct(order.getProduct());createCommand.setQuantity(order.getQuantity());String createCmd mapper.writeValueAsString(createCommand);kafkaTemplate.send(create-order-command, createCmd);}private void sendCancelOrderCommand(String orderId) throws JsonProcessingException {CancelOrderCommand cancelCommand new CancelOrderCommand();cancelCommand.setOrderId(orderId);String cancelMsg mapper.writeValueAsString(cancelCommand);kafkaTemplate.send(cancel-order-command, cancelMsg);}private double calculateAmount(String orderId) {// TODO: 实现金额计算逻辑return 100.0;}
}2.2.3 修改 OrderService
Service
public class OrderService {Autowiredprivate KafkaTemplateString, String kafkaTemplate;private ObjectMapper mapper new ObjectMapper();KafkaListener(topics create-order-command, groupId order-service-group)public void handleCreateOrder(String message) throws JsonProcessingException {CreateOrderCommand command mapper.readValue(message, CreateOrderCommand.class);// 创建订单逻辑// TODO: 保存订单到数据库// 发布 OrderCreatedEventOrderCreatedEvent event new OrderCreatedEvent();event.setOrderId(command.getOrderId());String eventMsg mapper.writeValueAsString(event);kafkaTemplate.send(order-created, eventMsg);}KafkaListener(topics cancel-order-command, groupId order-service-group)public void handleCancelOrder(String message) throws JsonProcessingException {CancelOrderCommand command mapper.readValue(message, CancelOrderCommand.class);// 取消订单逻辑cancelOrder(command.getOrderId());// 发布 OrderCancelledEventOrderCancelledEvent event new OrderCancelledEvent();event.setOrderId(command.getOrderId());String cancelledMsg mapper.writeValueAsString(event);kafkaTemplate.send(order-cancelled, cancelledMsg);}private void cancelOrder(String orderId) {// TODO: 实现取消订单逻辑}
}2.2.4 修改 PaymentService
Service
public class PaymentService {Autowiredprivate KafkaTemplateString, String kafkaTemplate;private ObjectMapper mapper new ObjectMapper();KafkaListener(topics payment-command, groupId payment-service-group)public void handlePaymentCommand(String message) throws JsonProcessingException {PaymentCommand command mapper.readValue(message, PaymentCommand.class);// 处理支付逻辑boolean approved processPayment(command.getOrderId(), command.getAmount());if (approved) {// 发布 PaymentApprovedEventPaymentApprovedEvent event new PaymentApprovedEvent();event.setOrderId(command.getOrderId());String approvedMsg mapper.writeValueAsString(event);kafkaTemplate.send(payment-approved, approvedMsg);} else {// 发布 PaymentRejectedEventPaymentRejectedEvent rejectedEvent new PaymentRejectedEvent();rejectedEvent.setOrderId(command.getOrderId());rejectedEvent.setReason(Payment was rejected.);String rejectedMsg mapper.writeValueAsString(rejectedEvent);kafkaTemplate.send(payment-rejected, rejectedMsg);}}private boolean processPayment(String orderId, double amount) {// TODO: 实现支付逻辑模拟支付失败return false;}
}2.2.5 实现 InventoryService
Service
public class InventoryService {KafkaListener(topics order-cancelled, groupId saga-orchestrator-group)public void handleOrderCancelled(String message) throws JsonProcessingException {OrderCancelledEvent cancelledEvent new ObjectMapper().readValue(message, OrderCancelledEvent.class);// 回滚库存逻辑rollbackInventory(cancelledEvent.getOrderId());}private void rollbackInventory(String orderId) {// TODO: 实现库存回滚逻辑}
}2.3 事务补偿机制
在 指挥模式 中Saga Orchestrator 作为中央控制器负责监控事务流程并在发生错误时触发补偿操作。例如当 PaymentService 处理支付失败时发布 PaymentRejectedEventSaga Orchestrator 监听到该事件后发送 CancelOrderCommand 给 OrderService 执行订单取消操作确保系统一致性。
总结
Saga 模式为分布式系统中的长事务管理提供了一种高效且灵活的解决方案。通过 编排 与 指挥 两种实现方式开发者可以根据具体业务需求和系统架构选择最合适的方式。
编排模式 适用于服务间关系较为松散、需要高扩展性的场景各服务通过事件进行独立协调但补偿逻辑较为分散。指挥模式 适用于需要集中控制事务流程、易于追踪和调试的场景Saga Orchestrator 作为中央管理者补偿逻辑统一管理但可能成为系统的瓶颈。
无论选择哪种模式事务补偿机制 都是确保系统可靠性和一致性的关键。通过合理设计补偿逻辑可以有效应对分布式环境下的各种故障和异常提升系统的健壮性。
借助 Spring Boot 和 Kafka 强大的生态和工具支持实现 Saga 模式变得更加便捷和高效。希望本文能够帮助你深入理解 Saga 模式并在实际项目中灵活应用打造稳定可靠的分布式系统。
参考资料
Saga Patterns: Managing Data Consistency in MicroservicesSpring Cloud Stream 与 Kafka 集成Axon Framework 官方文档
版权声明
本文为原创内容转载请注明出处。