Redis 做异步消息队列

从 List 到 Stream,深入理解 Redis 消息队列的实现方式、优劣势,以及何时该用、何时不该用

List Queue Pub/Sub Stream 消费者组 可靠性投递

🧠 为什么用 Redis 做队列?

Redis 本身是一个内存数据结构服务器,但它的 List、Pub/Sub、Stream 等数据结构天然适合构建消息队列。

消息队列的本质是「生产者写入 → 中间存储 → 消费者读取」。Redis 恰好提供了高性能的内存存储和原子性操作,可以非常自然地充当这个「中间存储」角色。 相比专门的 MQ(RabbitMQ / Kafka / RocketMQ):

  • 部署成本极低 — 很多项目已经在用 Redis,无需额外基础设施
  • 延迟极低 — 纯内存操作,微秒级响应
  • 使用简单 — 几条命令就能搭起来
  • 适合中小规模场景 — 日处理百万级消息绰绰有余

📋 三种实现方式一览

Redis 提供了三种主要的队列实现模式,适用场景和可靠性各不相同。

实现方式 核心命令 消息可靠性 适用场景 Redis 版本
List 队列 LPUSH / RPUSH + BRPOP / BLPOP 中等 简单任务队列、延迟低、规模小 所有版本
Pub/Sub PUBLISH / SUBSCRIBE / PSUBSCRIBE 低(无持久化) 实时通知、聊天、广播 所有版本
Stream XADD / XREAD / XREADGROUP / XACK 生产级消息队列、需要 ACK 和消费者组 ≥ 5.0

📋 方式一:List 队列(最经典)

利用 Redis List 的「头进尾出」特性实现 FIFO 队列,简单高效。

Producer
LPUSH
Redis List
queue:tasks
Consumer
BRPOP

✏️ 生产者 — 发布任务

            Python
import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

# 将任务序列化为 JSON 后 LPUSH 入队
task = {"type": "send_email", "to": "user@example.com"}
r.lpush("queue:tasks", json.dumps(task))

# 也可以一次推送多条
r.lpush("queue:tasks",
    json.dumps({"type": "resize_image", "path": "/a.jpg"}),
    json.dumps({"type": "gen_report",  "id": "123"}),
)
        

📥 消费者 — 阻塞式消费(推荐)

            Python
import redis, json

r = redis.Redis(host='localhost', port=6379, db=0)

while True:
    # BRPOP 阻塞等待,timeout=0 表示无限等待
    # 返回 (queue_name, task_data) 元组
    result = r.brpop("queue:tasks", timeout=5)
    if result is None:
        # 超时,可以做一些心跳检测等
        continue

    _, task_raw = result
    task = json.loads(task_raw)
    try:
        process_task(task)       # 实际业务处理
    except Exception as e:
        # ⚠️ 这里就是 List 队列的最大弱点:
        #    消息已经被弹出,崩溃后无法重试
        log_error(task, e)
        

🔒 可靠队列变体 — RPOPLPUSH 备份模式

普通 List 队列有一个致命问题:消费者在 BRPOP 弹出消息后、处理完成前如果崩溃,消息就永久丢失了。 解决方法是使用 RPOPLPUSH(或 Redis 6.2+ 的 LMOVE)将消息先移动到「处理中」备份队列:

            Python
# 原子性地:从 queue:tasks 弹出 → 同时压入 queue:tasks:processing
# Redis 6.2+ 推荐用 LMOVE
task_raw = r.rpoplpush("queue:tasks", "queue:tasks:processing")

# 处理任务...
try:
    task = json.loads(task_raw)
    process_task(task)
    # 处理成功,从 processing 队列删除
    r.lrem("queue:tasks:processing", 1, task_raw)
except Exception:
    # 处理失败,消息仍在 processing 队列
    # 可以用一个监控进程定期扫描 processing 队列做超时重试
        

📡 方式二:Pub/Sub(发布订阅)

一对多广播模式,消息即发即忘,适合实时推送但不适合可靠任务队列。

Publisher
PUBLISH
Redis
Channel
Subscriber A
+
Subscriber B

✏️ 发布 & 订阅

            Python
# ----- 发布端 -----
r = redis.Redis()
r.publish("channel:order", "订单 #456 已支付")

# ----- 订阅端 -----
pubsub = r.pubsub()
pubsub.subscribe("channel:order")

for msg in pubsub.listen():
    if msg["type"] == "message":
        print("收到:", msg["data"])  # → 订单 #456 已支付
        
⚠️ Pub/Sub 的三大限制:
① 消息不做持久化 — 如果订阅者不在线,消息直接丢弃;
② 订阅者只能收到订阅之后发布的消息,无法回溯历史;
③ 没有 ACK 机制 — 发布者无法确认消息是否被成功消费。

🌊 方式三:Stream(Redis 5.0+,生产级)

Redis Stream 是专为消息队列设计的数据结构,支持消费者组、ACK、消息持久化和回溯,是 Redis 做队列的最佳选择。

Producer
XADD
Stream
mystream
Group: workers
Consumer A / B / C

✏️ 生产者 — XADD 写入消息

            Python
# XADD mystream * field1 value1 field2 value2
# * 表示由 Redis 自动生成消息 ID(时间戳-序号)
msg_id = r.xadd("mystream", {
    "type":  "send_sms",
    "phone": "13800138000",
    "body":  "您的验证码是 123456",
})
print("消息 ID:", msg_id)  # 例: 1685352000000-0

# 限制 Stream 最大长度(防止内存无限增长)
r.xadd("mystream", {"task": "..."}, maxlen=10000)
        

📥 消费者组 — XREADGROUP + XACK

            Python
# 1. 创建消费者组(只需执行一次)
try:
    r.xgroup_create("mystream", "workers", id="0", mkstream=True)
except redis.ResponseError:
    pass  # 消费者组已存在

# 2. 消费者循环读取消息
while True:
    # 从消费者组读取,block=5000ms,count=1 一次取一条
    msgs = r.xreadgroup(
        "workers",            # 消费者组名
        "consumer-1",          # 当前消费者名
        {"mystream": ">"},       # ">" 表示只读新消息
        count=1,
        block=5000,
    )

    for stream_name, entries in msgs:
        for msg_id, fields in entries:
            try:
                process_task(fields)          # 业务处理
                r.xack("mystream", "workers", msg_id)  # ✅ ACK
            except Exception:
                # 不 ACK → 消息保持在 PEL(待处理列表)
                # → 可被其他消费者认领 (XCLAIM) 或本消费者重试
        

🔧 Stream 高级特性

特性命令说明
消息回溯 XREAD mystream 0 从 Stream 第一条消息开始重放,类似 Kafka 的 offset 重置
死信处理 XPENDING + XCLAIM 查看 PEL(待确认列表)中的超时消息,并转移给其他消费者
自动裁剪 XADD ... MAXLEN ~ 10000 ~ 表示近似裁剪,Redis 会尽量接近 10000 条而非精确裁剪,性能更好
消费者组监控 XINFO GROUPS mystream 查看消费者组状态、未处理消息数、消费者列表
Redis Stream — mystream M1 M2 M3 M4 M5 → 新消息 消费者组 — workers Consumer-1 Consumer-2 Consumer-3 PEL 待确认 超时后可 XCLAIM

⚖️ 优势 vs 劣势

全面对比 Redis 做异步队列的优缺点,帮助你做出技术选型判断。

✅ 优势

  • 极低延迟:纯内存操作,消息投递延迟通常在微秒到毫秒级别
  • 超高吞吐:单机可达 10 万+ QPS,轻松应对高并发场景
  • 零部署成本:大多数项目已经依赖 Redis,无需引入新的中间件
  • 学习成本低:命令简洁直观,团队上手难度远低于 Kafka / RocketMQ
  • 多种模式灵活选择:简单场景用 List,广播用 Pub/Sub,生产级用 Stream
  • Stream 支持消费者组:实现 Kafka 风格的消息分区和负载均衡消费
  • 消息可回溯:Stream 支持从任意位置重放,Pub/Sub 不支持但 List 可以
  • 数据持久化可选:RDB + AOF 可保证重启后消息不丢(但性能会下降)

❌ 劣势

  • 消息可靠性不如专用 MQ:默认异步复制,主节点宕机可能丢消息
  • 内存成本高:所有消息存放在内存中,不适合 TB 级消息堆积场景
  • 无原生消息路由:不支持复杂的 topic 路由、tag 过滤(需自行实现)
  • 无延迟消息:没有 RocketMQ 那样的定时/延时消息能力(需配合 Sorted Set)
  • Pub/Sub 不持久化:消息发出后消费者不在线即丢失,也不支持回溯
  • 管理能力弱:缺少消息追踪、死信队列、监控面板等运维能力
  • 扩展性有限:集群模式下的 Pub/Sub 消息会广播到所有节点,造成带宽浪费
  • Stream 不支持分区:不像 Kafka 的 Partition,Stream 是单一日志,无法并行写入

🆚 Redis 队列 vs 专用消息队列

面对 Kafka、RabbitMQ、RocketMQ 等专业 MQ,Redis 该如何取舍?

维度 Redis (Stream) RabbitMQ Kafka RocketMQ
吞吐量 10w+ QPS 1~5w QPS 百万级 QPS 10w+ QPS
延迟 微秒级 亚毫秒 毫秒级 亚毫秒
消息可靠性 中等 极高 极高
消息堆积能力 受限于内存 中等(内存+磁盘) 极强(磁盘) 极强(磁盘)
延迟消息 需自行实现 原生支持 不直接支持 原生支持
运维复杂度 中等 较高 较高
多语言客户端 丰富 丰富 丰富 以 Java 为主
消息回溯 Stream 支持 不支持 按 offset 回溯 按时间/offset
适合场景 中小规模、已有 Redis 的项目、实时性要求高 业务解耦、RPC 调用、复杂路由 大数据流、日志收集、事件溯源 电商交易、金融场景、事务消息

🌳 选型决策指南

根据你的业务场景,快速决定是否应该用 Redis 做队列。

需要消息队列吗? ✅ 是 已有 Redis? ❌ 否 直接用 RabbitMQ / Kafka ✅ 已有 消息可靠性要求? 🔄 计划中 看需求规模再定 低要求 用 Redis List 即可 高要求 用 Redis Stream 🟣 关键判断问题 日均消息量 > 千万级?需要严格的消息不丢? → 如果任一答案为「是」,建议用 Kafka / RocketMQ 选择专用 MQ (Kafka / RocketMQ) 可靠性 & 扩展性优先

💡 实战最佳实践

用 Redis 做队列时,以下经验可以帮你避开常见坑。

1. 统一使用 Stream(Redis ≥ 5.0)

新项目直接上 Stream,老项目渐进迁移。Stream 的消费者组 + ACK 机制是 List 和 Pub/Sub 无法比拟的。

2. 设置 MAXLEN 防止内存爆炸

Stream 默认不裁剪,消息会一直堆积在内存中。生产环境必须设置 MAXLEN 或定期执行 XTRIM

r.xadd("mystream", fields, maxlen=50000)  # 保留最近 5w 条
        

3. 处理 PEL 中的僵尸消息

消费者崩溃后,已分配但未 ACK 的消息会卡在 PEL 中。需要定期扫描超时消息并 XCLAIM 给其他消费者:

# 查看 PEL 中最旧的 10 条消息
pending = r.xpending_range("mystream", "workers", "-", "+", 10)

# 把闲置超过 60 秒的消息转移给 consumer-2
for p in pending:
    if p["time_since_delivered"] > 60000:
        r.xclaim("mystream", "workers", "consumer-2", 60000, p["message_id"])
        

4. 幂等消费设计

即使有 ACK,网络超时也可能导致消息重复投递。消费者端必须实现幂等处理逻辑。

5. 监控告警

对 Stream 的关键指标设置监控:

  • XLEN mystream — 队列积压长度
  • XINFO GROUPS mystream — 消费者组的 pending 数
  • XPENDING mystream workers — 未确认消息总数

6. 消费者心跳 & 优雅退出

消费者启动时执行 XGROUP CREATECONSUMER 注册自己,关闭时发送 XGROUP DELCONSUMER。配合心跳检测实现故障自动发现。

⏱️ 附:用 Redis 实现延迟队列

Redis 没有原生的延迟消息,但可以借助 Sorted Set 轻松实现。

思路:Sorted Set = 按时间排序的任务池

            Python
# ----- 生产者:发布延迟任务 -----
import time

task = json.dumps({"type": "cancel_order", "order_id": "789"})
execute_at = time.time() + 1800  # 30 分钟后执行
r.zadd("delay:queue", {task: execute_at})

# ----- 消费者:轮询到期任务 -----
while True:
    now = time.time()
    # ZRANGEBYSCORE:取出所有 score ≤ now 的任务
    tasks = r.zrangebyscore("delay:queue", 0, now, start=0, num=10)

    for task_raw in tasks:
        # 原子性删除(防止多消费者重复处理)
        if r.zrem("delay:queue", task_raw):
            task = json.loads(task_raw)
            process_task(task)

    time.sleep(1)  # 轮询间隔
        

⚠️ 注意:这不是精确的延迟投递,轮询间隔决定了最小延迟精度。对于秒级精度以下的场景,建议使用 RocketMQ 的原生延迟消息。

📌 总结

📋

List 队列

适合简单、低可靠性要求的任务队列。配合 RPOPLPUSH 可提升可靠性。

📡

Pub/Sub

适合实时推送、聊天、通知广播。不做持久化,不要用于任务队列。

🌊

Stream

Redis 做队列的最佳方案,接近专用 MQ 的使用体验。新项目首选。

一句话总结: 如果你的项目已经在用 Redis,消息量在百万/千万级别以内,且对消息丢失有一定容忍度 → 用 Redis Stream,它足够好。 如果你需要消息零丢失、TB 级堆积、事务消息、精确的延迟投递 → 上 Kafka / RocketMQ,专业的事交给专业的工具。