⚡ 分布式系统核心概念

分布式事务

当一笔业务跨越多个数据库或服务,如何保证数据的一致性?从理论到实战,彻底搞懂。

📦 分布式事务是什么

先理解单机事务,再理解为什么分布式让事务变得极度复杂。

💡
单机事务(本地事务):一组操作要么全部成功,要么全部回滚。 MySQL 的 BEGIN / COMMIT / ROLLBACK 就是最典型的例子。 单进程内,数据库引擎可以通过 undo log、redo log、锁机制保证 ACID。
⚠️
分布式事务:同一个业务操作需要跨越 多个数据库多个微服务, 且这些操作必须作为一个原子单元整体成功或整体回滚。 由于网络延迟、节点故障、时钟不同步等问题,ACID 无法直接在分布式场景中使用。

一个典型场景:电商下单

订单服务
创建订单记录
order_db
库存服务
扣减商品库存
inventory_db
积分服务
增加用户积分
points_db
支付服务
扣减账户余额
payment_db
如果订单创建成功、库存扣减成功,但 支付服务宕机, 用户没有付钱但订单已生成、库存已扣——这就是分布式场景下的数据不一致。

为什么分布式事务这么难

三个根本原因,注定了分布式事务不能用单机方式解决。

🌐 网络不可靠
网络请求可能超时、丢包、重复到达。你永远不知道对方是"还没收到"还是"收到了但响应丢了"。
💀 节点可能崩溃
任意一个参与者可能在事务执行到一半时宕机,留下一个无法确定状态的系统。
⏱ 没有全局时钟
分布式系统中不存在统一的物理时钟,无法用时间戳来确定操作的全局顺序。

核心困境:两将军问题

🏰
两将军问题:两支军队通过不可靠的信使传递消息,无法保证双方同时发动进攻。 类比到分布式事务:协调者和参与者无法通过有限次不可靠通信来确保双方的状态完全一致。 这是 理论上的不可能证明——分布式一致性只能"尽量保证",不能"完全保证"。

🔺 CAP 理论 & BASE 原则

理解分布式系统的基础理论,是选型的出发点。

C — Consistency(一致性)
每次读取都能看到最新写入的数据,所有节点同时看到相同的数据。
A — Availability(可用性)
每个请求都能在有限时间内得到响应(不保证是最新数据),系统不能拒绝服务。
P — Partition Tolerance(分区容错性)
网络发生分区(部分节点间无法通信)时,系统仍然能运行。
CAP 定理:三者不可同时满足,最多只能满足其中两项。 而在真实分布式系统中,P(分区容错)是必须的(网络分区随时可能发生), 所以实际上是在 C 和 A 之间做取舍
选型含义典型系统适用场景
CP 保一致性,牺牲可用性。网络分区时拒绝请求直到恢复一致。 ZooKeeper、etcd、HBase 金融交易、配置中心
AP 保可用性,牺牲强一致性。网络分区时允许读到旧数据,最终一致。 Cassandra、DynamoDB、CouchDB 购物车、日志、评论
CA 保一致性+可用性,但无法容忍网络分区——只在单机或同机房有意义。 单机 MySQL、PostgreSQL 单体应用、强隔离场景
📖
BASE 是对 ACID 的妥协,适用于大规模分布式系统的设计哲学。
BA — Basically Available(基本可用)
允许系统在故障时损失部分可用性(如响应延迟增加、非核心功能降级),但核心功能仍可用。
S — Soft State(软状态)
允许系统中的数据存在中间状态,即使没有外部输入,状态也可能变化(如同步中)。
E — Eventually Consistent(最终一致)
系统在一段时间后(没有新写入时),所有副本数据最终会达到一致状态。

🛠 五大解决方案

没有银弹,每种方案都有适合的场景与明确的代价。

📋
Two-Phase Commit(两阶段提交):最经典的分布式事务协议,分为"投票阶段"和"提交阶段"。

流程

阶段一:Prepare(准备/投票)
协调者向所有参与者发送 Prepare 请求。每个参与者执行事务操作但 不提交,将操作写入 undo log / redo log,然后返回 Yes/No。
阶段二a:Commit(所有人都说 Yes)
协调者收到所有 Yes 后,向所有参与者发送 Commit 命令,各参与者正式提交事务并释放锁。
阶段二b:Rollback(任何人说 No)
只要有一个参与者返回 No 或超时,协调者向所有参与者发送 Rollback 命令,各参与者回滚事务。

优缺点

优点缺点
强一致性 实现相对简单,标准化程度高(XA 协议) 单点故障 协调者宕机则所有参与者阻塞,锁不释放
数据库原生支持(MySQL XA、PostgreSQL) 性能差 两次网络往返,整个阶段加锁,并发度极低
应用无感知 数据不一致风险 阶段二协调者崩溃,部分节点提交、部分未提交
📋
Three-Phase Commit(三阶段提交):2PC 的改进版,增加 CanCommit 预询问阶段,并引入超时机制减少阻塞。
阶段一:CanCommit
协调者先询问"你能提交吗?",参与者不做任何操作,只是根据自身状态回复 Yes/No。代价极低。
阶段二:PreCommit
所有人同意后,执行事务操作并写入日志,但不正式提交。各参与者设置超时:若超时未收到阶段三指令,自动提交(默认乐观)。
阶段三:DoCommit
正式提交或回滚。参与者有超时自动提交机制,降低阻塞概率。
⚠️
3PC 解决了协调者宕机时参与者无限阻塞的问题,但引入了新的不一致风险:若网络分区后超时自动提交,协调者此前发出的是 Abort 但部分节点已自动 Commit,仍可能导致不一致。工程实践中 3PC 几乎不用,Saga / TCC 已取代它。
📋
TCC(Try-Confirm-Cancel):业务侵入型方案,开发者为每个操作实现三个接口,从业务层面保证事务。
Try(预留资源)
锁定/冻结资源。如:冻结 100 元余额(不扣除),预占库存(不减少),返回 OK/Failed。
Confirm(确认提交)
所有 Try 成功后,真正执行业务。如:实际扣除余额,实际减少库存。幂等
Cancel(取消回滚)
任意 Try 失败后,释放已预留的资源。如:解冻余额,释放库存。幂等

TCC 与 2PC 对比

维度2PC (XA)TCC
实现层数据库层,应用无感知业务层,需手写三套代码
锁持有时间整个两阶段,很长只在 Try 阶段锁定,短
性能
一致性级别强一致最终一致(补偿式)
开发成本高(业务入侵)
适用低并发、强一致场景高并发、金融类场景
⚠️
TCC 的难点:空回滚(Try 未执行就收到 Cancel)、 幂等(Confirm/Cancel 重复调用)、 悬挂(Cancel 先于 Try 到达)。 推荐使用 Seata TCC 框架来自动处理这些问题。
📋
Saga 模式:将分布式事务拆成一系列本地事务,每个本地事务成功后触发下一个;某步失败时,逆向执行补偿事务。

两种实现方式

Choreography(事件驱动)
去中心化,服务间通过消息/事件协调
无中心协调者
每个服务监听事件并触发下一步
适合步骤少的简单流程
难以追踪全局状态
Orchestration(编排式)
有中心 Saga 协调器统一调度
有中心 Saga Orchestrator
协调器控制每步调用与补偿
适合复杂多步业务流程
全局状态清晰,易于监控

补偿事务示例(下单失败回滚)

T1: 创建订单
✓ 成功
T2: 扣库存
✓ 成功
T3: 扣款
✗ 失败
C2: 恢复库存
补偿
C1: 取消订单
补偿
Saga 是微服务场景下最推荐的方案之一:无全局锁、高可用、适合长事务。缺点是只能实现最终一致性,补偿操作需要精心设计,不适合需要强一致的场景(如转账)。
📋
本地消息表(Transactional Outbox 模式):利用本地数据库事务保证"消息写入"与"业务操作"的原子性,再异步投递消息给下游,实现最终一致。
业务操作 + 写消息表(同一本地事务)
在一个本地事务中同时写业务数据和消息表。保证"两件事要么都成功要么都失败"。
消息投递服务轮询消息表
独立的 Worker 不断扫描消息表,将未发送的消息发到 MQ(Kafka/RocketMQ)。
下游消费者处理消息(幂等)
消费者收到消息后执行对应操作。需要保证幂等(消息可能重复投递)。
标记消息为已消费
消费成功后,更新消息表状态为 CONSUMED 或直接删除记录。
本地消息表是工程实践中最常用、最稳健的方案之一。优点:实现简单、无框架依赖、可靠性高。缺点:有一定延迟(最终一致),下游需保证幂等。

💻 实战代码示例

三个最常用方案的核心代码片段,直接可用于真实项目。

// Java + Seata TCC 示例:支付服务
@LocalTCC
public interface PaymentTccService {

    // Try: 冻结余额(不实际扣款)
    @TwoPhaseBusinessAction(
        name = "freezeAmount",
        commitMethod = "confirm",
        rollbackMethod = "cancel"
    )
    boolean tryFreeze(BusinessActionContext ctx,
        @BusinessActionContextParameter("userId") Long userId,
        @BusinessActionContextParameter("amount") BigDecimal amount);

    // Confirm: 真正扣款(幂等)
    boolean confirm(BusinessActionContext ctx);

    // Cancel: 解冻余额(幂等)
    boolean cancel(BusinessActionContext ctx);
}

// 实现类
@Service
public class PaymentTccServiceImpl implements PaymentTccService {

    @Override
    public boolean tryFreeze(BusinessActionContext ctx, Long userId, BigDecimal amount) {
        // 1. 幂等检查:避免重复 Try
        if (tccRecordExists(ctx.getXid())) return true;
        // 2. 检查余额是否充足
        Account account = accountRepo.findByUserId(userId);
        if (account.getBalance().compareTo(amount) < 0) return false;
        // 3. 冻结余额(可用余额减少,冻结金额增加)
        account.freeze(amount);
        accountRepo.save(account);
        // 4. 记录 TCC 操作流水
        tccRecordRepo.insert(ctx.getXid(), "TRY", userId, amount);
        return true;
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        TccRecord record = tccRecordRepo.findByXid(ctx.getXid());
        if (record == null || record.isConfirmed()) return true; // 幂等
        // 从冻结金额转为真实扣减
        accountRepo.confirmFreeze(record.getUserId(), record.getAmount());
        record.setStatus("CONFIRMED");
        tccRecordRepo.save(record);
        return true;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        TccRecord record = tccRecordRepo.findByXid(ctx.getXid());
        if (record == null) return true; // 空回滚:Try 未执行则直接返回
        if (record.isCancelled()) return true; // 幂等
        // 释放冻结金额
        accountRepo.unfreeze(record.getUserId(), record.getAmount());
        record.setStatus("CANCELLED");
        tccRecordRepo.save(record);
        return true;
    }
}
// Python + 伪代码展示 Saga Orchestration 补偿逻辑

from dataclasses import dataclass
from typing import List, Callable
import logging

@dataclass
class SagaStep:
    name: str
    action: Callable       # 正向操作
    compensate: Callable  # 补偿操作

class SagaOrchestrator:
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.executed: List[SagaStep] = []

    def execute(self, context: dict) -> bool:
        for step in self.steps:
            try:
                logging.info(f"执行步骤: {step.name}")
                step.action(context)
                self.executed.append(step)
            except Exception as e:
                logging.error(f"步骤 {step.name} 失败: {e},开始补偿")
                self._compensate(context)
                return False
        return True

    def _compensate(self, context: dict):
        # 逆序执行已成功步骤的补偿操作
        for step in reversed(self.executed):
            try:
                logging.info(f"补偿步骤: {step.name}")
                step.compensate(context)
            except Exception as e:
                # 补偿失败需人工介入或进入死信队列
                logging.critical(f"补偿失败,需人工处理: {step.name}, {e}")

# 使用示例:电商下单 Saga
def create_order_saga(order_data: dict):
    saga = SagaOrchestrator([
        SagaStep(
            name="创建订单",
            action=lambda ctx: order_service.create(ctx),
            compensate=lambda ctx: order_service.cancel(ctx["order_id"])
        ),
        SagaStep(
            name="扣减库存",
            action=lambda ctx: inventory_service.deduct(ctx),
            compensate=lambda ctx: inventory_service.restore(ctx["sku_id"], ctx["qty"])
        ),
        SagaStep(
            name="扣款支付",
            action=lambda ctx: payment_service.charge(ctx),
            compensate=lambda ctx: payment_service.refund(ctx["payment_id"])
        ),
    ])
    return saga.execute(order_data)
-- 1. 建表:outbox 消息表
CREATE TABLE outbox_messages (
    id          BIGINT       PRIMARY KEY AUTO_INCREMENT,
    aggregate_id VARCHAR(64) NOT NULL,  -- 业务主键,如 order_id
    event_type   VARCHAR(128) NOT NULL, -- 事件类型,如 ORDER_CREATED
    payload      JSON        NOT NULL,  -- 事件内容
    status       VARCHAR(16) DEFAULT 'PENDING',  -- PENDING/PUBLISHED/FAILED
    created_at   DATETIME    DEFAULT NOW(),
    published_at DATETIME,
    retry_count  INT         DEFAULT 0
);

-- 2. 业务操作 + 写消息表,同一本地事务
BEGIN;
  -- 业务操作
  INSERT INTO orders (id, user_id, amount, status)
  VALUES (1001, 42, 299.00, 'CREATED');

  -- 同事务写 outbox
  INSERT INTO outbox_messages (aggregate_id, event_type, payload)
  VALUES ('1001', 'ORDER_CREATED',
    '{"orderId":1001,"userId":42,"amount":299.00}');
COMMIT;

-- 3. Outbox Worker (Python 轮询投递)
--    定时执行,发现 PENDING 消息则投递到 Kafka
# Python: Outbox Worker
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def outbox_worker():
    while True:
        # 批量取 PENDING 消息(加 FOR UPDATE 防并发)
        msgs = db.query("""
            SELECT * FROM outbox_messages
            WHERE status = 'PENDING' AND retry_count < 5
            ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED
        """)
        for msg in msgs:
            try:
                producer.send(
                    topic=msg['event_type'],
                    key=msg['aggregate_id'].encode(),
                    value=msg['payload'].encode()
                )
                db.execute("""
                    UPDATE outbox_messages
                    SET status='PUBLISHED', published_at=NOW()
                    WHERE id=?
                """, [msg['id']])
            except Exception as e:
                db.execute("""
                    UPDATE outbox_messages
                    SET retry_count=retry_count+1,
                        status=IF(retry_count>=4,'FAILED','PENDING')
                    WHERE id=?
                """, [msg['id']])
        time.sleep(1)

# 消费者必须保证幂等(检查是否已处理过该 aggregate_id)

🎯 如何选型

没有最好的方案,只有最适合的方案。按需求维度做决策。

方案 一致性 性能 开发成本 适用场景 不适用场景
2PC / XA 强一致 差 ⭐ 低并发、数据库跨库、简单场景 高并发、微服务间调用
TCC 最终一致 好 ⭐⭐⭐⭐ 高(3套接口) 金融支付、高并发、强控制 业务改造成本不可接受时
Saga 最终一致 好 ⭐⭐⭐⭐ 中(补偿逻辑) 微服务长事务、复杂业务流程 需要强一致(如转账)
本地消息表 最终一致 好 ⭐⭐⭐⭐ 低~中 跨服务异步通知、解耦场景 对延迟零容忍的同步场景
Seata AT 最终一致 中 ⭐⭐⭐ 极低(无侵入) Java Spring Cloud 微服务、快速落地 非Java技术栈、超高并发

场景决策树

是否需要强一致(实时)?
→ 使用 2PC / XA(接受性能代价),或重新设计业务避开分布式事务
否(最终一致可接受) → 继续 Q2
是否是高并发 + 金融级精确控制?
→ 使用 TCC(接受高开发成本)
→ 继续 Q3
是否是复杂的多步业务流程(微服务链路长)?
→ 使用 Saga(Orchestration 模式)
否(简单的跨服务通知) → 使用 本地消息表 + MQ
是否在 Java Spring Cloud 生态,且想快速落地?
→ 直接使用 Seata AT 模式(0 代码改造,接入简单)
→ 根据上面选择并自行实现或用开源框架

🪤 实战踩坑与最佳实践

分布式事务在生产中的高频问题与对应解法。

🔁
幂等性设计(必须)
消息重复投递、Confirm/Cancel 重复调用都会发生
所有补偿接口、消费者接口必须幂等。通用做法:在数据库建唯一索引(event_id 或 xid),操作前检查是否已处理。 不要依赖业务逻辑判断幂等,要用数据库唯一约束兜底。
💀
补偿失败怎么办?
Saga 补偿本身也可能失败
补偿失败时不能无限重试。标准做法:重试 N 次后放入死信队列(DLQ), 触发告警,人工介入处理。系统需要对账机制定期核查分布式事务完成状态。
🕐
超时的处理
下游超时不代表失败,可能是成功但响应超时
请求超时后不能简单重试(可能重复扣款)。正确做法:超时后先查询状态, 确认成功/失败后再决定是重试还是补偿。所有关键接口需要提供查询接口。
🔍
可观测性
分布式事务最难的是排查问题
每个分布式事务需要有唯一 XID(全局事务ID), 串联所有参与者的日志、数据库记录和消息。配合 Jaeger/Zipkin 做全链路追踪,配合 Prometheus/Grafana 监控事务成功率和延迟。
🚫
尽量避免分布式事务
最好的方案是不需要分布式事务
合并服务:将需要强一致的数据放在同一数据库,用本地事务。
接受最终一致:很多业务其实不需要强一致(如积分),用异步事件即可。
柔性事务设计:允许临时不一致,通过对账+补偿实现最终一致。