LLM API 设计:HTTP 连接管理

深入理解连接池、Keep-Alive、超时策略、流式连接与性能优化

📑 目录

  1. LLM API 与 HTTP 连接的特殊性
  2. Keep-Alive:持久连接的基石
  3. 连接池管理
  4. 活跃检查:心跳与健康监测
  5. 非活跃连接主动关闭
  6. 超时配置体系
  7. 重试与退避策略
  8. 流式连接管理(SSE & WebSocket)
  9. DNS 预解析与缓存
  10. HTTP/2 与 HTTP/3 多路复用
  11. 完整代码示例(Python / Node.js)
  12. 最佳实践速查表

1. LLM API 与 HTTP 连接的特殊性

LLM API(OpenAI、Claude、本地 vLLM 等)的 HTTP 连接管理不同于常规 Web API。 核心区别在于:

⏱️

高延迟请求

单个请求可达 30s–120s,远超普通 API 的 200ms

🌊

流式响应

SSE 长连接持续推送 token,必须管理流生命周期

📦

大 Payload

多轮对话的 context 可达数十 KB,影响连接吞吐

🔄

高并发

生产环境中同时发起数百个推理请求

核心挑战 如何在大量长耗时请求的场景下,保持连接的稳定、高效与可控,避免连接泄漏、端口耗尽和资源浪费。

2. Keep-Alive:持久连接的基石

2.1 什么是 Keep-Alive?

HTTP/1.1 默认启用 Connection: keep-alive,允许在同一个 TCP 连接上发送多个 HTTP 请求/响应, 避免每次请求都重新三次握手和四次挥手。

❌ 无 Keep-Alive(每次新建 TCP) Client Server SYN+ACK → Req → FIN SYN+ACK → Req → FIN SYN+ACK → Req → FIN ✅ Keep-Alive(复用 TCP) Client Server TCP 连接建立 请求 #1 请求 #2 请求 #3 空闲超时 → FIN

2.2 Keep-Alive 关键参数

参数含义推荐值(LLM API)
keepalive_timeout空闲连接保持时间(服务端)60s–120s
max_keepalive_requests单连接最大请求数100–500
keepalive(客户端)是否启用 TCP Keep-Alive 探测true
TCP_KEEPIDLETCP 空闲多久后开始探测60s
TCP_KEEPINTVL探测间隔10s
TCP_KEEPCNT探测失败次数上限3
LLM API 最佳实践 对于 LLM 场景,Keep-Alive 是必需的。单次推理耗时数十秒,如果每请求都重建 TCP 连接,三次握手 + TLS 握手将带来数百毫秒的额外延迟,占请求总耗时的 1–5%。

3. 连接池管理

3.1 为什么需要连接池?

连接池预先建立并维护一批 TCP 连接,请求到来时直接从中获取,用完归还。 核心收益:消除连接建立的延迟开销限制并发连接数量防止端口耗尽。

3.2 连接池核心参数

参数说明LLM API 推荐值
最大连接数池中能同时存在的连接上限50–200
每主机最大连接数对同一目标 host 的最大连接数20–50
空闲连接保活时间空闲连接在池中驻留的最长时间60s–120s
连接获取超时从池中获取连接的最长等待时间5s–10s
逐出策略空闲连接超出上限时的清理策略LRU / 最旧优先

3.3 Connections Per Host 的计算

一个常见的问题是:"Given 每个 host 允许 20 个连接,我有 100 个并发请求,会怎样?"

连接池并发调度示意(max_conn_per_host = 20) 100 并发请求 连接池(20 个连接) ... 20 个全部忙碌 热点连接自动复用 等待队列 80 个请求排队 等待超时后抛出 TimeoutError,保护上游不被无限阻塞
关键警告 如果连接池太小 → 大量请求排队等待 → 上游超时雪崩。
如果连接池太大 → 耗尽本地端口(默认 28232 个临时端口)→ Address already in use

4. 活跃检查:心跳与健康监测

4.1 为什么需要活跃检查?

在 LLM API 的场景中,连接可能长时间处于"空闲但存活"的状态。 但网络中间件(NAT 网关、负载均衡器、防火墙)会静默丢弃"看起来空闲"的连接。 如果你在下一次请求时才发现连接已死,就会产生 Connection Reset 错误和重试延迟。

活跃检查的两种模式 模式 A:TCP Keep-Alive(OS 层) 操作系统发送空 ACK 包探测 对应用层透明,无额外代码 粒度较粗(秒级),不检测应用层健康 模式 B:HTTP Health Check(应用层) 发送 GET /health 或 HEAD / 验证服务端真正可用 可检测 503 / 过载 / 认证过期

4.2 活跃检查实现策略

策略检测层级适用场景频率
TCP Keep-Alive传输层所有 HTTP/1.1 连接60s 空闲后开始探测
连接池逐出前预检连接池管理归还连接时 / 获取前每次获取
定时 Health Check应用层高可用/负载均衡场景每 30s–60s
请求前探测连接池管理空闲超时的旧连接获取时检查
断路器(Circuit Breaker)应用层连续失败时的熔断按失败次数触发

4.3 Python 示例:TCP Keep-Alive 设置

import socket
import httpx

# 方式 1:通过 httpx 的 transport 层设置
transport = httpx.HTTPTransport(
    keepalive_expiry=60.0,          # 空闲 60s 后关闭
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=30,
    ),
)

# 方式 2:通过底层 socket 设置 TCP Keep-Alive
with httpx.Client(transport=transport) as client:
    # httpx 底层使用 httpcore,要直接设置 socket 需要自定义 transport
    pass

# 方式 3(推荐):使用 aiohttp 可以直接设置 TCP keepalive
import aiohttp

connector = aiohttp.TCPConnector(
    limit=100,                       # 总连接数
    limit_per_host=30,               # 每 host 连接数
    ttl_dns_cache=300,               # DNS 缓存 5 分钟
    keepalive_timeout=60,            # 空闲 60s 关闭
    enable_cleanup_closed=True,      # 自动清理已关闭的连接
    force_close=False,               # 不强制关闭(使用 Keep-Alive)
)

5. 非活跃连接主动关闭

5.1 为什么要主动关闭?

空闲连接不是免费的。每个连接占用:

如果不主动清理,连接池中会堆积大量"僵尸连接"——看起来还活着,实际服务端或中间件早已断开。 下次使用时才会发现 Connection reset by peer,产生不必要的重试。

5.2 闲置连接的生命周期

闲置连接生命周期 活跃使用中 收发请求 空闲等待 延迟关闭计时 超时关闭 发送 FIN GC 回收 释放资源 重新创建 按需 t=0 t=last_req + 5s t=last_req + 60s t=last_req + 61s

5.3 主动关闭的实现方式

# === 方式 1:基于时间的清理(后台任务) ===
import asyncio
import time
from collections import defaultdict

class ConnectionJanitor:
    """连接清理器:定期扫描并关闭超过 idle_timeout 的空闲连接"""

    def __init__(self, idle_timeout: float = 60.0, check_interval: float = 15.0):
        self.idle_timeout = idle_timeout
        self.check_interval = check_interval
        self._connections: dict[str, tuple[object, float]] = {}  # conn_id -> (conn, last_used)

    def mark_used(self, conn_id: str, conn: object):
        self._connections[conn_id] = (conn, time.monotonic())

    def mark_closed(self, conn_id: str):
        self._connections.pop(conn_id, None)

    async def run(self):
        """后台清理循环"""
        while True:
            await asyncio.sleep(self.check_interval)
            now = time.monotonic()
            stale = [
                cid for cid, (_, last) in self._connections.items()
                if now - last > self.idle_timeout
            ]
            for cid in stale:
                conn, _ = self._connections.pop(cid)
                try:
                    await conn.aclose()  # 优雅关闭
                except Exception:
                    pass  # 已经断开的忽略
# === 方式 2:使用 httpx 内置的 keepalive_expiry ===
import httpx

client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,   # 保持最多 20 个空闲连接
        keepalive_expiry=60.0,          # 空闲 60 秒后自动关闭
    ),
    timeout=httpx.Timeout(
        connect=10.0,    # 连接超时
        read=120.0,      # 读取超时(LLM 推理可能很长)
        write=30.0,      # 写入超时
        pool=10.0,       # 从池中获取连接超时
    ),
)
最佳实践keepalive_expiry 设置为比服务端 Keep-Alive 超时稍短的值(如服务端 120s,客户端设 90s)。 这样可以确保永远是客户端先关闭,避免服务端先断开导致 "Connection reset"。

6. 超时配置体系

6.1 多层超时模型

LLM API 多层超时体系 DNS 解析 3s–5s TCP 连接 5s–10s TLS 握手 5s–10s 请求发送 30s 读取超时(Read Timeout)— LLM API 核心 流式首 token:10s–15s | 非流式全长:60s–180s | Token 间间隔:5s–10s 总超时(Total Timeout)— 兜底:300s

6.2 LLM API 特有的超时考量

阶段典型耗时建议超时失败处理
DNS 解析< 100ms3s换 DNS / 重试
TCP + TLS50–500ms10s重试 2 次
首 Token(TTFT)200ms–3s15s重试 / 降级模型
流式 Token 间20–100ms10s断流重连
非流式完整响应2s–120s180s重试 / 超时返回部分
连接池获取< 1ms5s扩容 / 限流
流式场景特别注意 流式 SSE 连接中,如果超过 token_interval_timeout 没有收到新 token,说明可能: (1) 服务端推理卡住 (2) 网络中断但 TCP 还没感知 (3) 被中间件静默断开。 此时应主动断开并重试,而不是无限等待。

7. 重试与退避策略

7.1 哪些错误值得重试?

错误类型是否重试说明
ConnectionError✅ 是连接被拒绝,可能服务重启中
RemoteDisconnected✅ 是服务端主动断开(可能是空闲超时)
ReadTimeout⚠️ 谨慎先确认不是 prompt 太长导致推理时间本身就很长
PoolTimeout✅ 是连接池满,稍等后重试
HTTP 429✅ 是限流,使用 Retry-After header
HTTP 503✅ 是服务暂时不可用
HTTP 401 / 403❌ 否认证问题,重试没用
HTTP 400 / 422❌ 否请求参数错误

7.2 指数退避 + Jitter

import asyncio
import random

class LLMAPIRetry:
    """LLM API 专用重试器:指数退避 + 随机抖动"""

    RETRYABLE_STATUSES = {429, 503, 502, 504}
    RETRYABLE_ERRORS = (
        ConnectionError, TimeoutError,
        # httpx specific
    )

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        jitter: bool = True,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """计算第 N 次重试的等待时间"""
        delay = min(
            self.base_delay * (2 ** attempt),  # 指数增长:1s, 2s, 4s, 8s
            self.max_delay,
        )
        if self.jitter:
            delay = delay * (0.5 + random.random())  # 50%–150% 随机抖动
        return delay

    async def execute_with_retry(self, fn, *args, **kwargs):
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                return await fn(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt >= self.max_retries:
                    raise

                if not self._is_retryable(e):
                    raise

                delay = self.get_delay(attempt)
                print(f"重试 {attempt+1}/{self.max_retries},等待 {delay:.1f}s...")
                await asyncio.sleep(delay)

        raise last_exception

    def _is_retryable(self, error: Exception) -> bool:
        # httpx.HTTPStatusError 需要检查 status_code
        # 简化示例:ConnectionError / TimeoutError 一律重试
        if isinstance(error, self.RETRYABLE_ERRORS):
            return True
        return False
为什么需要 Jitter? 如果没有随机抖动,当大量客户端同时遇到 429/503 时,它们会在完全相同的时刻一起重试,形成"惊群效应"。 加入随机抖动后,重试请求被分散到一个小时间窗口内,避免对服务端造成二次冲击。

8. 流式连接管理(SSE & WebSocket)

8.1 SSE(Server-Sent Events)连接管理

LLM API 的流式响应通常使用 SSE 协议。与普通 HTTP 请求不同,SSE 连接在整个推理过程中保持打开, 可能持续数十秒到数分钟。这带来了独特的连接管理挑战。

8.2 SSE 连接生命周期管理

SSE 流式连接状态机 Idle 等待请求 Connected 连接已建立 Streaming 接收 token Done / Closed 收到 [DONE] 或关闭 Token Timeout Token 间超时 → 重试 Connection Lost 网络中断 → 重试

8.3 SSE 连接管理关键代码

import asyncio
import httpx

class SSEConnectionManager:
    """SSE 流式连接管理器"""

    def __init__(
        self,
        token_timeout: float = 10.0,      # token 间超时
        first_token_timeout: float = 15.0,  # 首 token 超时
        max_retries: int = 2,
    ):
        self.token_timeout = token_timeout
        self.first_token_timeout = first_token_timeout
        self.max_retries = max_retries

    async def stream_with_keepalive(
        self, client: httpx.AsyncClient, url: str, json_data: dict
    ):
        """带超时检测的流式请求"""
        for attempt in range(self.max_retries + 1):
            try:
                async with client.stream(
                    "POST", url, json=json_data, timeout=self.token_timeout
                ) as response:
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            yield line
                        # 每收到一行,超时计时器自动重置
                return  # 正常结束
            except httpx.ReadTimeout:
                if attempt < self.max_retries:
                    print(f"Token 超时,重试 {attempt+1}/{self.max_retries}")
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
            except (httpx.RemoteProtocolError, httpx.ConnectError):
                if attempt < self.max_retries:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise

    async def graceful_close(self, response: httpx.Response):
        """优雅关闭流式连接"""
        try:
            await response.aclose()
        except Exception:
            pass  # 可能已经断开

8.4 WebSocket 连接管理(本地 vLLM / Ollama)

import asyncio
import websockets

class WebSocketLLMClient:
    """WebSocket 连接的 LLM 客户端(用于 vLLM / Ollama)"""

    def __init__(self, uri: str, ping_interval: int = 20, ping_timeout: int = 10):
        self.uri = uri
        self.ws: websockets.WebSocketClientProtocol | None = None
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self._closed = False

    async def connect(self):
        """建立 WebSocket 连接并启动心跳"""
        self.ws = await websockets.connect(
            self.uri,
            ping_interval=self.ping_interval,  # 自动 ping 间隔
            ping_timeout=self.ping_timeout,    # ping 超时
            close_timeout=5,                    # 关闭超时
            max_size=10 * 1024 * 1024,         # 最大消息 10MB
        )

    async def send_prompt(self, prompt: str) -> str:
        """发送 prompt 并等待完整响应"""
        if self.ws is None or self.ws.closed:
            await self.connect()

        await self.ws.send(prompt)
        response = await self.ws.recv()
        return response

    async def close(self):
        """优雅关闭 WebSocket"""
        if self.ws and not self.ws.closed:
            await self.ws.close()
        self._closed = True
SSE vs WebSocket 选择建议 对于 LLM API,SSE 通常是更好的选择:单向流(服务端→客户端),更简单的错误处理,天然的 HTTP 语义。 WebSocket 更适合需要双向交互的场景(如 function calling 的中途干预)。

9. DNS 预解析与缓存

9.1 为什么 DNS 是隐藏的性能杀手?

每次新建 TCP 连接都可能触发 DNS 解析。在 LLM API 的高频调用场景下,DNS 解析延迟(通常 50–200ms)会显著累积。 更糟的是,如果 DNS 服务器响应慢或不可用,会导致所有新连接阻塞。

9.2 优化策略

策略说明效果
系统 DNS 缓存操作系统自动缓存(Linux nscd, macOS mDNSResponder)基础优化,TTL 受限
应用层 DNS 缓存在 HTTP 客户端中缓存解析结果消除同进程内的重复解析
DNS 预解析在连接真正需要之前提前解析隐藏延迟
HTTP Keep-Alive连接复用避免新建连接根本性地减少 DNS 查询次数
静态 IP / Hosts固定 IP 地址(仅内网环境)零 DNS 延迟

9.3 Python 示例

import aiohttp

# aiohttp 内置 DNS 缓存
connector = aiohttp.TCPConnector(
    ttl_dns_cache=600,      # DNS 缓存 10 分钟
    use_dns_cache=True,     # 启用 DNS 缓存
    family=socket.AF_INET,  # 仅 IPv4(减少 AAAA 查询)
)

# === DNS 预解析 ===
import socket

# 启动时预先解析常用 API 端点的 IP
async def pre_resolve_dns(hosts: list[str]):
    loop = asyncio.get_event_loop()
    for host in hosts:
        await loop.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP)

# 初始化时调用
await pre_resolve_dns([
    "api.openai.com",
    "api.anthropic.com",
    "your-vllm-host.internal",
])

10. HTTP/2 与 HTTP/3 多路复用

10.1 HTTP/1.1 的局限与多路复用的优势

HTTP/1.1 中,即使启用了 Keep-Alive,请求仍然是串行的——一个连接同一时刻只能处理一个请求。 虽然可以通过多个连接实现并发,但这会消耗更多资源。

HTTP/2 引入 多路复用(Multiplexing):在单个 TCP 连接上同时发送多个请求/响应流, 互不阻塞。这对 LLM API 的并发调用特别有价值。

HTTP/1.1 vs HTTP/2 多路复用对比 HTTP/1.1(连接池 + 串行) Req 1 ——————→ 等待 Req 2 ——————→ Req 3 ——————→ 等待 需要 3 个连接 × 串行 = 头阻塞(HOL) HTTP/2(单连接多路复用) █ 单个 TCP 连接 █ Stream 1 Stream 2 Stream 3 Stream 4 Stream 5 1 个连接承载 N 个并发流,无头阻塞

10.2 HTTP/2 实践考虑

import httpx

# httpx 默认支持 HTTP/2(需要安装 h2 依赖)
# pip install httpx[http2]

client = httpx.AsyncClient(http2=True)

# 注意:HTTP/2 多路复用对连接池参数的影响
# 单个 HTTP/2 连接就可以承载大量并发流,
# max_connections 可以比 HTTP/1.1 场景设得更小
client = httpx.AsyncClient(
    http2=True,
    limits=httpx.Limits(
        max_connections=20,              # 少量连接即可(HTTP/2 多路复用)
        max_keepalive_connections=10,
        keepalive_expiry=120.0,
    ),
)
HTTP/3 (QUIC) 前瞻 HTTP/3 基于 UDP 的 QUIC 协议,进一步消除了 TCP 层面的头阻塞问题。 但目前 LLM API 服务端对 HTTP/3 的支持尚不普及。如果在本地部署 vLLM/Ollama 等, 可以考虑前置 Nginx/Caddy 启用 HTTP/3 代理。

11. 完整代码示例

11.1 Python — 生产级 LLM API 客户端

"""
生产级 LLM API 客户端
涵盖:连接池、Keep-Alive、健康检查、超时、重试、流式管理
"""

import asyncio
import time
import random
from typing import AsyncIterator, Optional
from dataclasses import dataclass, field

import httpx


@dataclass
class LLMClientConfig:
    """LLM 客户端配置"""
    base_url: str = "https://api.openai.com"
    api_key: str = ""

    # 连接池
    max_connections: int = 100
    max_keepalive_connections: int = 30
    keepalive_expiry: float = 90.0   # 比服务端低 30s

    # 超时
    connect_timeout: float = 10.0
    read_timeout: float = 180.0      # LLM 推理可能很长
    write_timeout: float = 30.0
    pool_timeout: float = 10.0
    first_token_timeout: float = 15.0
    token_interval_timeout: float = 10.0

    # 重试
    max_retries: int = 3
    base_delay: float = 1.0

    # HTTP/2
    use_http2: bool = True


class ProductionLLMClient:
    """生产级 LLM API 客户端"""

    def __init__(self, config: LLMClientConfig):
        self.config = config
        self._client: Optional[httpx.AsyncClient] = None
        self._health_status: dict[str, bool] = {}

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def start(self):
        """初始化 HTTP 客户端并预建立连接"""
        limits = httpx.Limits(
            max_connections=self.config.max_connections,
            max_keepalive_connections=self.config.max_keepalive_connections,
            keepalive_expiry=self.config.keepalive_expiry,
        )
        timeout = httpx.Timeout(
            connect=self.config.connect_timeout,
            read=self.config.read_timeout,
            write=self.config.write_timeout,
            pool=self.config.pool_timeout,
        )
        transport = httpx.AsyncHTTPTransport(
            limits=limits,
            retries=0,  # 我们自己管理重试
        )
        self._client = httpx.AsyncClient(
            base_url=self.config.base_url,
            headers={"Authorization": f"Bearer {self.config.api_key}"},
            timeout=timeout,
            transport=transport,
            http2=self.config.use_http2,
        )
        # 预建连接(热身)
        await self._warmup()

    async def _warmup(self):
        """预热:发送健康检查请求以预建连接"""
        try:
            await self.health_check()
        except Exception:
            pass  # 预热失败不影响后续使用

    async def health_check(self) -> bool:
        """应用层健康检查"""
        try:
            response = await self._client.get(
                "/v1/models",
                timeout=httpx.Timeout(connect=5.0, read=5.0),
            )
            self._health_status[self.config.base_url] = response.status_code == 200
            return self._health_status[self.config.base_url]
        except Exception:
            self._health_status[self.config.base_url] = False
            return False

    async def chat(
        self,
        messages: list[dict],
        model: str = "gpt-4",
        stream: bool = False,
    ) -> dict | AsyncIterator[str]:
        """发送聊天请求(非流式)"""
        for attempt in range(self.config.max_retries + 1):
            try:
                response = await self._client.post(
                    "/v1/chat/completions",
                    json={
                        "model": model,
                        "messages": messages,
                        "stream": stream,
                    },
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    delay = self._get_retry_delay(attempt, e.response)
                    await asyncio.sleep(delay)
                    continue
                if e.response.status_code in (502, 503, 504) and attempt < self.config.max_retries:
                    await asyncio.sleep(self._get_retry_delay(attempt))
                    continue
                raise
            except (httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadTimeout):
                if attempt < self.config.max_retries:
                    await asyncio.sleep(self._get_retry_delay(attempt))
                    continue
                raise

    async def chat_stream(
        self, messages: list[dict], model: str = "gpt-4"
    ) -> AsyncIterator[str]:
        """发送流式聊天请求(SSE)"""
        # 使用独立的客户端以避免干扰非流式连接
        async with httpx.AsyncClient(
            base_url=self.config.base_url,
            headers={"Authorization": f"Bearer {self.config.api_key}"},
            timeout=httpx.Timeout(
                connect=self.config.connect_timeout,
                read=self.config.token_interval_timeout,  # 流式用 token 间超时
            ),
        ) as stream_client:
            for attempt in range(self.config.max_retries + 1):
                try:
                    async with stream_client.stream(
                        "POST",
                        "/v1/chat/completions",
                        json={
                            "model": model,
                            "messages": messages,
                            "stream": True,
                        },
                    ) as response:
                        response.raise_for_status()
                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                data = line[6:]
                                if data == "[DONE]":
                                    return
                                yield data
                        return  # 正常结束
                except (httpx.ReadTimeout, httpx.RemoteProtocolError, httpx.ConnectError):
                    if attempt < self.config.max_retries:
                        await asyncio.sleep(self._get_retry_delay(attempt))
                        continue
                    raise

    def _get_retry_delay(self, attempt: int, response=None) -> float:
        """指数退避 + 抖动 + Retry-After"""
        if response is not None:
            retry_after = response.headers.get("Retry-After")
            if retry_after:
                try:
                    return float(retry_after)
                except ValueError:
                    pass
        delay = min(self.config.base_delay * (2 ** attempt), 60.0)
        return delay * (0.5 + random.random())

    async def close(self):
        """优雅关闭:清理所有连接"""
        if self._client:
            await self._client.aclose()
            self._client = None

    @property
    def pool_stats(self) -> dict:
        """连接池统计"""
        if self._client and hasattr(self._client, '_transport'):
            pool = self._client._transport._pool
            return {
                "active": len(pool._connections),
                "max": pool._max_connections,
            }
        return {}


# === 使用示例 ===
async def main():
    config = LLMClientConfig(
        base_url="https://api.openai.com",
        api_key="sk-xxxx",
    )
    async with ProductionLLMClient(config) as llm:
        # 先检查健康
        healthy = await llm.health_check()
        print(f"服务健康: {healthy}")

        # 发送请求
        result = await llm.chat(
            messages=[{"role": "user", "content": "Hello"}],
        )
        print(result)

        # 查看连接池状态
        print(llm.pool_stats)


if __name__ == "__main__":
    asyncio.run(main())

11.2 Node.js — LLM API 连接管理

/**
 * 生产级 LLM API 客户端 (Node.js)
 * 使用 undici (Node.js 内置) 实现连接池管理
 */

const { Agent, request, setGlobalDispatcher } = require('undici');

// ========== 连接池配置 ==========
const llmAgent = new Agent({
  // 连接池
  connections: 100,              // 最大并发连接数
  pipelining: 1,                 // HTTP/1.1 管道(设为 0 禁用)
  keepAliveTimeout: 90_000,      // 空闲连接保持 90s
  keepAliveMaxTimeout: 120_000,
  keepAliveTimeoutThreshold: 1_000,

  // 超时
  connectTimeout: 10_000,        // 连接超时 10s
  headersTimeout: 30_000,        // 等待响应头超时
  bodyTimeout: 180_000,          // 等待响应体超时(LLM 需要长超时)

  // DNS
  autoSelectFamily: true,
  autoSelectFamilyAttemptTimeout: 2_000,
});

// ========== 指数退避重试 ==========
class RetryWithBackoff {
  constructor({ maxRetries = 3, baseDelay = 1000, maxDelay = 30000 }) {
    this.maxRetries = maxRetries;
    this.baseDelay = baseDelay;
    this.maxDelay = maxDelay;
  }

  getDelay(attempt) {
    const delay = Math.min(
      this.baseDelay * Math.pow(2, attempt),
      this.maxDelay
    );
    // 添加抖动 (±25%)
    return delay * (0.75 + Math.random() * 0.5);
  }

  isRetryable(statusCode) {
    return [429, 502, 503, 504].includes(statusCode);
  }

  async execute(fn) {
    let lastError;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;
        const statusCode = error.statusCode || 0;

        if (attempt >= this.maxRetries || !this.isRetryable(statusCode)) {
          throw error;
        }

        const delay = this.getDelay(attempt);
        console.log(`重试 ${attempt + 1}/${this.maxRetries},等待 ${delay.toFixed(0)}ms`);
        await new Promise(r => setTimeout(r, delay));
      }
    }
    throw lastError;
  }
}

// ========== LLM 客户端 ==========
class LLMClient {
  constructor({ baseUrl, apiKey }) {
    this.baseUrl = baseUrl;
    this.apiKey = apiKey;
    this.retry = new RetryWithBackoff({ maxRetries: 3 });
  }

  async healthCheck() {
    try {
      const { statusCode } = await request(`${this.baseUrl}/v1/models`, {
        method: 'GET',
        headers: { 'Authorization': `Bearer ${this.apiKey}` },
        dispatcher: llmAgent,
      });
      return statusCode === 200;
    } catch {
      return false;
    }
  }

  async chat({ messages, model = 'gpt-4', stream = false }) {
    return this.retry.execute(async () => {
      const { statusCode, body } = await request(
        `${this.baseUrl}/v1/chat/completions`,
        {
          method: 'POST',
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({ model, messages, stream }),
          dispatcher: llmAgent,
        }
      );

      if (statusCode === 429) {
        const err = new Error('Rate limited');
        err.statusCode = 429;
        throw err;
      }
      if (statusCode >= 500) {
        const err = new Error(`Server error: ${statusCode}`);
        err.statusCode = statusCode;
        throw err;
      }

      const data = await body.json();
      return data;
    });
  }

  /** SSE 流式请求 */
  async *chatStream({ messages, model = 'gpt-4' }) {
    for (let attempt = 0; attempt <= 3; attempt++) {
      try {
        const { statusCode, body } = await request(
          `${this.baseUrl}/v1/chat/completions`,
          {
            method: 'POST',
            headers: {
              'Authorization': `Bearer ${this.apiKey}`,
              'Content-Type': 'application/json',
            },
            body: JSON.stringify({ model, messages, stream: true }),
            dispatcher: llmAgent,
            // 流式使用更短的 bodyTimeout
            bodyTimeout: 10_000,
          }
        );

        if (statusCode !== 200) throw new Error(`HTTP ${statusCode}`);

        for await (const chunk of body) {
          const lines = chunk.toString().split('\n');
          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const data = line.slice(6);
              if (data === '[DONE]') return;
              yield data;
            }
          }
        }
        return;
      } catch (error) {
        if (attempt >= 3) throw error;
        await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
      }
    }
  }

  /** 关闭连接池 */
  async close() {
    await llmAgent.close();
  }
}

// ========== 使用示例 ==========
async function main() {
  const client = new LLMClient({
    baseUrl: 'https://api.openai.com',
    apiKey: 'sk-xxxx',
  });

  const healthy = await client.healthCheck();
  console.log(`服务健康: ${healthy}`);

  const result = await client.chat({
    messages: [{ role: 'user', content: 'Hello' }],
  });
  console.log(result.choices[0].message.content);
}

main().catch(console.error);

12. 最佳实践速查表

🔌

Keep-Alive 必须开

HTTP/1.1 默认启用,客户端 keepalive_expiry 比服务端 timeout 短 30s

🏊

连接池合理配

max_connections = 并发数 × 1.5,单 host 20-50,避免端口耗尽

💓

TCP Keep-Alive 探测

KEEPIDLE=60s, KEEPINTVL=10s, KEEPCNT=3,防止中间件断连

超时分层设置

Connect 5-10s / FirstToken 15s / TokenInterval 10s / Total 180s

🔄

退避 + 抖动

指数退避(1s→2s→4s),随机抖动(±50%),遵循 Retry-After

🩺

应用层健康检查

每 30s HEAD /health,及时剔除不可用节点

🗑️

后台清理僵尸连接

定时扫描空闲 >60s 的连接,主动关闭,防止 Connection Reset

🌐

DNS 预热 + 缓存

启动时预解析,TTL 缓存 5–10 分钟,减少解析延迟

📡

HTTP/2 多路复用

单连接承载多并发流,消除 HTTP/1.1 头阻塞

🌊

流式独立管理

SSE 使用独立连接和独立超时,token 间超时主动重试

🧹

优雅关闭

with/as 上下文管理,__aexit__ 中 await client.aclose()

📊

可观测性

记录连接池使用率、重试次数、各阶段延迟,及时调整参数

配置推荐速查(按场景)

场景连接池KeepAliveRead Timeout重试
低并发(< 10 QPS) max=20, per_host=10 60s 120s 2 次
中并发(10–50 QPS) max=100, per_host=30 90s 180s 3 次
高并发(50–200 QPS) max=200, per_host=50 90s 180s 3 次
流式为主 max=150, per_host=40 120s Token 间 10s 2 次
本地 vLLM/Ollama max=50, per_host=50 300s 300s 1 次