分布式事务解决方案TCC实战
分布式事务解决方案TCC实战一、分布式事务概述在分布式系统中事务跨越多个服务或数据库传统的ACID事务无法直接适用需要采用分布式事务解决方案。1.1 分布式事务挑战挑战说明网络延迟跨服务调用存在网络延迟和超时数据一致性多个数据源需要保持一致故障恢复部分失败时需要回滚或补偿性能影响分布式锁和协调开销1.2 常见解决方案方案特点适用场景2PC强一致阻塞式数据一致性要求极高TCC最终一致非阻塞高并发场景Saga事件驱动长事务业务流程编排消息队列最终一致异步解耦场景二、TCC原理TCCTry-Confirm-Cancel是一种基于业务层面的分布式事务解决方案。2.1 TCC三阶段┌─────────────────────────────────────────────────────────────┐ │ 事务协调器 │ └─────────────────────────────────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Try阶段 │ │ Confirm阶段 │ │ Cancel阶段 │ │ (预留资源) │ │ (确认提交) │ │ (取消释放) │ └─────────────────┘ └─────────────────┘ └─────────────────┘2.2 TCC状态机public enum TccStatus { TRYING(尝试中), CONFIRMING(确认中), CANCELING(取消中), SUCCESS(成功), FAILED(失败); }三、TCC实现3.1 TCC接口定义public interface TccTransaction { TccMethod(confirmMethod confirm, cancelMethod cancel) void tryExecute(TccContext context); void confirm(TccContext context); void cancel(TccContext context); }3.2 订单服务TCC实现Service public class OrderTccService implements TccTransaction { Autowired private OrderRepository orderRepository; Override public void tryExecute(TccContext context) { Order order Order.builder() .userId(context.getUserId()) .amount(context.getAmount()) .status(PENDING) .build(); orderRepository.save(order); context.setOrderId(order.getId()); context.setStatus(TccStatus.TRYING.name()); } Override public void confirm(TccContext context) { Order order orderRepository.findById(context.getOrderId()).orElse(null); if (order ! null PENDING.equals(order.getStatus())) { order.setStatus(SUCCESS); orderRepository.save(order); } context.setStatus(TccStatus.SUCCESS.name()); } Override public void cancel(TccContext context) { Order order orderRepository.findById(context.getOrderId()).orElse(null); if (order ! null PENDING.equals(order.getStatus())) { order.setStatus(CANCELLED); orderRepository.save(order); } context.setStatus(TccStatus.FAILED.name()); } }3.3 账户服务TCC实现Service public class AccountTccService implements TccTransaction { Autowired private AccountRepository accountRepository; Override public void tryExecute(TccContext context) { Account account accountRepository.findByUserId(context.getUserId()); if (account.getBalance() context.getAmount()) { throw new InsufficientBalanceException(余额不足); } account.setFreezeAmount(account.getFreezeAmount() context.getAmount()); account.setBalance(account.getBalance() - context.getAmount()); accountRepository.save(account); context.setStatus(TccStatus.TRYING.name()); } Override public void confirm(TccContext context) { Account account accountRepository.findByUserId(context.getUserId()); account.setFreezeAmount(account.getFreezeAmount() - context.getAmount()); accountRepository.save(account); context.setStatus(TccStatus.SUCCESS.name()); } Override public void cancel(TccContext context) { Account account accountRepository.findByUserId(context.getUserId()); account.setBalance(account.getBalance() context.getAmount()); account.setFreezeAmount(account.getFreezeAmount() - context.getAmount()); accountRepository.save(account); context.setStatus(TccStatus.FAILED.name()); } }四、事务协调器4.1 协调器设计Service public class TccCoordinator { Autowired private TransactionRepository transactionRepository; Autowired private ApplicationContext applicationContext; Transactional public String startTransaction(ListTccParticipant participants) { String transactionId UUID.randomUUID().toString(); Transaction transaction Transaction.builder() .transactionId(transactionId) .status(TccStatus.TRYING.name()) .participants(JsonUtil.toJson(participants)) .build(); transactionRepository.save(transaction); return transactionId; } public void commit(String transactionId) { Transaction transaction transactionRepository.findById(transactionId).orElse(null); if (transaction null) { return; } ListTccParticipant participants JsonUtil.fromJson( transaction.getParticipants(), new TypeReferenceListTccParticipant() {} ); for (TccParticipant participant : participants) { try { TccTransaction service applicationContext.getBean( participant.getServiceName(), TccTransaction.class ); TccContext context new TccContext(); context.setTransactionId(transactionId); context.setParams(participant.getParams()); service.confirm(context); } catch (Exception e) { rollback(transactionId); throw new TransactionCommitException(提交失败, e); } } transaction.setStatus(TccStatus.SUCCESS.name()); transactionRepository.save(transaction); } public void rollback(String transactionId) { Transaction transaction transactionRepository.findById(transactionId).orElse(null); if (transaction null) { return; } ListTccParticipant participants JsonUtil.fromJson( transaction.getParticipants(), new TypeReferenceListTccParticipant() {} ); for (TccParticipant participant : participants) { try { TccTransaction service applicationContext.getBean( participant.getServiceName(), TccTransaction.class ); TccContext context new TccContext(); context.setTransactionId(transactionId); context.setParams(participant.getParams()); service.cancel(context); } catch (Exception e) { // 记录日志后续人工处理 log.error(回滚失败: {}, e.getMessage()); } } transaction.setStatus(TccStatus.FAILED.name()); transactionRepository.save(transaction); } }4.2 TCC上下文public class TccContext { private String transactionId; private String status; private MapString, Object params new HashMap(); public void setParam(String key, Object value) { params.put(key, value); } public T T getParam(String key, ClassT clazz) { return clazz.cast(params.get(key)); } // getters and setters }五、异常处理与重试5.1 幂等性保证public class IdempotentUtil { private static final String PREFIX tcc:idempotent:; Autowired private StringRedisTemplate redisTemplate; public boolean checkAndLock(String key, long expireSeconds) { Boolean result redisTemplate.opsForValue().setIfAbsent( PREFIX key, locked, expireSeconds, TimeUnit.SECONDS ); return Boolean.TRUE.equals(result); } public void release(String key) { redisTemplate.delete(PREFIX key); } }5.2 重试机制Configuration public class RetryConfig { Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate new RetryTemplate(); FixedBackOffPolicy backOffPolicy new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000); retryTemplate.setBackOffPolicy(backOffPolicy); SimpleRetryPolicy retryPolicy new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); retryTemplate.setRetryPolicy(retryPolicy); return retryTemplate; } }5.3 补偿任务Component public class CompensationTask { Autowired private TransactionRepository transactionRepository; Autowired private TccCoordinator coordinator; Scheduled(fixedDelay 60000) public void processPendingTransactions() { ListTransaction pendingTransactions transactionRepository.findByStatus( TccStatus.TRYING.name() ); for (Transaction transaction : pendingTransactions) { if (isTimeout(transaction.getCreateTime())) { coordinator.rollback(transaction.getTransactionId()); } } } private boolean isTimeout(Date createTime) { long diff System.currentTimeMillis() - createTime.getTime(); return diff 5 * 60 * 1000; // 5分钟超时 } }六、Seata TCC模式6.1 引入依赖dependency groupIdio.seata/groupId artifactIdseata-spring-boot-starter/artifactId version1.6.1/version /dependency6.2 配置文件seata: enabled: true application-id: order-service tx-service-group: my_tx_group config: type: nacos nacos: server-addr: localhost:8848 group: SEATA_GROUP namespace: registry: type: nacos nacos: server-addr: localhost:8848 group: SEATA_GROUP6.3 Seata TCC实现Service public class OrderTccServiceImpl { LocalTCC public void prepare(BusinessActionContext context, OrderDTO orderDTO) { Order order new Order(); order.setOrderId(orderDTO.getOrderId()); order.setStatus(PREPARE); orderRepository.save(order); context.setActionContext(orderId, orderDTO.getOrderId()); } public void commit(BusinessActionContext context) { String orderId context.getActionContext(orderId); Order order orderRepository.findByOrderId(orderId); order.setStatus(COMMITTED); orderRepository.save(order); } public void rollback(BusinessActionContext context) { String orderId context.getActionContext(orderId); Order order orderRepository.findByOrderId(orderId); order.setStatus(ROLLED_BACK); orderRepository.save(order); } }七、最佳实践7.1 设计原则幂等性Confirm和Cancel操作必须是幂等的可补偿性Cancel操作必须能正确回滚Try阶段的修改资源隔离Try阶段应锁定资源防止并发问题超时机制设置合理的超时时间自动触发Cancel7.2 注意事项事项说明网络分区考虑网络分区情况下的一致性保障数据库事务Try阶段应使用数据库事务保证原子性日志记录记录详细的事务日志便于排查问题监控告警监控事务成功率、失败率、超时率7.3 性能优化// 使用批量操作减少数据库访问 Transactional public void batchConfirm(ListString transactionIds) { ListTransaction transactions transactionRepository.findAllById(transactionIds); for (Transaction transaction : transactions) { // 批量更新状态 transaction.setStatus(TccStatus.SUCCESS.name()); } transactionRepository.saveAll(transactions); }八、总结TCC是一种灵活的分布式事务解决方案适用于高并发场景。通过合理设计Try、Confirm、Cancel三个阶段可以在保证最终一致性的同时获得较好的性能表现。关键要点TCC是业务层面的分布式事务解决方案需要保证Confirm和Cancel的幂等性需要实现超时和重试机制可以借助Seata等框架简化实现在实际应用中应根据业务场景选择合适的分布式事务方案平衡一致性、性能和复杂度。