消息可靠性、顺序性、去重、积压、丢失 —— 五大核心问题的原理、场景与解决方案
⚡ 生产环境必备知识核心问题:Kafka 如何保证消息不丢、不重,即 精确一次语义(EOS)?这是生产环境最关键的问题。
| 层级 | 含义 | 配置要点 |
|---|---|---|
| At Most Once | 最多消费一次,可能丢消息 | enable.auto.commit = true,自动提交 offset |
| At Least Once | 至少消费一次,不丢但可能重复 | 手动提交 offset + 幂等消费(主流方案) |
| Exactly Once | 精确一次,不丢不重 | Kafka 0.11+ 幂等生产者 + 事务 API |
# 关键配置
acks = all # (或 -1) ISR 中所有副本都确认才算成功
retries = Integer.MAX_VALUE # 无限重试
max.in.flight.requests.per.connection = 1 # 顺序发送
enable.idempotence = true # 开启幂等性(Kafka 0.11+)
# Broker 配置
replication.factor = 3 # 分区副本数 ≥ 3
min.insync.replicas = 2 # 最少同步副本数(必须 > 1)
unclean.leader.election.enable = false # 不允许非 ISR 节点当选 Leader
ISR(In-Sync Replicas)是跟 Leader 保持同步的副本集合。min.insync.replicas = 2 保证至少 2 个副本写成功,这样即使挂 1 个也不影响。
# Consumer 最佳实践
enable.auto.commit = false # 关闭自动提交!手动控制
auto.offset.reset = earliest # 新消费者从头开始
# 正确的消费流程:
# 1. 拉取消息
# 2. 业务处理(写入 DB / 调用下游)
# 3. 处理成功 → 手动 commit offset
# 4. 处理失败 → 重试 或 记录死信队列
💡 黄金法则:先处理业务,处理成功后再提交 offset。反过来做会导致消息丢了都不知道。
Kafka 0.11+ 支持跨分区的事务写入,配合幂等生产者实现真正的 EOS:
// Producer 初始化事务
props.put("transactional.id", "my-tx-id");
producer.initTransactions();
try {
producer.beginTransaction();
// 向多个 topic/partition 发送消息
producer.send(new ProducerRecord<>("topic-a", key, value));
producer.send(new ProducerRecord<>("topic-b", key, value));
producer.commitTransaction(); // 全部原子提交
} catch (Exception e) {
producer.abortTransaction(); // 全部回滚
}
核心问题:Kafka 如何保证消息的消费顺序?为什么默认情况下消息可能是乱序的?
Producer ──┬──> Partition 0: msg1 → msg3 → msg5 → msg7 (各自有序)
├──> Partition 1: msg2 → msg4 → msg6 → msg8
└──> Partition 2: ...
Consumer Group
├── C0 消费 P0: msg1, msg3, msg5...
├── C1 消费 P1: msg2, msg4, msg6... ← 各自并行,无法保证全局顺序!
└── C2 消费 P2: ...
期望输出: msg1, msg2, msg3, msg4 ...
实际可能: msg1, msg3, msg2, msg5 ... ← ❌ 乱序!
| 方案 | 做法 | 优点 | 缺点 |
|---|---|---|---|
| 方案一:单分区 | Topic 只设 1 个 Partition | 天然全局有序,简单粗暴 | 丧失并发度,吞吐量低 |
| 方案二:Key 分区 ⭐推荐 | 相同 Key 的消息路由到同一 Partition | 兼顾有序 + 并发 | 需要设计合理的 Key |
| 方案三:消费端排序 | Consumer 内存中缓存 + 排序 | 不影响生产端 | 内存压力大,延迟高 |
// 方案二示例:按订单 ID 分区,保证同一订单的消息有序
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", orderId.toString(), orderJson);
// 相同 orderId → 相同 Partition → 有序 ✓
⚠️ 注意:如果业务要求严格的全局顺序(如金融交易流水),只能用单分区或引入外部协调服务(如 ZooKeeper 分布式锁),代价较大。
核心问题:什么情况下会产生重复消息?如何做到幂等消费?
| 场景 | 原因 | 发生位置 |
|---|---|---|
| 生产端重试 | 网络抖动导致 ACK 超时,Producer 重新发送,Broker 实际已收到 | Producer → Broker |
| Broker 故障切换 | Leader 切换后,新 Leader 可能有尚未提交的消息被再次写入 | Broker 内部 |
| 消费端 Rebalance | Consumer 提交 offset 后但在持久化前宕机,Rebalance 后重复拉取已消费的消息 | Consumer 端(最常见!) |
定义:对同一操作执行一次和执行 N 次,结果相同。这是解决重复的终极武器。
-- 方案:数据库唯一约束
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY, -- 消息中的业务ID作为唯一键
amount DECIMAL(10,2),
status VARCHAR(20),
create_time DATETIME,
UNIQUE KEY uk_msg_id (message_id) -- 或者用消息ID做唯一键
);
-- 消费端伪代码
void consume(OrderMessage msg) {
try {
db.insert("INSERT IGNORE INTO orders VALUES (?, ?, ?, ?, ?)",
msg.orderId, msg.amount, msg.status, now(), msg.messageId);
// INSERT IGNORE:如果存在则跳过(幂等!)
commitOffset();
} catch (Exception e) {
sendToDLQ(msg); // 死信队列
}
}
💡 最佳实践:在消息体中携带全局唯一 ID(如 UUID、雪花算法生成的 ID),消费端基于此 ID 做去重判断。
核心问题:Consumer 消费速度跟不上 Producer 发送速度,导致消息大量堆积怎么办?
| 原因类型 | 具体表现 |
|---|---|
| 下游瓶颈 | DB 写入慢、外部 API 调用超时、第三方系统限流 |
| Consumer 性能不足 | 单线程处理太慢、GC 频繁 Full GC、CPU 打满 |
| 突发流量 | 大促/活动期间消息量突增数倍到数十倍 |
| Rebalance 频繁 | 频繁 Rebalance 导致消费暂停,积压加剧 |
Step 1: 发现积压(监控告警)
├── Lag 监控:consumer lag > 阈值告警
├── 磁盘监控:Broker 磁盘使用率 > 80%
└── 日志监控:日志出现 "lag increasing"
Step 2: 紧急扩容(临时方案)
├── 将当前 Consumer 停掉(保留 Offset)
├── 新建一个 Topic(partitions = 原来的 N 倍,比如 10 倍)
├── 编写临时消费程序:
│ ├── 只做转发:从原 Topic 快速读 → 批量写到新 Topic
│ ├── 不做任何业务处理!(最大化速度)
│ └── 可以启动 M 个 Consumer 同时跑
├── 启动 N 倍于原来的正式 Consumer 消费新 Topic
└── 积压消化完后,恢复原有架构
Step 3: 根因修复(长期方案)
├── 优化 Consumer 处理逻辑(异步、批处理)
├── 下游扩容(DB 连接池、缓存加速)
└── 上游限流(Producer 速率控制)
正常架构:
Producer → [Topic-A] → Consumer(3个) → 业务DB
积压应急架构:
Producer → [Topic-A] → 临时转发程序(10个) → [Topic-B(10x分区)] → Consumer(30个) → 业务DB
↑ 只转发不处理 ↑ 扩容消费 ↑ 恢复正常速度
核心问题:消息在哪些环节可能丢失?如何全方位防止消息丢失?
每个环节都可能丢消息 ↓
| 环节 | 丢失场景 | 根因 | 解决方案 |
|---|---|---|---|
| Producer 端 | 发送失败未处理 | 网络异常 / Broker 不可用 | retries=Integer.MAX_VALUEacks=all |
| Broker 端 | Page Cache 未刷盘就宕机 | 操作系统崩溃 / 断电 | log.flush.interval.messageslog.flush.interval.ms(代价:性能下降) |
| Broker 端 | 副本不足导致数据丢失 | ISR 中副本数不够 | replication.factor≥3min.insync.replicas>1unclean.leader.election.enable=false |
| Consumer 端 | 自动提交 Offset 后处理失败 | 业务异常但 Offset 已推进 | enable.auto.commit=false先处理业务再手动提交 Offset |
# ========== Producer ==========
bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
acks = all # 必须所有 ISR 副本确认
retries = 2147483647 # 无限重试
max.in.flight.requests.per.connection = 5 # 配合 enable.idempotence 使用
enable.idempotence = true # 开启幂等(防重复+防乱序)
# delivery.timeout.ms = 120000 # 总超时时间
# retry.backoff.ms = 100 # 重试间隔
# ========== Broker(server.properties)==========
# --- 副本相关 ---
default.replication.factor = 3 # 默认副本数 = 3
min.insync.replicas = 2 # 至少 2 个副本同步
unclean.leader.election.enable = false # 禁止脏选举
# --- 刷盘相关 ---
# log.flush.interval.messages = 10000 # 每 1 万条刷盘(按需调整)
# log.flush.interval.ms = 1000 # 每 1 秒刷盘(按需调整)
# 注意:默认依赖 OS 异步刷盘,性能更好;强一致性需开启主动刷盘
# --- ISR 相关 ---
replica.lag.time.max.ms = 30000 # Follower 超过 30s 不同步则踢出 ISR
num.recovery.threads.per.data.dir = 3
# ========== Consumer ==========
enable.auto.commit = false # 【关键】关闭自动提交
auto.offset.reset = earliest # 新 consumer 从头开始
# max.poll.records = 500 # 每次最多拉取 500 条
# 手动控制:poll() → process() → 成功则 commitSync()
⚠️ 性能 vs 可靠性权衡:开启 log.flush 会显著降低吞吐量。一般建议依赖多副本机制防丢失(性能损耗小),而不是频繁刷盘(性能损耗大)。除非你的数据价值极高(如资金交易)。
acks=all
+ 幂等生产者
+ 事务 API
Key 分区路由
+ 单线程消费
同 Partition 有序
幂等消费
+ 唯一约束
+ 去重表/Redis
临时扩容
+ 转发程序
+ 批量消费
多副本机制
+ 手动 Commit
+ 合理 acks
可靠性靠配置和副本, 顺序性靠分区策略, 去重靠幂等设计, 积压靠扩容和转发, 防丢失靠全链路把控。