延迟队列五种方案怎么选:从一次丢消息事故说起


做电商后端,延迟执行的需求绕不开:下单 30 分钟不付款要自动关单放库存,优惠券到期前 24 小时推提醒,发货 72 小时物流没更新要自动开工单。这些都是“现在记一笔,过一段时间再执行”的活儿。

我们这条链路最早用的是最省事的方案,后来出过一次丢消息的事故,才把几种方案认真比了一遍。这篇把过程和最后的取舍写下来,顺便把每个方案的坑标出来。

先说结论:我们最后选的是 Redis ZSET 分片轮询做主链路,MySQL 扫表做兜底。但这个结论是被业务约束逼出来的,换个约束答案就变。所以下面先把需求摊开。

我们的延迟需求长什么样

把几个业务揉到一起看,量级大概是这样:

  • 订单超时关闭,日均 60 万笔,这是大头
  • 活动预热推送,T-2h / T-30min / T-5min 三档,峰值时队列里同时压着 500 万条
  • 优惠券到期提醒,日均 300 万条
  • 物流超时预警,72 小时这种长延迟

抽出来几条硬指标:延迟精度要 3 秒以内,消息至少投一次不能丢,单机得扛 5000 QPS,延迟跨度从 5 秒到 7 天。

还有一条容易被忽略但很关键的:要能动态取消。用户中途把款付了,那条“超时关单”的延迟任务就得撤掉,而且要快,不能等它真的触发了再去判断。这条需求后面会发现是整个选型的分水岭。

五种方案横向比一遍

市面上能用的方案不多,核心就这么几类:

方案 延迟精度 吞吐量 持久化 动态取消 运维复杂度
JDK DelayQueue μs 级 单机 ~1 万 无,纯内存 支持 remove()
Netty HashedWheelTimer 看 tickDuration 单机 ~10 万 无,纯内存 支持 cancel()
Redis ZSET 轮询 看轮询间隔,通常 1s 单片 ~3 万 RDB/AOF 支持 ZREM,O(1)
RocketMQ 延迟消息 秒级,5.x 可任意延迟 集群 ~10 万+ Broker 持久化 无原生取消 中高
MySQL 扫表 看扫描间隔,通常 5s 单库 ~3000 天然持久化 UPDATE 状态

两个纯内存方案(DelayQueue 和时间轮)精度最高,但一宕机任务全没,持久化得自己补,我们这种百万级堆积量直接排除主链路资格。剩下三个值得展开讲。

方案一:Redis ZSET 分片轮询

思路很直接。用有序集合,score 放触发时间戳,value 放任务 ID。消费线程每隔 1 秒用 ZRANGEBYSCORE 把到期的捞出来。

单个 key 顶不住的时候按任务 ID 哈希到 N 个分片,每片一个消费线程,避免热点集中在一个 key 上。

@Component
public class RedisDelayQueue {

    private static final int SHARD_COUNT = 16;
    private static final String KEY_PREFIX = "delay:queue:";

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 投递延迟任务
     */
    public void offer(String taskId, long delayMs) {
        int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT;
        double score = System.currentTimeMillis() + delayMs;
        redisTemplate.opsForZSet().add(KEY_PREFIX + shard, taskId, score);
    }

    /**
     * 取消延迟任务
     */
    public boolean cancel(String taskId) {
        int shard = Math.abs(taskId.hashCode()) % SHARD_COUNT;
        Long removed = redisTemplate.opsForZSet().remove(KEY_PREFIX + shard, taskId);
        return removed != null && removed > 0;
    }
}

消费这边有个坑,捞和删必须放一个 Lua 脚本里原子做。如果先 ZRANGEBYSCOREZREM,中间这一下,多个消费者会把同一批任务都捞走:

@Component
public class RedisDelayQueueConsumer {

    private static final String LUA_POLL = """
        local items = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1], 'LIMIT', 0, ARGV[2])
        if #items > 0 then
            redis.call('ZREM', KEYS[1], unpack(items))
        end
        return items
        """;

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private DelayTaskHandler taskHandler;

    private final RedisScript<List> pollScript = RedisScript.of(LUA_POLL, List.class);

    @Scheduled(fixedDelay = 1000)
    public void consume() {
        for (int shard = 0; shard < 16; shard++) {
            String key = "delay:queue:" + shard;
            String now = String.valueOf(System.currentTimeMillis());
            List<String> tasks = redisTemplate.execute(pollScript,
                    List.of(key), now, "100");
            if (tasks != null) {
                for (String taskId : tasks) {
                    taskHandler.handleWithRetry(taskId);
                }
            }
        }
    }
}

好处是精度高,1 秒轮询就能压进 1 秒精度;动态取消是 ZREM,O(1) 直接撤;吞吐量也够。

缺点也实在。Redis 宕机时 RDB 会丢秒级数据,AOF everysec 也最多丢 1 秒;ZSET 单 key 超过百万元素之后 ZRANGEBYSCORE 开始变慢,得靠分片压住;value 只能放 ID,任务详情得另存一份在 Hash 或 DB 里。

方案二:RocketMQ 5.x 任意延迟消息

RocketMQ 5.x 加了 Timer 消息,支持任意延迟时间,底层是时间轮 TimerWheel 加顺序日志 TimerLog。如果你本来就有 RocketMQ,这个方案最省事:

@Component
public class RocketMQDelayProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendDelayMessage(String topic, String taskId, String body, long delayMs) {
        Message<String> message = MessageBuilder.withPayload(body)
                .setHeader("KEYS", taskId)
                .build();
        rocketMQTemplate.syncSendDelayTimeMills(topic, message, delayMs);
    }
}

@Component
@RocketMQMessageListener(
        topic = "DELAY_ORDER_CLOSE",
        consumerGroup = "GID_ORDER_CLOSE",
        maxReconsumeTimes = 5
)
public class OrderCloseConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private OrderService orderService;

    @Override
    public void onMessage(MessageExt msg) {
        String orderId = new String(msg.getBody());
        // 消费时检查订单状态,兼作"取消"的补偿
        Order order = orderService.getById(orderId);
        if (order == null || order.getStatus() != OrderStatus.UNPAID) {
            return; // 已支付或已取消,跳过
        }
        orderService.closeOrder(orderId);
    }
}

问题出在取消上。RocketMQ 没有原生的消息撤回,只能消费时校验状态:用户付款后把订单改成 PAID,延迟消息到点了发现状态不是 UNPAID 就丢掉。这招简单可靠,代价是消息照样投递、照样消费一次,只是空跑。对“超时关单”这种场景勉强能接受,但用户体验上,那条任务其实一直挂到触发那一刻才消失。

其余的优点很扎实:天然持久化,主从切换不丢,堆积能力靠磁盘,任意延迟时间。要注意 Timer 消息是 5.x 的新功能,部分云厂商支持还不全;另外 Broker 重启时要回放 TimerLog 重建时间轮,堆积越多恢复越慢。

方案三:MySQL 扫表加乐观锁

一张表存任务,后台线程定时扫到期的。这方案吞吐撑死单库 3000 QPS,扛不了主链路,但做兜底和小规模场景很合适:

CREATE TABLE delay_task (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL COMMENT '业务唯一标识',
    task_type TINYINT NOT NULL COMMENT '任务类型:1-订单关闭 2-通知推送 3-券到期提醒',
    task_body JSON COMMENT '任务参数',
    execute_at DATETIME(3) NOT NULL COMMENT '预期执行时间',
    status TINYINT NOT NULL DEFAULT 0 COMMENT '0-待执行 1-执行中 2-成功 3-失败 4-已取消',
    retry_count TINYINT NOT NULL DEFAULT 0,
    version INT NOT NULL DEFAULT 0 COMMENT '乐观锁',
    created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    updated_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
    UNIQUE KEY uk_task_id (task_id),
    KEY idx_status_execute (status, execute_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

扫描有两个细节:游标分页避开深翻页的 OFFSET,乐观锁抢任务避免多实例重复执行:

@Component
public class DbDelayTaskScanner {

    @Autowired
    private DelayTaskMapper taskMapper;
    @Autowired
    private DelayTaskHandler taskHandler;
    @Autowired
    private ThreadPoolExecutor delayTaskPool;

    @Scheduled(fixedDelay = 3000)
    public void scan() {
        long lastId = 0;
        while (true) {
            // 游标分页,避免 OFFSET 深翻页
            List<DelayTask> batch = taskMapper.selectPendingTasks(
                    lastId, LocalDateTime.now(), 200);
            if (batch.isEmpty()) break;

            for (DelayTask task : batch) {
                // 乐观锁抢任务:UPDATE SET status=1, version=version+1
                //                WHERE id=? AND status=0 AND version=?
                boolean locked = taskMapper.tryLock(task.getId(), task.getVersion());
                if (locked) {
                    delayTaskPool.execute(() -> taskHandler.handleWithRetry(task));
                }
            }
            lastId = batch.get(batch.size() - 1).getId();
        }
    }
}

天然持久化、事务保证、取消就是一条 UPDATE status=4、不依赖额外中间件,这几点让它特别适合做兜底。缺点是精度受扫描间隔限制,大表扫描压数据库,吞吐上不去,百万级任务量得分库分表才行。

最后选了什么,以及为什么

回到那条分水岭需求:动态取消要快。RocketMQ 撤不掉消息,这一条就把它从主链路候选里划掉了。最后定的是 Redis ZSET 做主链路,MySQL 扫表做兜底。

主链路选 Redis ZSET,几个数都对得上:

  • 精度,1 秒轮询就满足 3 秒以内的要求,比 MySQL 扫表强一截
  • 取消,ZREM 是 O(1),不用绕消费端校验那一圈
  • 吞吐,16 分片,每片 3 万,加起来 48 万 QPS,业务远用不到
  • 堆积,500 万任务每条 200 字节算下来 1GB 内存,Redis Cluster 轻松吃下

但 Redis 会丢秒级数据,这是它的命门,所以补一层 MySQL 兜底。投递时先把任务写进 delay_task,Redis 消费成功再把状态改成 2。另起一个兜底线程,每 30 秒扫一遍 status=0 AND execute_at < now() - 10s 的漏网任务,专治 Redis 丢消息那种极端情况:

@Service
public class HybridDelayQueueService {

    @Autowired
    private RedisDelayQueue redisDelayQueue;
    @Autowired
    private DelayTaskMapper delayTaskMapper;
    @Autowired
    private TransactionTemplate transactionTemplate;

    public void submit(DelayTask task) {
        // 1. 先写 DB,事务内,保证持久化
        transactionTemplate.executeWithoutResult(status -> {
            delayTaskMapper.insert(task);
        });
        // 2. 再投 Redis,失败也不慌,兜底扫描会补
        try {
            redisDelayQueue.offer(task.getTaskId(), task.getDelayMs());
        } catch (Exception e) {
            log.warn("Redis投递失败,依赖DB兜底: taskId={}", task.getTaskId(), e);
        }
    }

    public void cancel(String taskId) {
        // 双写取消:Redis + DB
        redisDelayQueue.cancel(taskId);
        delayTaskMapper.updateStatus(taskId, TaskStatus.CANCELLED);
    }
}

这套组合每一处选择都在拿一样东西换另一样,列清楚比含糊带过强:

维度 选择 牺牲了什么
主链路存储 Redis ZSET 分片 内存成本 ~1GB,极端宕机丢秒级数据
精度 1s 轮询间隔 轮询空转 CPU,16 线程每秒各一次,可接受
持久化保障 MySQL 兜底表 多一次 DB 写入,异步,不压主链路 RT
动态取消 Redis ZREM + DB UPDATE 双写 极端情况 Redis 已消费但 DB 未更新,靠幂等兜底
消费幂等 唯一键 + 状态机 极低概率重复执行,状态检查兜住
高可用 Redis Cluster 自动故障转移 主从切换那 ~15s 可能短暂不可用,兜底扫描补

几个真实踩过的坑

丢消息那次事故的复盘。 当时三个原因叠在一起。一是 AOF 关着,RDB 间隔 5 分钟,Redis 一宕机丢了 5 分钟内全部延迟任务,后来开了 AOF everysec。二是上面提过的,ZRANGEBYSCOREZREM 分两次调用不原子,多消费者重复拉,自以为消费了实际丢了,合进一个 Lua 脚本解决。三是 ZSET value 当时存的是序列化对象不是 ID,反序列化失败后消息被 ZREM 删了却没执行,改成 value 只存 ID、详情查 DB。

ZSET 单 key 塞 500 万条会怎样。 ZRANGEBYSCORE 是 O(log(N)+M),N 等于 500 万时 log(N) 约等于 23,不是瓶颈。真正的麻烦是大 key 引起的集群倾斜和主从同步延迟,还有大 key 做 RDB 持久化会阻塞主线程。所以必须分片到 16 到 64 片,单片压在 50 万以下,顺手开 lazyfree-lazy-expirelazyfree-lazy-server-del

长延迟和短延迟混在一个 ZSET 里有没有问题。 没本质问题,ZRANGEBYSCORE 按 score 范围查,跟最大延迟无关。但 7 天这种长延迟任务量一大,ZSET 会持续膨胀。优化是按天分桶,key 写成 delay:queue:{shard}:{yyyyMMdd},只轮询当天和前一天,过期的 key 自动清。

如果某个分片主从一起挂了。 那片 ZSET 暂时读不了,这时候就靠 MySQL 兜底线程兜:它会扫到 status=0 AND execute_at < now() - 10s 的任务自动补执行。算一下最坏延迟,兜底间隔 30 秒加 10 秒容忍窗口,大约 43 秒,业务能接受。Redis 分片恢复重建后,已经执行过的任务靠幂等检查跳过。

换个约束,答案就变

这套方案是被“要支持快速取消”逼出来的。约束一变,选型跟着变,记几条经验:

如果业务压根不需要动态取消,RocketMQ 5.x 任意延迟消息是更省心的选择,持久化和高可用都不用自己操心。

如果精度要压到 100 毫秒以内,比如金融撮合,Redis 1 秒轮询不够用,得上 Netty 时间轮做本地精确计时,tickDuration 设 10 毫秒。但纯内存不持久化,得组合着来:任务先落 DB 加 Redis,时间轮从 Redis 加载近 1 小时的任务,宕机再从 Redis 重建。这种精度下还要盯 GC,G1 那 200 毫秒的 STW 会直接毁掉 100 毫秒精度,得换 ZGC,暂停能压到 1 毫秒以内。

如果任务量从 500 万涨到 5000 万,200 字节算下来 10GB,Redis 还扛得住但成本上来了。可以分冷热:近 2 小时到期的放 Redis,2 小时以上的放 RocketMQ 或 MySQL,一个调度线程每 10 分钟把快进 2 小时窗口的任务从冷存储搬进 Redis,把 Redis 峰值数据量压在百万级。

最后回到时间轮那个参数,Netty 的 tickDuration 默认 100 毫秒。设成 1 毫秒,每秒 1000 次 tick,空转 CPU 吃不消;设成 1 秒,精度又掉到秒级。延迟队列消费线程一般 100 到 500 毫秒比较稳。还有一点容易忘,HashedWheelTimer 是单线程的,任务里千万别阻塞,它该做的只是把活儿丢进线程池,真正的业务逻辑在池子里跑。