6 种实现方案深度对比,从定时轮询到分布式延迟队列,原理、代码、选型一站讲清楚
用户在电商/App 下单后,系统需要保留库存、锁定优惠,等待用户完成支付。但如果用户一直不付款,订单不能永远挂着,否则会造成:
后台定时任务每隔固定时间(如每秒)扫描数据库,查询 status = '待支付' AND create_time < NOW() - INTERVAL N MINUTE 的订单,批量关闭。
// 伪代码 - Spring Boot 示例
@Scheduled(cron = "0/1 * * * * ?") // 每秒执行一次
public void closeExpiredOrders() {
List<Order> expired = orderMapper.selectExpiredPending();
for (Order order : expired) {
boolean closed = orderService.closeOrder(order.getId());
if (closed) {
stockService.releaseStock(order.getSkuIds());
couponService.releaseCoupon(order.getCouponId());
}
}
}
Java 自带的 DelayQueue 是一个无界阻塞队列,元素必须实现 Delayed 接口。消费者线程 take() 会阻塞,直到队头元素到期。将订单包装为延迟元素,到期时自动出队触发关闭。
public class OrderDelayTask implements Delayed {
private final String orderId;
private final long expireTimeNanos;
public OrderDelayTask(String orderId, long delayMinutes) {
this.orderId = orderId;
this.expireTimeNanos =
System.nanoTime() + TimeUnit.MINUTES.toNanos(delayMinutes);
}
public long getDelay(TimeUnit unit) {
return unit.convert(expireTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed other) { // 按到期时间排序
return Long.compare(this.expireTimeNanos,
((OrderDelayTask) other).expireTimeNanos);
}
}
// 消费者线程
DelayQueue<OrderDelayTask> queue = new DelayQueue<>();
// 下单时入队
queue.put(new OrderDelayTask(orderId, 30));
// 消费线程循环取出
while (true) {
OrderDelayTask task = queue.take(); // 阻塞直到到期
orderService.closeIfPending(task.orderId);
}
下单时向 Redis 写入一个带 TTL 的 Key(如 order:expire:{orderId}),同时注册 KeyExpiredEvent 监听器。Key 到期时 Redis 自动触发回调,在回调中执行订单关闭逻辑。
// 1. 开启 Redis 过期通知(redis.conf)
notify-keyspace-events Ex
// 2. 下单时写入带过期时间的 Key
redis.setex("order:expire:" + orderId, 1800, orderId); // 30 分钟
// 3. Spring Boot 监听过期事件
@Component
public class RedisKeyExpirationListener {
@EventListener
public void onKeyExpired(RedisKeyExpiredEvent<String> event) {
String key = event.getKeyspace();
if (key.startsWith("order:expire:")) {
String orderId = key.substring("order:expire:".length());
orderService.closeIfPending(orderId);
}
}
}
利用 RabbitMQ 的死信交换机(DLX) + 消息 TTL 实现延迟效果。消息发送到「延迟队列」,设置 TTL,到期后自动转发到「死信队列」,消费者从死信队列消费并执行关闭逻辑。或者使用 rabbitmq_delayed_message_exchange 插件(更简洁)。
// 方式 A:DLX + TTL 方式
// 1. 声明死信交换机和死信队列(实际消费)
declareDeadExchangeAndQueue();
// 2. 声明延迟队列(绑定 DLX)
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "order.close");
channel.queueDeclare("order.delay.queue", true, false, false, args);
// 3. 下单时发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("1800000") // TTL = 30 分钟(毫秒)
.build();
channel.basicPublish("", "order.delay.queue", props,
orderJson.getBytes());
// 4. 消费死信队列,执行关闭
channel.basicConsume("order.close.queue", true,
(tag, msg) -> orderService.closeIfPending(msg), consumerTag -> {});
RocketMQ 原生支持延迟消息(scheduleLevel),内置 18 个延迟级别(1s/5s/10s/30s ... 2h)。消息发送时指定延迟级别,Broker 内部通过 TimerWheel(时间轮) 实现精确延迟投递。
// RocketMQ 延迟消息 - 开箱即用
Message msg = new Message("OrderTopic", "OrderCloseTag",
orderId, "order close delay msg".getBytes());
// 设置延迟级别:level 14 = 10 分钟,level 16 = 30 分钟
msg.setDelayTimeLevel(16); // 30 分钟
producer.send(msg);
// 消费端
consumer.subscribe("OrderTopic", "OrderCloseTag");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
orderService.closeIfPending(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
时间轮是一种高效的延迟任务调度数据结构。类似钟表,一个环形数组,每个槽位代表一个时间间隔。指针按固定频率转动,转到哪个槽位就执行该槽位上的所有任务。分层时间轮(Hierarchical Timer Wheel)可以支持长时间延迟。
// 基于 Netty HashedWheelTimer 的使用示例
HashedWheelTimer timer = new HashedWheelTimer(
1, // tickDuration = 1 秒
TimeUnit.SECONDS,
512 // ticksPerWheel = 512 个槽位
);
// 下单时注册延迟任务
timer.newTimeout(timeout -> {
orderService.closeIfPending(orderId);
// 幂等检查:订单已支付则跳过
}, 30, TimeUnit.MINUTES);
| 维度 | 数据库轮询 | DelayQueue | Redis 过期 | RabbitMQ 延迟 | RocketMQ 延迟 | 时间轮 |
|---|---|---|---|---|---|---|
| 可靠性 | ✓✓ 高 | ✗ 低(内存) | ✗ 低(可能丢) | ✓✓ 高 | ✓✓ 高 | ✗ 低(内存) |
| 延迟精度 | ★★ 秒~分钟 | ✓✓✓ 毫秒级 | ✓✓ 秒级 | ✓✓ 秒级 | ✓✓ 秒级 | ✓✓✓ 毫秒级 |
| 吞吐量 | ✗ 低 | ★★ 中 | ★★ 中 | ✓✓ 高 | ✓✓✓ 很高 | ✓✓✓ 极高 |
| 分布式支持 | ✗ 不支持 | ✗ 不支持 | ★ 部分 | ✓✓✓ 天然支持 | ✓✓✓ 天然支持 | ✗ 不支持 |
| 实现复杂度 | ✓ 低 | ✓ 低 | ✓ 低 | ★★ 中 | ✓ 低 | ★★★ 高 |
| 中间件依赖 | 无 | 无 | Redis | RabbitMQ | RocketMQ | 无/Netty |
| 适用规模 | 小 | 小 | 中小 | 中大 | 大 | 中大(单机) |
超时关闭必须是幂等操作——同一个订单被多次触发关闭,结果应该一致。
// 使用 CAS 更新保证幂等
public boolean closeOrder(String orderId) {
// UPDATE orders SET status='CLOSED'
// WHERE id = ? AND status = 'PENDING'
int affected = orderMapper.casCloseOrder(orderId, "PENDING");
return affected > 0; // 0 = 已关闭或已支付,跳过
}
即使有 CAS,在高并发场景下仍然建议加分布式锁:
String lockKey = "order:close:lock:" + orderId;
try {
boolean locked = redisLock.tryLock(lockKey, 10, TimeUnit.SECONDS);
if (locked) {
orderService.closeOrder(orderId);
}
} finally {
redisLock.unlock(lockKey);
}
完整的订单状态流转:
开启 manual ack,消费失败不 ACK,消息重回队列或进入死信队列,配合告警。
通过 order_id + 去重表 或 Redis SETNX 保证只处理一次。幂等接口设计。
分布式场景下各机器时钟可能不一致。建议统一使用 NTP 同步,或以 Redis/MQ 服务器时间为准。
关闭订单时确保库存回滚可靠。使用与扣库存相同的事务或可靠消息,防止库存不一致。
中小项目用 RocketMQ 延迟消息(最省心),大厂复杂系统用 RabbitMQ DLX + 兜底轮询(最灵活),小项目直接 数据库轮询(够用就行)。
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 个人项目 / MVP | 数据库定时轮询 | 最简单,够用,零依赖 |
| 中小型业务系统 | RocketMQ 延迟消息 | API 简洁,原生支持,可靠性高 |
| 已有 RabbitMQ 的系统 | RabbitMQ 延迟队列 | 复用现有基础设施,DLX 成熟 |
| 超大规模(电商/金融) | RocketMQ + DB 轮询兜底 | 主方案高性能 + 兜底保安全 |
| 低延迟要求(毫秒级) | 时间轮(如 Netty) | 单机极致性能,O(1) 调度 |
| 无需引入 MQ | DelayQueue + Redis 兜底 | 轻量方案,适合简单场景 |