延迟队列五种方案怎么选:从一次丢消息事故说起
做电商后端,延迟执行的需求绕不开:下单 30 分钟不付款要自动关单放库存,优惠券到期前 24 小时推提醒,发货 72 小时物流没更新要自动开工单。这些都是“现在记一笔,过一段时间再执行”的活儿。
我们这条链路最早用的是最省事的方案,后来出过一次丢消息的事故,才把几种方案认真比了一遍。这篇把过程和最后的取舍写下来,顺便把每个方案的坑标出来。
先说结论:我们最后选的是 Redis ZSET 分片轮询做主链路,MySQL 扫表做兜底。但这个结论是被业务约束逼出来的,换个约束答案就变。所以下面先把需求摊开。
我们的延迟需求长什么样
把几个业务揉到一起看,量级大概是这样:
- 订单超时关闭,日均 60 万笔,这是大头
- 活动预热推送,T-2h / T-30min / T-5min 三档,峰值时队列里同时压着 500 万条
- 优惠券到期提醒,日均 300 万条
- 物流超时预警,72 小时这种长延迟
抽出来几条硬指标:延迟精度要 3 秒以内,消息至少投一次不能丢,单机得扛 5000 QPS,延迟跨度从 5 秒到 7 天。
还有一条容易被忽略但很关键的:要能动态取消。用户中途把款付了,那条“超时关单”的延迟任务就得撤掉,而且要快,不能等它真的触发了再去判断。这条需求后面会发现是整个选型的分水岭。
五种方案横向比一遍
市面上能用的方案不多,核心就这么几类:
| 方案 | 延迟精度 | 吞吐量 | 持久化 | 动态取消 | 运维复杂度 |
|---|---|---|---|---|---|
| JDK DelayQueue | μs 级 | 单机 ~1 万 | 无,纯内存 | 支持 remove() | 低 |
| Netty HashedWheelTimer | 看 tickDuration | 单机 ~10 万 | 无,纯内存 | 支持 cancel() | 低 |
| Redis ZSET 轮询 | 看轮询间隔,通常 1s | 单片 ~3 万 | RDB/AOF | 支持 ZREM,O(1) | 中 |
| RocketMQ 延迟消息 | 秒级,5.x 可任意延迟 | 集群 ~10 万+ | Broker 持久化 | 无原生取消 | 中高 |
| MySQL 扫表 | 看扫描间隔,通常 5s | 单库 ~3000 | 天然持久化 | UPDATE 状态 | 低 |
两个纯内存方案(DelayQueue 和时间轮)精度最高,但一宕机任务全没,持久化得自己补,我们这种百万级堆积量直接排除主链路资格。剩下三个值得展开讲。
方案一:Redis ZSET 分片轮询
思路很直接。用有序集合,score 放触发时间戳,value 放任务 ID。消费线程每隔 1 秒用 ZRANGEBYSCORE 把到期的捞出来。
单个 key 顶不住的时候按任务 ID 哈希到 N 个分片,每片一个消费线程,避免热点集中在一个 key 上。
@Component
public class RedisDelayQueue {
private static final int SHARD_COUNT = 16;
private static final String KEY_PREFIX = "delay:queue:";
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 投递延迟任务
*/
public void offer(String taskId, long delayMs) {
int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT;
double score = System.currentTimeMillis() + delayMs;
redisTemplate.opsForZSet().add(KEY_PREFIX + shard, taskId, score);
}
/**
* 取消延迟任务
*/
public boolean cancel(String taskId) {
int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT;
Long removed = redisTemplate.opsForZSet().remove(KEY_PREFIX + shard, taskId);
return removed != null && removed > 0;
}
}
消费这边有个坑,捞和删必须放一个 Lua 脚本里原子做。如果先 ZRANGEBYSCORE 再 ZREM,中间这一下,多个消费者会把同一批任务都捞走:
@Component
public class RedisDelayQueueConsumer {
private static final String LUA_POLL = """
local items = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1], 'LIMIT', 0, ARGV[2])
if #items > 0 then
redis.call('ZREM', KEYS[1], unpack(items))
end
return items
""";
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private DelayTaskHandler taskHandler;
private final RedisScript<List> pollScript = RedisScript.of(LUA_POLL, List.class);
@Scheduled(fixedDelay = 1000)
public void consume() {
for (int shard = 0; shard < 16; shard++) {
String key = "delay:queue:" + shard;
String now = String.valueOf(System.currentTimeMillis());
List<String> tasks = redisTemplate.execute(pollScript,
List.of(key), now, "100");
if (tasks != null) {
for (String taskId : tasks) {
taskHandler.handleWithRetry(taskId);
}
}
}
}
}
好处是精度高,1 秒轮询就能压进 1 秒精度;动态取消是 ZREM,O(1) 直接撤;吞吐量也够。
缺点也实在。Redis 宕机时 RDB 会丢秒级数据,AOF everysec 也最多丢 1 秒;ZSET 单 key 超过百万元素之后 ZRANGEBYSCORE 开始变慢,得靠分片压住;value 只能放 ID,任务详情得另存一份在 Hash 或 DB 里。
方案二:RocketMQ 5.x 任意延迟消息
RocketMQ 5.x 加了 Timer 消息,支持任意延迟时间,底层是时间轮 TimerWheel 加顺序日志 TimerLog。如果你本来就有 RocketMQ,这个方案最省事:
@Component
public class RocketMQDelayProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMessage(String topic, String taskId, String body, long delayMs) {
Message<String> message = MessageBuilder.withPayload(body)
.setHeader("KEYS", taskId)
.build();
rocketMQTemplate.syncSendDelayTimeMills(topic, message, delayMs);
}
}
@Component
@RocketMQMessageListener(
topic = "DELAY_ORDER_CLOSE",
consumerGroup = "GID_ORDER_CLOSE",
maxReconsumeTimes = 5
)
public class OrderCloseConsumer implements RocketMQListener<MessageExt> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(MessageExt msg) {
String orderId = new String(msg.getBody());
// 消费时检查订单状态,兼作"取消"的补偿
Order order = orderService.getById(orderId);
if (order == null || order.getStatus() != OrderStatus.UNPAID) {
return; // 已支付或已取消,跳过
}
orderService.closeOrder(orderId);
}
}
问题出在取消上。RocketMQ 没有原生的消息撤回,只能消费时校验状态:用户付款后把订单改成 PAID,延迟消息到点了发现状态不是 UNPAID 就丢掉。这招简单可靠,代价是消息照样投递、照样消费一次,只是空跑。对“超时关单”这种场景勉强能接受,但用户体验上,那条任务其实一直挂到触发那一刻才消失。
其余的优点很扎实:天然持久化,主从切换不丢,堆积能力靠磁盘,任意延迟时间。要注意 Timer 消息是 5.x 的新功能,部分云厂商支持还不全;另外 Broker 重启时要回放 TimerLog 重建时间轮,堆积越多恢复越慢。
方案三:MySQL 扫表加乐观锁
一张表存任务,后台线程定时扫到期的。这方案吞吐撑死单库 3000 QPS,扛不了主链路,但做兜底和小规模场景很合适:
CREATE TABLE delay_task (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL COMMENT '业务唯一标识',
task_type TINYINT NOT NULL COMMENT '任务类型:1-订单关闭 2-通知推送 3-券到期提醒',
task_body JSON COMMENT '任务参数',
execute_at DATETIME(3) NOT NULL COMMENT '预期执行时间',
status TINYINT NOT NULL DEFAULT 0 COMMENT '0-待执行 1-执行中 2-成功 3-失败 4-已取消',
retry_count TINYINT NOT NULL DEFAULT 0,
version INT NOT NULL DEFAULT 0 COMMENT '乐观锁',
created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
updated_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
UNIQUE KEY uk_task_id (task_id),
KEY idx_status_execute (status, execute_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
扫描有两个细节:游标分页避开深翻页的 OFFSET,乐观锁抢任务避免多实例重复执行:
@Component
public class DbDelayTaskScanner {
@Autowired
private DelayTaskMapper taskMapper;
@Autowired
private DelayTaskHandler taskHandler;
@Autowired
private ThreadPoolExecutor delayTaskPool;
@Scheduled(fixedDelay = 3000)
public void scan() {
long lastId = 0;
while (true) {
// 游标分页,避免 OFFSET 深翻页
List<DelayTask> batch = taskMapper.selectPendingTasks(
lastId, LocalDateTime.now(), 200);
if (batch.isEmpty()) break;
for (DelayTask task : batch) {
// 乐观锁抢任务:UPDATE SET status=1, version=version+1
// WHERE id=? AND status=0 AND version=?
boolean locked = taskMapper.tryLock(task.getId(), task.getVersion());
if (locked) {
delayTaskPool.execute(() -> taskHandler.handleWithRetry(task));
}
}
lastId = batch.get(batch.size() - 1).getId();
}
}
}
天然持久化、事务保证、取消就是一条 UPDATE status=4、不依赖额外中间件,这几点让它特别适合做兜底。缺点是精度受扫描间隔限制,大表扫描压数据库,吞吐上不去,百万级任务量得分库分表才行。
最后选了什么,以及为什么
回到那条分水岭需求:动态取消要快。RocketMQ 撤不掉消息,这一条就把它从主链路候选里划掉了。最后定的是 Redis ZSET 做主链路,MySQL 扫表做兜底。
主链路选 Redis ZSET,几个数都对得上:
- 精度,1 秒轮询就满足 3 秒以内的要求,比 MySQL 扫表强一截
- 取消,
ZREM是 O(1),不用绕消费端校验那一圈 - 吞吐,16 分片,每片 3 万,加起来 48 万 QPS,业务远用不到
- 堆积,500 万任务每条 200 字节算下来 1GB 内存,Redis Cluster 轻松吃下
但 Redis 会丢秒级数据,这是它的命门,所以补一层 MySQL 兜底。投递时先把任务写进 delay_task,Redis 消费成功再把状态改成 2。另起一个兜底线程,每 30 秒扫一遍 status=0 AND execute_at < now() - 10s 的漏网任务,专治 Redis 丢消息那种极端情况:
@Service
public class HybridDelayQueueService {
@Autowired
private RedisDelayQueue redisDelayQueue;
@Autowired
private DelayTaskMapper delayTaskMapper;
@Autowired
private TransactionTemplate transactionTemplate;
public void submit(DelayTask task) {
// 1. 先写 DB,事务内,保证持久化
transactionTemplate.executeWithoutResult(status -> {
delayTaskMapper.insert(task);
});
// 2. 再投 Redis,失败也不慌,兜底扫描会补
try {
redisDelayQueue.offer(task.getTaskId(), task.getDelayMs());
} catch (Exception e) {
log.warn("Redis投递失败,依赖DB兜底: taskId={}", task.getTaskId(), e);
}
}
public void cancel(String taskId) {
// 双写取消:Redis + DB
redisDelayQueue.cancel(taskId);
delayTaskMapper.updateStatus(taskId, TaskStatus.CANCELLED);
}
}
这套组合每一处选择都在拿一样东西换另一样,列清楚比含糊带过强:
| 维度 | 选择 | 牺牲了什么 |
|---|---|---|
| 主链路存储 | Redis ZSET 分片 | 内存成本 ~1GB,极端宕机丢秒级数据 |
| 精度 | 1s 轮询间隔 | 轮询空转 CPU,16 线程每秒各一次,可接受 |
| 持久化保障 | MySQL 兜底表 | 多一次 DB 写入,异步,不压主链路 RT |
| 动态取消 | Redis ZREM + DB UPDATE 双写 | 极端情况 Redis 已消费但 DB 未更新,靠幂等兜底 |
| 消费幂等 | 唯一键 + 状态机 | 极低概率重复执行,状态检查兜住 |
| 高可用 | Redis Cluster 自动故障转移 | 主从切换那 ~15s 可能短暂不可用,兜底扫描补 |
几个真实踩过的坑
丢消息那次事故的复盘。 当时三个原因叠在一起。一是 AOF 关着,RDB 间隔 5 分钟,Redis 一宕机丢了 5 分钟内全部延迟任务,后来开了 AOF everysec。二是上面提过的,ZRANGEBYSCORE 和 ZREM 分两次调用不原子,多消费者重复拉,自以为消费了实际丢了,合进一个 Lua 脚本解决。三是 ZSET value 当时存的是序列化对象不是 ID,反序列化失败后消息被 ZREM 删了却没执行,改成 value 只存 ID、详情查 DB。
ZSET 单 key 塞 500 万条会怎样。 ZRANGEBYSCORE 是 O(log(N)+M),N 等于 500 万时 log(N) 约等于 23,不是瓶颈。真正的麻烦是大 key 引起的集群倾斜和主从同步延迟,还有大 key 做 RDB 持久化会阻塞主线程。所以必须分片到 16 到 64 片,单片压在 50 万以下,顺手开 lazyfree-lazy-expire 和 lazyfree-lazy-server-del。
长延迟和短延迟混在一个 ZSET 里有没有问题。 没本质问题,ZRANGEBYSCORE 按 score 范围查,跟最大延迟无关。但 7 天这种长延迟任务量一大,ZSET 会持续膨胀。优化是按天分桶,key 写成 delay:queue:{shard}:{yyyyMMdd},只轮询当天和前一天,过期的 key 自动清。
如果某个分片主从一起挂了。 那片 ZSET 暂时读不了,这时候就靠 MySQL 兜底线程兜:它会扫到 status=0 AND execute_at < now() - 10s 的任务自动补执行。算一下最坏延迟,兜底间隔 30 秒加 10 秒容忍窗口,大约 43 秒,业务能接受。Redis 分片恢复重建后,已经执行过的任务靠幂等检查跳过。
换个约束,答案就变
这套方案是被“要支持快速取消”逼出来的。约束一变,选型跟着变,记几条经验:
如果业务压根不需要动态取消,RocketMQ 5.x 任意延迟消息是更省心的选择,持久化和高可用都不用自己操心。
如果精度要压到 100 毫秒以内,比如金融撮合,Redis 1 秒轮询不够用,得上 Netty 时间轮做本地精确计时,tickDuration 设 10 毫秒。但纯内存不持久化,得组合着来:任务先落 DB 加 Redis,时间轮从 Redis 加载近 1 小时的任务,宕机再从 Redis 重建。这种精度下还要盯 GC,G1 那 200 毫秒的 STW 会直接毁掉 100 毫秒精度,得换 ZGC,暂停能压到 1 毫秒以内。
如果任务量从 500 万涨到 5000 万,200 字节算下来 10GB,Redis 还扛得住但成本上来了。可以分冷热:近 2 小时到期的放 Redis,2 小时以上的放 RocketMQ 或 MySQL,一个调度线程每 10 分钟把快进 2 小时窗口的任务从冷存储搬进 Redis,把 Redis 峰值数据量压在百万级。
最后回到时间轮那个参数,Netty 的 tickDuration 默认 100 毫秒。设成 1 毫秒,每秒 1000 次 tick,空转 CPU 吃不消;设成 1 秒,精度又掉到秒级。延迟队列消费线程一般 100 到 500 毫秒比较稳。还有一点容易忘,HashedWheelTimer 是单线程的,任务里千万别阻塞,它该做的只是把活儿丢进线程池,真正的业务逻辑在池子里跑。