延迟队列或许大家都听过,可能也用过,一种比较常见的应用场景就是用户下单后,超过 10 分钟未支付将用户订单取消,这种其实就可以通过延迟队列来实现,这里先不考虑订单数量特别大的情况。大概流程就是用户下单后,订单服务创建订单保存后同时将这笔订单信息放入延迟队列中,设置延迟时间为 10 分钟,10 分钟后从队列中取出来判断支付状态,未支付则将这笔订单取消。
实现延迟队列的方式或许有很多种,如果用 redis 来实现一个延迟队列功能应该怎么做呢,主要是通过 redis 里面的 Zset 集合来实现,Zset 自带的 score 值排序特性就非常适合用来做时间戳排序,当消息的时间戳值小于等于当前时间戳,将该消息取出来进行消费,执行相应的业务逻辑。
实现原理大致如下:
主要分为三大块,生产者,调度器,消费者,生产者负责往 redis 里面生产消息,调度器定时从 Zset 中取出已经到了执行时间的消息并移到 List 集合中,同时消费者负责不断将 List 集合中的消息取出来进行消费。
- Producer
- 首先将消息原始内容保存到 redis 中,key,val 结构,key 是消息体的唯一标识,val 是消息体内容
- 将消息 key 根据延迟时间设置对应 score 值添加到 Zset 有序集合中,key 为消息体 topic,score 为当前时间戳加上延迟时间,member 为消息 key
- Broker
- 定时遍历所有 topic,从 Zset 有序集合中通过 zrangebyscore 命令取出每个 topic 下 score 值小于等于当前时间戳的消息 key
- 将取出来的消息 key push 放入已经就绪可消费的消息 List 集合中
- Consumer
- 定时遍历所有消息就绪的 topic,取出每个 topic 下可消费的消息 key
- 根据消息 key 从 redis 中取出消息体内容,执行相应业务逻辑
存储结构设计:
- 消息内容存储: (key,val) 结构,例:(msgId1, msgBody)
- zset 消息 key 存储:(key, score, member),例:(topic1,time,msgId1)
- List 集合消息 key 存储:(key,[value…]),例:(topic1,[msgId1,msgId2])
上面 Zset 和 List 里面的消息根据不同 topic 作为 key 进行存储,这样可以避免将所有消息都放到一个 topic 下,当消息量比较多的时候出现 redis 里面的大 key 问题,导致存储和查询性能降低。
核心流程图如下:
redis 实现延迟队列的优缺点:
- 优点
- 消息存储在内存中,存储查询速度都比较快
- 对于一些简单的应用场景,redis 实现的延迟队列足够轻量,也基本能够符合业务需求
- 缺点
- 由于消息是存储在内存中,因此一旦服务器宕机,存在丢失数据的可能性,尽管 redis 具备持久化的功能,但要做到消息完全不丢失,那么 redis 的持久化策略就需要配置的更严格,但随之带来的就是性能的降低
- 消息的 Ack 机制缺失,如果要做到严格的 Ack 功能,则需要通过业务代码来实现,但随之带来的也就是复杂性的提高
总之没有绝对最好的技术实现,只有相对更适合业务需求的技术实现,任何特性在特定场景下可能是优点,而在另外一种场景可能就是缺点了。
核心代码实现:
Broker 调度实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class DelayQueueBroker {
private List<String> topicList;
private RedisDataStore redisDataStore;
protected ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
public DelayQueueBroker(List<String> topicList, RedisDataStore redisDataStore){ this.topicList = topicList; this.redisDataStore = redisDataStore; }
public void start(){ scheduledThreadPool.scheduleAtFixedRate(new RouteThread(), 1, 1, TimeUnit.SECONDS); }
private class RouteThread implements Runnable {
@Override public void run() { try { topicList.forEach(topic -> { List<String> topicKeyList = redisDataStore.zRangeByScore(topic, System.currentTimeMillis()); if(!CollectionUtils.isEmpty(topicKeyList)){ System.out.println("执行时间到了的topicKeyList=" + JSON.toJSONString(topicKeyList)); redisDataStore.lPushAll(topic, topicKeyList); redisDataStore.zRem(topic, topicKeyList); } }); } catch (Exception e){ e.printStackTrace(); } } } }
|
Consumer 消费实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class DelayQueueConsumer {
private List<String> topicList;
private RedisDataStore redisDataStore;
private Map<String, DelayQueueSubscriber> subscriberMap;
protected ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory());
public DelayQueueConsumer(List<String> topicList, Map<String, DelayQueueSubscriber> subscriberMap, RedisDataStore redisDataStore){ this.topicList = topicList; this.subscriberMap = subscriberMap; this.redisDataStore = redisDataStore; }
public void start(){ scheduledThreadPool.scheduleAtFixedRate(new DelayQueueConsumer.ConsumeThread(), 1, 1, TimeUnit.SECONDS); }
private class ConsumeThread implements Runnable {
@Override public void run() { topicList.forEach(topic -> { try { List<String> topicKeyList = redisDataStore.lRange(topic, 0, -1); topicKeyList.forEach(msgKey -> { System.out.println("执行时间到了, msgKey=" + msgKey); DelayQueueSubscriber subscriber = subscriberMap.get(topic); DelayMessage delayMessage = redisDataStore.get(msgKey); subscriber.consume(delayMessage); redisDataStore.lRem(topic, msgKey); }); }catch (Exception e){ e.printStackTrace(); } }); } } }
|
业务消费接口:
1 2 3 4 5
| public interface DelayQueueSubscriber {
void consume(DelayMessage delayMessage); }
|
Producer 生产实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class DelayQueueSender { private RedisDataStore redisDataStore; public DelayQueueSender(RedisDataStore redisDataStore){ this.redisDataStore = redisDataStore; }
public void add(DelayMessage delayMessage){ redisDataStore.set(delayMessage); redisDataStore.zAdd(delayMessage); } }
|
数据存储层实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class RedisDataStore extends AbstractDataStore {
private static final String MSG_KEY_PREFIX = "MSG_";
private static final String ZSET_KEY_PREFIX = "ZSET_";
private static final String LIST_KEY_PREFIX = "LIST_";
private final RedisTemplate<String, String> redisTemplate;
private final long DELAY_EXPIRED_MILL_SECONDS = 5 * 60 * 1000;
public RedisDataStore(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; }
public boolean set(DelayMessage message) { redisTemplate.opsForValue().set(MSG_KEY_PREFIX + message.getKey(), JSON.toJSONString(message), message.getTimestamp() - System.currentTimeMillis() + DELAY_EXPIRED_MILL_SECONDS, TimeUnit.MILLISECONDS); return true; }
public DelayMessage get(String key) { String value = redisTemplate.opsForValue().get(MSG_KEY_PREFIX + key); if (StringUtils.isNotEmpty(value)) { return JSON.parseObject(value, DelayMessage.class); } return null; }
public Boolean zAdd(DelayMessage message) { return redisTemplate.opsForZSet().add(ZSET_KEY_PREFIX + message.getTopic(), message.getKey(), message.getTimestamp()); }
public List<String> zRangeByScore(String key, long timestamp) { Set<ZSetOperations.TypedTuple<String>> tupleSet = redisTemplate.opsForZSet() .rangeByScoreWithScores(ZSET_KEY_PREFIX + key, 0, timestamp); List<String> msgKeyList = new ArrayList<>(); if (tupleSet != null) { tupleSet.forEach(item -> { String val = item.getValue(); Double score = item.getScore(); if (score != null && score.longValue() < timestamp) { msgKeyList.add(val); } }); } return msgKeyList; }
public boolean zRem(String key, List<String> values) { Long result = redisTemplate.opsForZSet().remove(ZSET_KEY_PREFIX + key, values.toArray(new Object[0])); return result != null && result.intValue() == values.size(); }
public boolean lPushAll(String key, List<String> values) { Long result = redisTemplate.opsForList().leftPushAll(LIST_KEY_PREFIX + key, values); return true; }
public boolean lRem(String key, String value){ Long result = redisTemplate.opsForList().remove(LIST_KEY_PREFIX + key, 0, value); return result != null && result.intValue() == 1; }
public List<String> lRange(String key, int start, int end) { return redisTemplate.opsForList().range(LIST_KEY_PREFIX + key, start, end);
} }
|
分别启动运行 Broker,Consumer,最后就可以往延迟队列里面添加消息了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) {
RedisDataStore redisDataStore = new RedisDataStore(stringRedisTemplate);
List<String> topicList = Arrays.asList("DELAY_QUEUE_TOPIC1", "DELAY_QUEUE_TOPIC2"); DelayQueueBroker broker = new DelayQueueBroker(topicList, redisDataStore); broker.start(); Map<String, DelayQueueSubscriber> subscriberMap = new HashMap<>(); subscriberMap.put("DELAY_QUEUE_TOPIC1", new Topic1MessageSubscriber()); subscriberMap.put("DELAY_QUEUE_TOPIC2", new Topic2MessageSubscriber());
DelayQueueConsumer consumer = new DelayQueueConsumer(topicList, subscriberMap, redisDataStore); consumer.start(); DelayQueueSender sender = new DelayQueueSender(redisDataStore); DelayMessage delayMessage = new DelayMessage(); delayMessage.setKey("TOPIC1_KEY_" + i); delayMessage.setTopic("DELAY_QUEUE_TOPIC1"); delayMessage.setTimestamp(System.currentTimeMillis() + i * 1000); delayMessage.setContent("TOPIC2_CONTENT_" + i); sender.add(delayMessage);
}
|
通过 redis 来实现延迟队列的大致原理就如上面所描述,但这样肯定是还没办法用到实际生产业务中去,首先 redis 执行命令的原子性就没有做保证,比如说将消息 lpush 放到 List 中成功了,而 Zset 中 zrem 失败了就会出现消息多次消费的情况,上面整个的描述和一些简单实现示例就是为了把实现原理介绍清楚,距离真正应用到业务生产中还有很多需要优化的地方,比如说引入 Lua 脚本来做到批量命令执行的原子性,加入消息消费的重试策略,Broker 和 Consumer 定时任务优化,避免在没有消息要处理的时候存在很多无效的 redis 请求。