从 List 到 Stream,深入理解 Redis 消息队列的实现方式、优劣势,以及何时该用、何时不该用
Redis 本身是一个内存数据结构服务器,但它的 List、Pub/Sub、Stream 等数据结构天然适合构建消息队列。
消息队列的本质是「生产者写入 → 中间存储 → 消费者读取」。Redis 恰好提供了高性能的内存存储和原子性操作,可以非常自然地充当这个「中间存储」角色。 相比专门的 MQ(RabbitMQ / Kafka / RocketMQ):
Redis 提供了三种主要的队列实现模式,适用场景和可靠性各不相同。
| 实现方式 | 核心命令 | 消息可靠性 | 适用场景 | Redis 版本 |
|---|---|---|---|---|
| List 队列 | LPUSH / RPUSH + BRPOP / BLPOP | 中等 | 简单任务队列、延迟低、规模小 | 所有版本 |
| Pub/Sub | PUBLISH / SUBSCRIBE / PSUBSCRIBE | 低(无持久化) | 实时通知、聊天、广播 | 所有版本 |
| Stream | XADD / XREAD / XREADGROUP / XACK | 高 | 生产级消息队列、需要 ACK 和消费者组 | ≥ 5.0 |
利用 Redis List 的「头进尾出」特性实现 FIFO 队列,简单高效。
Python
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
# 将任务序列化为 JSON 后 LPUSH 入队
task = {"type": "send_email", "to": "user@example.com"}
r.lpush("queue:tasks", json.dumps(task))
# 也可以一次推送多条
r.lpush("queue:tasks",
json.dumps({"type": "resize_image", "path": "/a.jpg"}),
json.dumps({"type": "gen_report", "id": "123"}),
)
Python
import redis, json
r = redis.Redis(host='localhost', port=6379, db=0)
while True:
# BRPOP 阻塞等待,timeout=0 表示无限等待
# 返回 (queue_name, task_data) 元组
result = r.brpop("queue:tasks", timeout=5)
if result is None:
# 超时,可以做一些心跳检测等
continue
_, task_raw = result
task = json.loads(task_raw)
try:
process_task(task) # 实际业务处理
except Exception as e:
# ⚠️ 这里就是 List 队列的最大弱点:
# 消息已经被弹出,崩溃后无法重试
log_error(task, e)
普通 List 队列有一个致命问题:消费者在 BRPOP 弹出消息后、处理完成前如果崩溃,消息就永久丢失了。
解决方法是使用 RPOPLPUSH(或 Redis 6.2+ 的 LMOVE)将消息先移动到「处理中」备份队列:
Python
# 原子性地:从 queue:tasks 弹出 → 同时压入 queue:tasks:processing
# Redis 6.2+ 推荐用 LMOVE
task_raw = r.rpoplpush("queue:tasks", "queue:tasks:processing")
# 处理任务...
try:
task = json.loads(task_raw)
process_task(task)
# 处理成功,从 processing 队列删除
r.lrem("queue:tasks:processing", 1, task_raw)
except Exception:
# 处理失败,消息仍在 processing 队列
# 可以用一个监控进程定期扫描 processing 队列做超时重试
一对多广播模式,消息即发即忘,适合实时推送但不适合可靠任务队列。
Python
# ----- 发布端 -----
r = redis.Redis()
r.publish("channel:order", "订单 #456 已支付")
# ----- 订阅端 -----
pubsub = r.pubsub()
pubsub.subscribe("channel:order")
for msg in pubsub.listen():
if msg["type"] == "message":
print("收到:", msg["data"]) # → 订单 #456 已支付
Redis Stream 是专为消息队列设计的数据结构,支持消费者组、ACK、消息持久化和回溯,是 Redis 做队列的最佳选择。
Python
# XADD mystream * field1 value1 field2 value2
# * 表示由 Redis 自动生成消息 ID(时间戳-序号)
msg_id = r.xadd("mystream", {
"type": "send_sms",
"phone": "13800138000",
"body": "您的验证码是 123456",
})
print("消息 ID:", msg_id) # 例: 1685352000000-0
# 限制 Stream 最大长度(防止内存无限增长)
r.xadd("mystream", {"task": "..."}, maxlen=10000)
Python
# 1. 创建消费者组(只需执行一次)
try:
r.xgroup_create("mystream", "workers", id="0", mkstream=True)
except redis.ResponseError:
pass # 消费者组已存在
# 2. 消费者循环读取消息
while True:
# 从消费者组读取,block=5000ms,count=1 一次取一条
msgs = r.xreadgroup(
"workers", # 消费者组名
"consumer-1", # 当前消费者名
{"mystream": ">"}, # ">" 表示只读新消息
count=1,
block=5000,
)
for stream_name, entries in msgs:
for msg_id, fields in entries:
try:
process_task(fields) # 业务处理
r.xack("mystream", "workers", msg_id) # ✅ ACK
except Exception:
# 不 ACK → 消息保持在 PEL(待处理列表)
# → 可被其他消费者认领 (XCLAIM) 或本消费者重试
| 特性 | 命令 | 说明 |
|---|---|---|
| 消息回溯 | XREAD mystream 0 |
从 Stream 第一条消息开始重放,类似 Kafka 的 offset 重置 |
| 死信处理 | XPENDING + XCLAIM |
查看 PEL(待确认列表)中的超时消息,并转移给其他消费者 |
| 自动裁剪 | XADD ... MAXLEN ~ 10000 |
~ 表示近似裁剪,Redis 会尽量接近 10000 条而非精确裁剪,性能更好 |
| 消费者组监控 | XINFO GROUPS mystream |
查看消费者组状态、未处理消息数、消费者列表 |
全面对比 Redis 做异步队列的优缺点,帮助你做出技术选型判断。
面对 Kafka、RabbitMQ、RocketMQ 等专业 MQ,Redis 该如何取舍?
| 维度 | Redis (Stream) | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|---|
| 吞吐量 | 10w+ QPS | 1~5w QPS | 百万级 QPS | 10w+ QPS |
| 延迟 | 微秒级 | 亚毫秒 | 毫秒级 | 亚毫秒 |
| 消息可靠性 | 中等 | 高 | 极高 | 极高 |
| 消息堆积能力 | 受限于内存 | 中等(内存+磁盘) | 极强(磁盘) | 极强(磁盘) |
| 延迟消息 | 需自行实现 | 原生支持 | 不直接支持 | 原生支持 |
| 运维复杂度 | 低 | 中等 | 较高 | 较高 |
| 多语言客户端 | 丰富 | 丰富 | 丰富 | 以 Java 为主 |
| 消息回溯 | Stream 支持 | 不支持 | 按 offset 回溯 | 按时间/offset |
| 适合场景 | 中小规模、已有 Redis 的项目、实时性要求高 | 业务解耦、RPC 调用、复杂路由 | 大数据流、日志收集、事件溯源 | 电商交易、金融场景、事务消息 |
根据你的业务场景,快速决定是否应该用 Redis 做队列。
用 Redis 做队列时,以下经验可以帮你避开常见坑。
新项目直接上 Stream,老项目渐进迁移。Stream 的消费者组 + ACK 机制是 List 和 Pub/Sub 无法比拟的。
Stream 默认不裁剪,消息会一直堆积在内存中。生产环境必须设置 MAXLEN 或定期执行 XTRIM。
r.xadd("mystream", fields, maxlen=50000) # 保留最近 5w 条
消费者崩溃后,已分配但未 ACK 的消息会卡在 PEL 中。需要定期扫描超时消息并 XCLAIM 给其他消费者:
# 查看 PEL 中最旧的 10 条消息 pending = r.xpending_range("mystream", "workers", "-", "+", 10) # 把闲置超过 60 秒的消息转移给 consumer-2 for p in pending: if p["time_since_delivered"] > 60000: r.xclaim("mystream", "workers", "consumer-2", 60000, p["message_id"])
即使有 ACK,网络超时也可能导致消息重复投递。消费者端必须实现幂等处理逻辑。
对 Stream 的关键指标设置监控:
XLEN mystream — 队列积压长度XINFO GROUPS mystream — 消费者组的 pending 数XPENDING mystream workers — 未确认消息总数消费者启动时执行 XGROUP CREATECONSUMER 注册自己,关闭时发送 XGROUP DELCONSUMER。配合心跳检测实现故障自动发现。
Redis 没有原生的延迟消息,但可以借助 Sorted Set 轻松实现。
Python
# ----- 生产者:发布延迟任务 -----
import time
task = json.dumps({"type": "cancel_order", "order_id": "789"})
execute_at = time.time() + 1800 # 30 分钟后执行
r.zadd("delay:queue", {task: execute_at})
# ----- 消费者:轮询到期任务 -----
while True:
now = time.time()
# ZRANGEBYSCORE:取出所有 score ≤ now 的任务
tasks = r.zrangebyscore("delay:queue", 0, now, start=0, num=10)
for task_raw in tasks:
# 原子性删除(防止多消费者重复处理)
if r.zrem("delay:queue", task_raw):
task = json.loads(task_raw)
process_task(task)
time.sleep(1) # 轮询间隔
⚠️ 注意:这不是精确的延迟投递,轮询间隔决定了最小延迟精度。对于秒级精度以下的场景,建议使用 RocketMQ 的原生延迟消息。
适合简单、低可靠性要求的任务队列。配合 RPOPLPUSH 可提升可靠性。
适合实时推送、聊天、通知广播。不做持久化,不要用于任务队列。
Redis 做队列的最佳方案,接近专用 MQ 的使用体验。新项目首选。