流式输出的断线续传机制

深入解析「APP 退出后重新进入,输出还能自动继续」背后的技术原理 —— 以豆包为例

0 先看现象

你一定是经历过这个场景才开始好奇的:

🤖 提问 📝 开始流式输出... 👆 退出 APP ⏱️ 过了一会 📱 重新打开 APP ✨ 继续输出!

这看起来像魔法,实际上是精心设计的「服务端缓冲 + 客户端续传」机制。

1 先理解基础:SSE 是怎么做流式输出的

要理解断线续传,必须先理解正常流式是怎么跑的。

1.1 SSE (Server-Sent Events) 协议

SSE 是 HTTP 长连接协议:客户端发一个 GET/POST 请求,服务端持续推送 text/event-stream 格式的数据,直到结束或断开。

// 客户端发起 POST 请求
POST /v1/chat/completions
Content-Type: application/json
Accept: text/event-stream

{ "stream": true, "messages": [...] }

// 服务端持续推送 SSE 事件(逐 token)
data: {"id":"req_abc", "delta":{"content":"春"}}

data: {"id":"req_abc", "delta":{"content":"眠"}}

data: {"id":"req_abc", "delta":{"content":"不"}}

data: {"id":"req_abc", "delta":{"content":"觉"}}

data: {"id":"req_abc", "delta":{"content":"晓"}}

data: [DONE]

1.2 关键概念:请求级有状态

一次 SSE 连接对应 一个 LLM 推理会话。服务端在内存中保存了:

  • 当前已生成的 token 列表
  • KV Cache(注意力机制的中间状态,加速后续生成)
  • 用户原始 messages 上下文

这个会话状态只在连接存活期间有效。连接一断,服务端通常会立即清理。

核心矛盾 正常 SSE 设计:断开 = 丢失 = 无法恢复。豆包的退出重进续传,本质上是在连接断开时主动持久化了会话状态

2 问题拆解:要实现断线续传,必须解决什么?

把「退出再进继续输出」拆成三个子问题:

问题 ①:断开时,推理不能中断

APP 退出时 TCP 连接断开,但服务端的 LLM 推理进程必须继续跑,不能因为客户端走了就停。

问题 ②:已生成的内容不能丢

APP 退出前已经收到并显示了部分内容(比如 "春眠不觉"),重新打开时必须把这些内容原样恢复。

问题 ③:重新连接后,从断点继续推

APP 重新打开后,发起一个新请求,服务端识别出「这是之前那个会话的续传」,把断开期间生成的新内容一次性补上,然后继续流式推送。

3 核心方案:会话级消息缓冲 + 断点续传

这是业界最主流的实现路径,豆包、ChatGPT、Kimi 本质上都是这个方案。

3.1 架构总览

📱 APP 客户端 API 网关 会话管理器 SessionId → 状态 Redis 消息队列 LLM 推理引擎 独立进程 一直运行 DB 持久存储 HTTP 长轮询 / WebSocket 📱 重新连接 续传请求 POST 请求 创建会话 写 token 推送 token 持久化完成 关联会话ID 续传请求 续传

LLM 推理引擎和消息推送解耦 —— 推理不断,消息堆积在 Redis,客户端随时来取

4 详细实现:一步一步拆解

下面按时间线完整走一遍这个流程:

4.1 第一步:建立会话

用户发起提问时,服务端不仅要启动 LLM 推理,还要创建一个持久化会话

// 服务端处理用户提问
func HandleChat(userId, conversationId, messages) {

    // 1. 生成唯一的会话 ID
    sessionId := UUID()

    // 2. 在 Redis 中创建会话状态
    redis.HSet("session:"+sessionId, {
        "status":         "generating",      // 状态:生成中
        "conversation_id": conversationId,    // 关联的对话 ID
        "user_id":         userId,              // 用户 ID
        "messages":         json(messages),     // 原始请求 messages
        "created_at":       now(),
        "total_tokens":     0,
        "last_seq":         0,                  // 最后一条消息序号
    })

    // 3. 创建一个 Redis List 用于堆积生成的 token
    //    key: session:{id}:tokens

    // 4. 异步启动 LLM 推理(不阻塞 HTTP 响应)
    go startLLMInference(sessionId, messages)

    // 5. 立即返回 sessionId 给客户端
    return { "session_id": sessionId }
}

4.2 第二步:推理进程写 Token 到 Redis 队列

LLM 推理在独立 goroutine / 进程中进行,每生成一个 token 就写入 Redis:

func startLLMInference(sessionId, messages) {

    stream := llm.ChatCompletionStream(messages)

    seq := 0

    for chunk := range stream {

        seq++

        // 每个 token 封装成带序号的消息
        tokenMsg := json({
            "seq":     seq,                    // 消息序号,用于断点续传定位
            "content": chunk.Delta.Content,   // token 文本内容
            "ts":      now(),                 // 时间戳
        })

        // 写入 Redis List — 尾部追加
        redis.RPush("session:"+sessionId+":tokens", tokenMsg)

        // 更新最后序号
        redis.HSet("session:"+sessionId, "last_seq", seq)

        // 发布到 Redis Pub/Sub,通知推送层有新 token
        redis.Publish("channel:session:"+sessionId, tokenMsg)
    }

    // 推理完成,标记会话状态
    redis.HSet("session:"+sessionId, "status", "completed")
    redis.Publish("channel:session:"+sessionId, "__DONE__")
}

4.3 第三步:实时推送(正常流程)

客户端正常在线时,通过 SSE / WebSocket 订阅 Redis Pub/Sub,实时收到每个 token:

// 推送层 —— 订阅 Redis 频道,转发给客户端
func streamTokens(sessionId) {
    // 建立 SSE 连接,设置 Content-Type: text/event-stream

    pubsub := redis.Subscribe("channel:session:"+sessionId)

    for msg := range pubsub.Channel() {
        if msg.Payload == "__DONE__" {
            writeSSE("data: [DONE]\n\n")
            break
        }
        writeSSE("data: " + msg.Payload + "\n\n")
        flush()
    }
}

5 核心!断线续传的实现(关键)

这是回答你问题的核心部分。当 APP 退出再进来,发生了两件事:

5.1 客户端退出时 —— 什么都不会发生

客户端断开 SSE 连接 → 推送层感知到连接断开 → 但不做任何事

因为 LLM 推理进程 完全独立,它不知道也不需要知道客户端是否在线。它只是继续往 Redis 队列里写 token。

这是整个设计最巧妙的地方:生产者和消费者完全解耦

5.2 客户端重新进入时 —— 续传

APP 重新打开后,发现自己所在的对话中有一个 session_id 且状态为 "generating"(说明服务端还在推理),于是发起续传:

// 客户端重新进入对话页面
// 1. 先查对话列表,发现当前对话有未完成的会话
//    会话状态: "generating",已生成到 seq=42

// 2. 客户端本地已显示了 seq=0~15 的内容(退出前收到的)
//    从 UI 中取出 lastDisplayedSeq = 15

// 3. 发起续传请求
POST /v1/chat/resume
{
    "session_id": "abc-123",
    "last_seq":   15              // 告诉服务端:我收到了前 15 条
}
// 服务端处理续传请求
func ResumeSession(sessionId, lastSeq) {

    // 1. 查 Redis 确认会话存在且状态为 "generating"
    session := redis.HGetAll("session:" + sessionId)
    if session.Status == "" {
        return 404 "会话不存在或已过期"
    }

    // 2. 补发 lastSeq 之后的所有历史 token(一次性)
    history := redis.LRange(
        "session:"+sessionId+":tokens",
        lastSeq,   // 起始位置 = 客户端最后收到的序号
        -1,         // 到末尾
    )

    // 3. 先把缺少的历史内容一次性发给客户端
    for _, tokenJson := range history {
        writeSSE("data: " + tokenJson + "\n\n")
    }

    // 4. 然后继续实时订阅,推送后续新 token
    pubsub := redis.Subscribe("channel:session:" + sessionId)
    for msg := range pubsub.Channel() {
        if msg.Payload == "__DONE__" { break }
        writeSSE("data: " + msg.Payload + "\n\n")
    }
}
关键点:为什么用 last_seq 而不是时间戳?
用序号(sequence number)定位,比时间戳精确得多。客户端说「我拿到了前 15 条」,服务端就从第 16 条开始补推,不会丢也不会重复。这是消息队列的经典消费位点(offset)模式。

6 完整时序图

📱 客户端 🟢 LLM 推理 🔴 Redis Phase 1 ① POST 提问 + 创建会话 HSET session:{id} OK ② 返回 session_id Phase 2 ③ 客户端订阅 SSE 频道 RPUSH seq=1 "春" + PUBLISH Pub/Sub → SSE: seq=1 "春" RPUSH seq=2 "眠" + PUBLISH SSE: seq=2 "眠" RPUSH seq=3 "不" + PUBLISH SSE: seq=3 "不" 断开! APP 退出,SSE 连接断开 Phase 3 RPUSH seq=4 "觉" (继续写!) RPUSH seq=5 "晓" (不受影响) HSET status="completed" Phase 4 ④ POST /resume last_seq=3 LRANGE seq 3→-1 返回 seq=4,5 ⑤ 补发 "觉晓" → SSE → 继续订阅 ✅ 客户端显示: 春眠不觉晓

7 进阶:客户端如何「还原」UI?

断线重连后,不只是拿到 token 就行 —— 客户端还要把 UI 恢复成「正在输出中」的状态。

  1. 恢复消息列表:对话页面的 messages 列表不是存在内存里的,而是每次都从服务端拉取的。GET /conversations/:id/messages 返回所有消息(包括不完整的 AI 回复)。
  2. 检测未完成消息:服务端返回的消息中,AI 回复带有 status: "generating" 标记,客户端看到这个就知道「这条还没输出完」。
  3. 显示打字动画:客户端在未完成消息末尾显示闪烁光标或「...」动画,让用户感知到「还在生成中」。
  4. 发起续传:如果消息 session_id 还在、状态为 generating,自动调 resume 接口,开始补推。
  5. 增量渲染:续传回来的 token 逐字追加到消息末尾,继续「打字机效果」。
// 客户端 APP 重新进入对话的伪代码
async function enterConversation(conversationId) {

    // 1. 拉取消息列表(从服务端/本地缓存)
    const messages = await api.getMessages(conversationId)

    // 2. 渲染 UI
    renderMessages(messages)

    // 3. 找到最后一条未完成的 AI 回复
    const lastMsg = messages[messages.length - 1]
    if (lastMsg.role === 'assistant' && lastMsg.status === 'generating') {

        // 4. 显示「正在生成」的光标动画
        showTypingIndicator()

        // 5. 发起续传(last_seq 是这条消息已收到的 token 数)
        const stream = await api.resumeStream(lastMsg.session_id, lastMsg.last_seq)

        // 6. 逐 token 追加到 UI
        for await (const chunk of stream) {
            lastMsg.content += chunk.content
            lastMsg.last_seq = chunk.seq
            updateUI()  // 增量渲染,打字机效果
        }

        // 7. 完成后去掉光标动画
        hideTypingIndicator()
    }
}

8 关键设计决策与权衡

这个方案不是免费的,有一些必须要做的取舍:

决策点Option AOption B推荐
消息存储 Redis 纯内存,性能好但有内存上限 消息队列 (Kafka/Pulsar),支持持久化和回溯 小规模用 Redis,大规模用 Kafka
会话过期 推理完成后立即删除,节省资源 保留 N 分钟(如 30min),给续传窗口 保留 30 分钟,过期后提示「已过期」
推送方式 WebSocket 全双工,实时性好 HTTP 长轮询,兼容性好,防火墙友好 移动端优先长轮询,桌面端用 WS
推理取消 客户端断开就取消推理,省 GPU 客户端断开推理继续,可续传 要看产品定位:豆包选 B,ChatGPT 大部分场景选 A
成本考量 断线续传的核心代价是:用户退出后服务端仍然占用 GPU 推理资源。如果 100 万个用户同时退出,GPU 可能白白燃烧。所以大部分产品会设一个超时(如 5 分钟内没续传就释放资源),或对免费版不提供续传。

9 一句话总结

豆包的「退出再进继续输出」并不是什么魔法,而是把
「SSE 长连接」替换成了「Redis 消息队列 + 序号偏移续传」
让 LLM 推理进程和客户端连接完全解耦

推理不管你在线不在线,只管往 Redis 里写 token。
客户端上线后拿着上次的序号来取,从断点继续。

这就是生产者-消费者模型在大模型流式输出场景下的经典应用。

生产解耦 = 断线续传