深入分析消息丢失的所有场景、根因与解决方案
消息丢失发生在生产者、Broker、消费者三个环节,每个环节都有不同的触发条件
producer.send(msg) 异步发送,不等待 Broker 确认// ❌ 危险:异步发送,不处理结果
producer.send(new ProducerRecord<>("topic", key, value));
// 网络异常/leader切换时,消息可能静默丢失
// ✅ 同步发送,等待结果
Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 必须处理异常:重试 or 记录到本地补偿表
log.error("发送失败", exception);
}
});
// 或同步等待
producer.send(record).get(); // 阻塞等待结果
acks=0:Producer 不等待 Broker 确认,网络丢包时消息直接丢失,应用无感知acks=1:仅等待 Leader 写入页面缓存(Page Cache)即返回成功,Leader 在 fsync 前宕机,数据丢失| acks 值 | 含义 | 丢失风险 | 适用场景 |
|---|---|---|---|
acks=0 | 不等待确认 | 极高 | 日志收集等可容忍丢失场景 |
acks=1 | Leader 写入即确认 | 中 | 一般业务,允许极低概率丢失 |
acks=all | 所有 ISR 副本确认 | 低 | 金融、订单等不允许丢失场景 |
Properties props = new Properties();
props.put("acks", "all"); // 所有 ISR 副本确认
props.put("retries", 3); // 发送失败重试次数
props.put("retry.backoff.ms", 100); // 重试间隔
max.request.size(默认 1MB),Broker 拒绝但 Producer 可能未正确处理错误recordTooLarge 错误拒绝,若未处理回调异常则静默丢失buffer.memory 默认 32MB,发送速率超过 Broker 处理能力时缓冲区满max.block.ms 超时后抛出 TimeoutException,若未捕获则消息丢失block.on.buffer.full=false(旧版本)直接丢弃新消息props.put("buffer.memory", 67108864); // 调大缓冲区 64MB
props.put("max.block.ms", 60000); // 阻塞等待最长 60s
// 同时在 send 回调中处理异常,写入本地磁盘或降级队列
acks=1,Producer 已收到成功确认,但数据未落盘# Broker 端关键配置 min.insync.replicas=2 # 写入时必须至少有 2 个 ISR 副本确认 unclean.leader.election.enable=false # 禁止非 ISR 副本成为 Leader # Producer 端配合 acks=all
# 禁止非 ISR 副本竞选 Leader(Kafka 0.11+ 默认值已是 false) unclean.leader.election.enable=false
replication.factor=1),Broker 磁盘损坏或机器下线,数据无法恢复# 创建 Topic 时指定副本数 bin/kafka-topics.sh --create \ --topic important-topic \ --partitions 6 \ --replication-factor 3 \ --config min.insync.replicas=2
log.retention.hours 默认 168 小时(7天),超过后消息被删除log.retention.bytes 达到阈值后,最旧段被删除log.retention.ms=-1 永久保留,但磁盘有风险enable.auto.commit=true(默认值),每隔 auto.commit.interval.ms(默认 5s)自动提交 offset// ❌ 危险:自动提交 + 异步处理
props.put("enable.auto.commit", "true"); // 默认就是 true
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
// 自动提交已发生,但下面业务逻辑可能失败
processRecords(records); // 若此处抛异常,消息已确认,永久丢失
}
// ✅ 关闭自动提交,业务处理完成后手动提交
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
try {
processRecords(records); // 业务处理
// 同步提交,确保提交成功
consumer.commitSync();
} catch (Exception e) {
// 处理失败,offset 不提交,消息会被重新消费
log.error("处理失败,等待重试", e);
}
}
commitSync(Map<TopicPartition, OffsetAndMetadata>) 按需提交,处理完一批后再提交对应 offsetsession.timeout.ms 默认 10s,若消费逻辑处理时间过长超过此值,Broker 认为 Consumer 死掉,触发 Rebalanceprops.put("session.timeout.ms", 30000); // 适当加大会话超时
props.put("max.poll.interval.ms", 300000); // 处理最大间隔,默认 5 分钟
props.put("enable.auto.commit", "false"); // 关闭自动提交
| 丢失环节 | 具体场景 | 丢失概率 | 检测难度 | 核心配置 |
|---|---|---|---|---|
| Producer | acks=0,不等待确认 | 极高 | 难(静默丢失) | acks=all |
| Producer | 异步发送未处理异常 | 高 | 中 | 处理 Callback 异常 |
| Broker | Leader 宕机,数据未复制 | 中 | 难 | min.insync.replicas≥2unclean.leader.election=false |
| Broker | 副本数=1,磁盘损坏 | 极高 | 易 | replication.factor≥3 |
| Consumer | 自动提交 + 消费失败 | 极高 | 中(有日志可查) | enable.auto.commit=false |
| Consumer | 多线程消费,提前提交 offset | 中 | 难 | 手动提交 + 幂等消费 |
acks=allretries=Integer.MAX_VALUEmax.in.flight.requests.per.connection=1(保证有序)
处理 send 回调异常,写入本地补偿表
replication.factor=3min.insync.replicas=2unclean.leader.election.enable=false
禁用磁盘缓存刷写(依赖副本机制)
enable.auto.commit=false
业务处理成功后手动提交
实现幂等消费(支持重复消费)isolation.level=read_committed(事务)
监控 UnderReplicatedPartitions
监控 OfflineLogDirectoryCount
监控 Consumer Lag 异常增长
消息轨迹追踪(注入 traceId)
enable.idempotence=true,避免因重试导致重复消息initTransactions/beginTransaction/commitTransaction),实现跨分区的原子写入traceId,Producer 记录发送日志,Consumer 记录消费日志kafka-consumer-groups.sh --bootstrap-server ... --describe 查看 Lag