深入理解连接池、Keep-Alive、超时策略、流式连接与性能优化
LLM API(OpenAI、Claude、本地 vLLM 等)的 HTTP 连接管理不同于常规 Web API。 核心区别在于:
单个请求可达 30s–120s,远超普通 API 的 200ms
SSE 长连接持续推送 token,必须管理流生命周期
多轮对话的 context 可达数十 KB,影响连接吞吐
生产环境中同时发起数百个推理请求
HTTP/1.1 默认启用 Connection: keep-alive,允许在同一个 TCP 连接上发送多个 HTTP 请求/响应,
避免每次请求都重新三次握手和四次挥手。
| 参数 | 含义 | 推荐值(LLM API) |
|---|---|---|
keepalive_timeout | 空闲连接保持时间(服务端) | 60s–120s |
max_keepalive_requests | 单连接最大请求数 | 100–500 |
keepalive(客户端) | 是否启用 TCP Keep-Alive 探测 | true |
TCP_KEEPIDLE | TCP 空闲多久后开始探测 | 60s |
TCP_KEEPINTVL | 探测间隔 | 10s |
TCP_KEEPCNT | 探测失败次数上限 | 3 |
连接池预先建立并维护一批 TCP 连接,请求到来时直接从中获取,用完归还。 核心收益:消除连接建立的延迟开销,限制并发连接数量防止端口耗尽。
| 参数 | 说明 | LLM API 推荐值 |
|---|---|---|
| 最大连接数 | 池中能同时存在的连接上限 | 50–200 |
| 每主机最大连接数 | 对同一目标 host 的最大连接数 | 20–50 |
| 空闲连接保活时间 | 空闲连接在池中驻留的最长时间 | 60s–120s |
| 连接获取超时 | 从池中获取连接的最长等待时间 | 5s–10s |
| 逐出策略 | 空闲连接超出上限时的清理策略 | LRU / 最旧优先 |
一个常见的问题是:"Given 每个 host 允许 20 个连接,我有 100 个并发请求,会怎样?"
Address already in use。
在 LLM API 的场景中,连接可能长时间处于"空闲但存活"的状态。 但网络中间件(NAT 网关、负载均衡器、防火墙)会静默丢弃"看起来空闲"的连接。 如果你在下一次请求时才发现连接已死,就会产生 Connection Reset 错误和重试延迟。
| 策略 | 检测层级 | 适用场景 | 频率 |
|---|---|---|---|
| TCP Keep-Alive | 传输层 | 所有 HTTP/1.1 连接 | 60s 空闲后开始探测 |
| 连接池逐出前预检 | 连接池管理 | 归还连接时 / 获取前 | 每次获取 |
| 定时 Health Check | 应用层 | 高可用/负载均衡场景 | 每 30s–60s |
| 请求前探测 | 连接池管理 | 空闲超时的旧连接 | 获取时检查 |
| 断路器(Circuit Breaker) | 应用层 | 连续失败时的熔断 | 按失败次数触发 |
import socket
import httpx
# 方式 1:通过 httpx 的 transport 层设置
transport = httpx.HTTPTransport(
keepalive_expiry=60.0, # 空闲 60s 后关闭
limits=httpx.Limits(
max_connections=100,
max_keepalive_connections=30,
),
)
# 方式 2:通过底层 socket 设置 TCP Keep-Alive
with httpx.Client(transport=transport) as client:
# httpx 底层使用 httpcore,要直接设置 socket 需要自定义 transport
pass
# 方式 3(推荐):使用 aiohttp 可以直接设置 TCP keepalive
import aiohttp
connector = aiohttp.TCPConnector(
limit=100, # 总连接数
limit_per_host=30, # 每 host 连接数
ttl_dns_cache=300, # DNS 缓存 5 分钟
keepalive_timeout=60, # 空闲 60s 关闭
enable_cleanup_closed=True, # 自动清理已关闭的连接
force_close=False, # 不强制关闭(使用 Keep-Alive)
)
空闲连接不是免费的。每个连接占用:
如果不主动清理,连接池中会堆积大量"僵尸连接"——看起来还活着,实际服务端或中间件早已断开。
下次使用时才会发现 Connection reset by peer,产生不必要的重试。
# === 方式 1:基于时间的清理(后台任务) ===
import asyncio
import time
from collections import defaultdict
class ConnectionJanitor:
"""连接清理器:定期扫描并关闭超过 idle_timeout 的空闲连接"""
def __init__(self, idle_timeout: float = 60.0, check_interval: float = 15.0):
self.idle_timeout = idle_timeout
self.check_interval = check_interval
self._connections: dict[str, tuple[object, float]] = {} # conn_id -> (conn, last_used)
def mark_used(self, conn_id: str, conn: object):
self._connections[conn_id] = (conn, time.monotonic())
def mark_closed(self, conn_id: str):
self._connections.pop(conn_id, None)
async def run(self):
"""后台清理循环"""
while True:
await asyncio.sleep(self.check_interval)
now = time.monotonic()
stale = [
cid for cid, (_, last) in self._connections.items()
if now - last > self.idle_timeout
]
for cid in stale:
conn, _ = self._connections.pop(cid)
try:
await conn.aclose() # 优雅关闭
except Exception:
pass # 已经断开的忽略
# === 方式 2:使用 httpx 内置的 keepalive_expiry ===
import httpx
client = httpx.AsyncClient(
limits=httpx.Limits(
max_keepalive_connections=20, # 保持最多 20 个空闲连接
keepalive_expiry=60.0, # 空闲 60 秒后自动关闭
),
timeout=httpx.Timeout(
connect=10.0, # 连接超时
read=120.0, # 读取超时(LLM 推理可能很长)
write=30.0, # 写入超时
pool=10.0, # 从池中获取连接超时
),
)
keepalive_expiry 设置为比服务端 Keep-Alive 超时稍短的值(如服务端 120s,客户端设 90s)。
这样可以确保永远是客户端先关闭,避免服务端先断开导致 "Connection reset"。
| 阶段 | 典型耗时 | 建议超时 | 失败处理 |
|---|---|---|---|
| DNS 解析 | < 100ms | 3s | 换 DNS / 重试 |
| TCP + TLS | 50–500ms | 10s | 重试 2 次 |
| 首 Token(TTFT) | 200ms–3s | 15s | 重试 / 降级模型 |
| 流式 Token 间 | 20–100ms | 10s | 断流重连 |
| 非流式完整响应 | 2s–120s | 180s | 重试 / 超时返回部分 |
| 连接池获取 | < 1ms | 5s | 扩容 / 限流 |
| 错误类型 | 是否重试 | 说明 |
|---|---|---|
ConnectionError | ✅ 是 | 连接被拒绝,可能服务重启中 |
RemoteDisconnected | ✅ 是 | 服务端主动断开(可能是空闲超时) |
ReadTimeout | ⚠️ 谨慎 | 先确认不是 prompt 太长导致推理时间本身就很长 |
PoolTimeout | ✅ 是 | 连接池满,稍等后重试 |
| HTTP 429 | ✅ 是 | 限流,使用 Retry-After header |
| HTTP 503 | ✅ 是 | 服务暂时不可用 |
| HTTP 401 / 403 | ❌ 否 | 认证问题,重试没用 |
| HTTP 400 / 422 | ❌ 否 | 请求参数错误 |
import asyncio
import random
class LLMAPIRetry:
"""LLM API 专用重试器:指数退避 + 随机抖动"""
RETRYABLE_STATUSES = {429, 503, 502, 504}
RETRYABLE_ERRORS = (
ConnectionError, TimeoutError,
# httpx specific
)
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
jitter: bool = True,
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.jitter = jitter
def get_delay(self, attempt: int) -> float:
"""计算第 N 次重试的等待时间"""
delay = min(
self.base_delay * (2 ** attempt), # 指数增长:1s, 2s, 4s, 8s
self.max_delay,
)
if self.jitter:
delay = delay * (0.5 + random.random()) # 50%–150% 随机抖动
return delay
async def execute_with_retry(self, fn, *args, **kwargs):
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await fn(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt >= self.max_retries:
raise
if not self._is_retryable(e):
raise
delay = self.get_delay(attempt)
print(f"重试 {attempt+1}/{self.max_retries},等待 {delay:.1f}s...")
await asyncio.sleep(delay)
raise last_exception
def _is_retryable(self, error: Exception) -> bool:
# httpx.HTTPStatusError 需要检查 status_code
# 简化示例:ConnectionError / TimeoutError 一律重试
if isinstance(error, self.RETRYABLE_ERRORS):
return True
return False
LLM API 的流式响应通常使用 SSE 协议。与普通 HTTP 请求不同,SSE 连接在整个推理过程中保持打开, 可能持续数十秒到数分钟。这带来了独特的连接管理挑战。
import asyncio
import httpx
class SSEConnectionManager:
"""SSE 流式连接管理器"""
def __init__(
self,
token_timeout: float = 10.0, # token 间超时
first_token_timeout: float = 15.0, # 首 token 超时
max_retries: int = 2,
):
self.token_timeout = token_timeout
self.first_token_timeout = first_token_timeout
self.max_retries = max_retries
async def stream_with_keepalive(
self, client: httpx.AsyncClient, url: str, json_data: dict
):
"""带超时检测的流式请求"""
for attempt in range(self.max_retries + 1):
try:
async with client.stream(
"POST", url, json=json_data, timeout=self.token_timeout
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
yield line
# 每收到一行,超时计时器自动重置
return # 正常结束
except httpx.ReadTimeout:
if attempt < self.max_retries:
print(f"Token 超时,重试 {attempt+1}/{self.max_retries}")
await asyncio.sleep(2 ** attempt)
continue
raise
except (httpx.RemoteProtocolError, httpx.ConnectError):
if attempt < self.max_retries:
await asyncio.sleep(2 ** attempt)
continue
raise
async def graceful_close(self, response: httpx.Response):
"""优雅关闭流式连接"""
try:
await response.aclose()
except Exception:
pass # 可能已经断开
import asyncio
import websockets
class WebSocketLLMClient:
"""WebSocket 连接的 LLM 客户端(用于 vLLM / Ollama)"""
def __init__(self, uri: str, ping_interval: int = 20, ping_timeout: int = 10):
self.uri = uri
self.ws: websockets.WebSocketClientProtocol | None = None
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self._closed = False
async def connect(self):
"""建立 WebSocket 连接并启动心跳"""
self.ws = await websockets.connect(
self.uri,
ping_interval=self.ping_interval, # 自动 ping 间隔
ping_timeout=self.ping_timeout, # ping 超时
close_timeout=5, # 关闭超时
max_size=10 * 1024 * 1024, # 最大消息 10MB
)
async def send_prompt(self, prompt: str) -> str:
"""发送 prompt 并等待完整响应"""
if self.ws is None or self.ws.closed:
await self.connect()
await self.ws.send(prompt)
response = await self.ws.recv()
return response
async def close(self):
"""优雅关闭 WebSocket"""
if self.ws and not self.ws.closed:
await self.ws.close()
self._closed = True
每次新建 TCP 连接都可能触发 DNS 解析。在 LLM API 的高频调用场景下,DNS 解析延迟(通常 50–200ms)会显著累积。 更糟的是,如果 DNS 服务器响应慢或不可用,会导致所有新连接阻塞。
| 策略 | 说明 | 效果 |
|---|---|---|
| 系统 DNS 缓存 | 操作系统自动缓存(Linux nscd, macOS mDNSResponder) | 基础优化,TTL 受限 |
| 应用层 DNS 缓存 | 在 HTTP 客户端中缓存解析结果 | 消除同进程内的重复解析 |
| DNS 预解析 | 在连接真正需要之前提前解析 | 隐藏延迟 |
| HTTP Keep-Alive | 连接复用避免新建连接 | 根本性地减少 DNS 查询次数 |
| 静态 IP / Hosts | 固定 IP 地址(仅内网环境) | 零 DNS 延迟 |
import aiohttp
# aiohttp 内置 DNS 缓存
connector = aiohttp.TCPConnector(
ttl_dns_cache=600, # DNS 缓存 10 分钟
use_dns_cache=True, # 启用 DNS 缓存
family=socket.AF_INET, # 仅 IPv4(减少 AAAA 查询)
)
# === DNS 预解析 ===
import socket
# 启动时预先解析常用 API 端点的 IP
async def pre_resolve_dns(hosts: list[str]):
loop = asyncio.get_event_loop()
for host in hosts:
await loop.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP)
# 初始化时调用
await pre_resolve_dns([
"api.openai.com",
"api.anthropic.com",
"your-vllm-host.internal",
])
HTTP/1.1 中,即使启用了 Keep-Alive,请求仍然是串行的——一个连接同一时刻只能处理一个请求。 虽然可以通过多个连接实现并发,但这会消耗更多资源。
HTTP/2 引入 多路复用(Multiplexing):在单个 TCP 连接上同时发送多个请求/响应流, 互不阻塞。这对 LLM API 的并发调用特别有价值。
import httpx
# httpx 默认支持 HTTP/2(需要安装 h2 依赖)
# pip install httpx[http2]
client = httpx.AsyncClient(http2=True)
# 注意:HTTP/2 多路复用对连接池参数的影响
# 单个 HTTP/2 连接就可以承载大量并发流,
# max_connections 可以比 HTTP/1.1 场景设得更小
client = httpx.AsyncClient(
http2=True,
limits=httpx.Limits(
max_connections=20, # 少量连接即可(HTTP/2 多路复用)
max_keepalive_connections=10,
keepalive_expiry=120.0,
),
)
"""
生产级 LLM API 客户端
涵盖:连接池、Keep-Alive、健康检查、超时、重试、流式管理
"""
import asyncio
import time
import random
from typing import AsyncIterator, Optional
from dataclasses import dataclass, field
import httpx
@dataclass
class LLMClientConfig:
"""LLM 客户端配置"""
base_url: str = "https://api.openai.com"
api_key: str = ""
# 连接池
max_connections: int = 100
max_keepalive_connections: int = 30
keepalive_expiry: float = 90.0 # 比服务端低 30s
# 超时
connect_timeout: float = 10.0
read_timeout: float = 180.0 # LLM 推理可能很长
write_timeout: float = 30.0
pool_timeout: float = 10.0
first_token_timeout: float = 15.0
token_interval_timeout: float = 10.0
# 重试
max_retries: int = 3
base_delay: float = 1.0
# HTTP/2
use_http2: bool = True
class ProductionLLMClient:
"""生产级 LLM API 客户端"""
def __init__(self, config: LLMClientConfig):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
self._health_status: dict[str, bool] = {}
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, *args):
await self.close()
async def start(self):
"""初始化 HTTP 客户端并预建立连接"""
limits = httpx.Limits(
max_connections=self.config.max_connections,
max_keepalive_connections=self.config.max_keepalive_connections,
keepalive_expiry=self.config.keepalive_expiry,
)
timeout = httpx.Timeout(
connect=self.config.connect_timeout,
read=self.config.read_timeout,
write=self.config.write_timeout,
pool=self.config.pool_timeout,
)
transport = httpx.AsyncHTTPTransport(
limits=limits,
retries=0, # 我们自己管理重试
)
self._client = httpx.AsyncClient(
base_url=self.config.base_url,
headers={"Authorization": f"Bearer {self.config.api_key}"},
timeout=timeout,
transport=transport,
http2=self.config.use_http2,
)
# 预建连接(热身)
await self._warmup()
async def _warmup(self):
"""预热:发送健康检查请求以预建连接"""
try:
await self.health_check()
except Exception:
pass # 预热失败不影响后续使用
async def health_check(self) -> bool:
"""应用层健康检查"""
try:
response = await self._client.get(
"/v1/models",
timeout=httpx.Timeout(connect=5.0, read=5.0),
)
self._health_status[self.config.base_url] = response.status_code == 200
return self._health_status[self.config.base_url]
except Exception:
self._health_status[self.config.base_url] = False
return False
async def chat(
self,
messages: list[dict],
model: str = "gpt-4",
stream: bool = False,
) -> dict | AsyncIterator[str]:
"""发送聊天请求(非流式)"""
for attempt in range(self.config.max_retries + 1):
try:
response = await self._client.post(
"/v1/chat/completions",
json={
"model": model,
"messages": messages,
"stream": stream,
},
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
delay = self._get_retry_delay(attempt, e.response)
await asyncio.sleep(delay)
continue
if e.response.status_code in (502, 503, 504) and attempt < self.config.max_retries:
await asyncio.sleep(self._get_retry_delay(attempt))
continue
raise
except (httpx.ConnectError, httpx.RemoteProtocolError, httpx.ReadTimeout):
if attempt < self.config.max_retries:
await asyncio.sleep(self._get_retry_delay(attempt))
continue
raise
async def chat_stream(
self, messages: list[dict], model: str = "gpt-4"
) -> AsyncIterator[str]:
"""发送流式聊天请求(SSE)"""
# 使用独立的客户端以避免干扰非流式连接
async with httpx.AsyncClient(
base_url=self.config.base_url,
headers={"Authorization": f"Bearer {self.config.api_key}"},
timeout=httpx.Timeout(
connect=self.config.connect_timeout,
read=self.config.token_interval_timeout, # 流式用 token 间超时
),
) as stream_client:
for attempt in range(self.config.max_retries + 1):
try:
async with stream_client.stream(
"POST",
"/v1/chat/completions",
json={
"model": model,
"messages": messages,
"stream": True,
},
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
return
yield data
return # 正常结束
except (httpx.ReadTimeout, httpx.RemoteProtocolError, httpx.ConnectError):
if attempt < self.config.max_retries:
await asyncio.sleep(self._get_retry_delay(attempt))
continue
raise
def _get_retry_delay(self, attempt: int, response=None) -> float:
"""指数退避 + 抖动 + Retry-After"""
if response is not None:
retry_after = response.headers.get("Retry-After")
if retry_after:
try:
return float(retry_after)
except ValueError:
pass
delay = min(self.config.base_delay * (2 ** attempt), 60.0)
return delay * (0.5 + random.random())
async def close(self):
"""优雅关闭:清理所有连接"""
if self._client:
await self._client.aclose()
self._client = None
@property
def pool_stats(self) -> dict:
"""连接池统计"""
if self._client and hasattr(self._client, '_transport'):
pool = self._client._transport._pool
return {
"active": len(pool._connections),
"max": pool._max_connections,
}
return {}
# === 使用示例 ===
async def main():
config = LLMClientConfig(
base_url="https://api.openai.com",
api_key="sk-xxxx",
)
async with ProductionLLMClient(config) as llm:
# 先检查健康
healthy = await llm.health_check()
print(f"服务健康: {healthy}")
# 发送请求
result = await llm.chat(
messages=[{"role": "user", "content": "Hello"}],
)
print(result)
# 查看连接池状态
print(llm.pool_stats)
if __name__ == "__main__":
asyncio.run(main())
/**
* 生产级 LLM API 客户端 (Node.js)
* 使用 undici (Node.js 内置) 实现连接池管理
*/
const { Agent, request, setGlobalDispatcher } = require('undici');
// ========== 连接池配置 ==========
const llmAgent = new Agent({
// 连接池
connections: 100, // 最大并发连接数
pipelining: 1, // HTTP/1.1 管道(设为 0 禁用)
keepAliveTimeout: 90_000, // 空闲连接保持 90s
keepAliveMaxTimeout: 120_000,
keepAliveTimeoutThreshold: 1_000,
// 超时
connectTimeout: 10_000, // 连接超时 10s
headersTimeout: 30_000, // 等待响应头超时
bodyTimeout: 180_000, // 等待响应体超时(LLM 需要长超时)
// DNS
autoSelectFamily: true,
autoSelectFamilyAttemptTimeout: 2_000,
});
// ========== 指数退避重试 ==========
class RetryWithBackoff {
constructor({ maxRetries = 3, baseDelay = 1000, maxDelay = 30000 }) {
this.maxRetries = maxRetries;
this.baseDelay = baseDelay;
this.maxDelay = maxDelay;
}
getDelay(attempt) {
const delay = Math.min(
this.baseDelay * Math.pow(2, attempt),
this.maxDelay
);
// 添加抖动 (±25%)
return delay * (0.75 + Math.random() * 0.5);
}
isRetryable(statusCode) {
return [429, 502, 503, 504].includes(statusCode);
}
async execute(fn) {
let lastError;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
const statusCode = error.statusCode || 0;
if (attempt >= this.maxRetries || !this.isRetryable(statusCode)) {
throw error;
}
const delay = this.getDelay(attempt);
console.log(`重试 ${attempt + 1}/${this.maxRetries},等待 ${delay.toFixed(0)}ms`);
await new Promise(r => setTimeout(r, delay));
}
}
throw lastError;
}
}
// ========== LLM 客户端 ==========
class LLMClient {
constructor({ baseUrl, apiKey }) {
this.baseUrl = baseUrl;
this.apiKey = apiKey;
this.retry = new RetryWithBackoff({ maxRetries: 3 });
}
async healthCheck() {
try {
const { statusCode } = await request(`${this.baseUrl}/v1/models`, {
method: 'GET',
headers: { 'Authorization': `Bearer ${this.apiKey}` },
dispatcher: llmAgent,
});
return statusCode === 200;
} catch {
return false;
}
}
async chat({ messages, model = 'gpt-4', stream = false }) {
return this.retry.execute(async () => {
const { statusCode, body } = await request(
`${this.baseUrl}/v1/chat/completions`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ model, messages, stream }),
dispatcher: llmAgent,
}
);
if (statusCode === 429) {
const err = new Error('Rate limited');
err.statusCode = 429;
throw err;
}
if (statusCode >= 500) {
const err = new Error(`Server error: ${statusCode}`);
err.statusCode = statusCode;
throw err;
}
const data = await body.json();
return data;
});
}
/** SSE 流式请求 */
async *chatStream({ messages, model = 'gpt-4' }) {
for (let attempt = 0; attempt <= 3; attempt++) {
try {
const { statusCode, body } = await request(
`${this.baseUrl}/v1/chat/completions`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ model, messages, stream: true }),
dispatcher: llmAgent,
// 流式使用更短的 bodyTimeout
bodyTimeout: 10_000,
}
);
if (statusCode !== 200) throw new Error(`HTTP ${statusCode}`);
for await (const chunk of body) {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
yield data;
}
}
}
return;
} catch (error) {
if (attempt >= 3) throw error;
await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
}
}
}
/** 关闭连接池 */
async close() {
await llmAgent.close();
}
}
// ========== 使用示例 ==========
async function main() {
const client = new LLMClient({
baseUrl: 'https://api.openai.com',
apiKey: 'sk-xxxx',
});
const healthy = await client.healthCheck();
console.log(`服务健康: ${healthy}`);
const result = await client.chat({
messages: [{ role: 'user', content: 'Hello' }],
});
console.log(result.choices[0].message.content);
}
main().catch(console.error);
HTTP/1.1 默认启用,客户端 keepalive_expiry 比服务端 timeout 短 30s
max_connections = 并发数 × 1.5,单 host 20-50,避免端口耗尽
KEEPIDLE=60s, KEEPINTVL=10s, KEEPCNT=3,防止中间件断连
Connect 5-10s / FirstToken 15s / TokenInterval 10s / Total 180s
指数退避(1s→2s→4s),随机抖动(±50%),遵循 Retry-After
每 30s HEAD /health,及时剔除不可用节点
定时扫描空闲 >60s 的连接,主动关闭,防止 Connection Reset
启动时预解析,TTL 缓存 5–10 分钟,减少解析延迟
单连接承载多并发流,消除 HTTP/1.1 头阻塞
SSE 使用独立连接和独立超时,token 间超时主动重试
with/as 上下文管理,__aexit__ 中 await client.aclose()
记录连接池使用率、重试次数、各阶段延迟,及时调整参数
| 场景 | 连接池 | KeepAlive | Read Timeout | 重试 |
|---|---|---|---|---|
| 低并发(< 10 QPS) | max=20, per_host=10 | 60s | 120s | 2 次 |
| 中并发(10–50 QPS) | max=100, per_host=30 | 90s | 180s | 3 次 |
| 高并发(50–200 QPS) | max=200, per_host=50 | 90s | 180s | 3 次 |
| 流式为主 | max=150, per_host=40 | 120s | Token 间 10s | 2 次 |
| 本地 vLLM/Ollama | max=50, per_host=50 | 300s | 300s | 1 次 |