Fork me on GitHub
0%

如何通过 Redis 来实现消息延迟队列

延迟队列或许大家都听过,可能也用过,一种比较常见的应用场景就是用户下单后,超过 10 分钟未支付将用户订单取消,这种其实就可以通过延迟队列来实现,这里先不考虑订单数量特别大的情况。大概流程就是用户下单后,订单服务创建订单保存后同时将这笔订单信息放入延迟队列中,设置延迟时间为 10 分钟,10 分钟后从队列中取出来判断支付状态,未支付则将这笔订单取消。

实现延迟队列的方式或许有很多种,如果用 redis 来实现一个延迟队列功能应该怎么做呢,主要是通过 redis 里面的 Zset 集合来实现,Zset 自带的 score 值排序特性就非常适合用来做时间戳排序,当消息的时间戳值小于等于当前时间戳,将该消息取出来进行消费,执行相应的业务逻辑。

实现原理大致如下:

主要分为三大块,生产者,调度器,消费者,生产者负责往 redis 里面生产消息,调度器定时从 Zset 中取出已经到了执行时间的消息并移到 List 集合中,同时消费者负责不断将 List 集合中的消息取出来进行消费。

  • Producer
    1. 首先将消息原始内容保存到 redis 中,key,val 结构,key 是消息体的唯一标识,val 是消息体内容
    2. 将消息 key 根据延迟时间设置对应 score 值添加到 Zset 有序集合中,key 为消息体 topic,score 为当前时间戳加上延迟时间,member 为消息 key
  • Broker
    1. 定时遍历所有 topic,从 Zset 有序集合中通过 zrangebyscore 命令取出每个 topic 下 score 值小于等于当前时间戳的消息 key
    2. 将取出来的消息 key push 放入已经就绪可消费的消息 List 集合中
  • Consumer
    1. 定时遍历所有消息就绪的 topic,取出每个 topic 下可消费的消息 key
    2. 根据消息 key 从 redis 中取出消息体内容,执行相应业务逻辑

存储结构设计:

  • 消息内容存储: (key,val) 结构,例:(msgId1, msgBody)
  • zset 消息 key 存储:(key, score, member),例:(topic1,time,msgId1)
  • List 集合消息 key 存储:(key,[value…]),例:(topic1,[msgId1,msgId2])

上面 Zset 和 List 里面的消息根据不同 topic 作为 key 进行存储,这样可以避免将所有消息都放到一个 topic 下,当消息量比较多的时候出现 redis 里面的大 key 问题,导致存储和查询性能降低。

核心流程图如下:

redisDelayQueue

redis 实现延迟队列的优缺点:

  • 优点
    1. 消息存储在内存中,存储查询速度都比较快
    2. 对于一些简单的应用场景,redis 实现的延迟队列足够轻量,也基本能够符合业务需求
  • 缺点
    1. 由于消息是存储在内存中,因此一旦服务器宕机,存在丢失数据的可能性,尽管 redis 具备持久化的功能,但要做到消息完全不丢失,那么 redis 的持久化策略就需要配置的更严格,但随之带来的就是性能的降低
    2. 消息的 Ack 机制缺失,如果要做到严格的 Ack 功能,则需要通过业务代码来实现,但随之带来的也就是复杂性的提高

总之没有绝对最好的技术实现,只有相对更适合业务需求的技术实现,任何特性在特定场景下可能是优点,而在另外一种场景可能就是缺点了。

核心代码实现:

Broker 调度实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class DelayQueueBroker {

private List<String> topicList;

private RedisDataStore redisDataStore;

protected ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

public DelayQueueBroker(List<String> topicList, RedisDataStore redisDataStore){
this.topicList = topicList;
this.redisDataStore = redisDataStore;
}

public void start(){
scheduledThreadPool.scheduleAtFixedRate(new RouteThread(), 1, 1, TimeUnit.SECONDS);
}

private class RouteThread implements Runnable {

@Override
public void run() {
try {
topicList.forEach(topic -> {
List<String> topicKeyList = redisDataStore.zRangeByScore(topic, System.currentTimeMillis());
if(!CollectionUtils.isEmpty(topicKeyList)){
System.out.println("执行时间到了的topicKeyList=" + JSON.toJSONString(topicKeyList));
redisDataStore.lPushAll(topic, topicKeyList);
redisDataStore.zRem(topic, topicKeyList);
}
});
} catch (Exception e){
e.printStackTrace();
}
}
}
}

Consumer 消费实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DelayQueueConsumer {

private List<String> topicList;

private RedisDataStore redisDataStore;

private Map<String, DelayQueueSubscriber> subscriberMap;

protected ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, Executors.defaultThreadFactory());

public DelayQueueConsumer(List<String> topicList, Map<String, DelayQueueSubscriber> subscriberMap, RedisDataStore redisDataStore){
this.topicList = topicList;
this.subscriberMap = subscriberMap;
this.redisDataStore = redisDataStore;
}

public void start(){
scheduledThreadPool.scheduleAtFixedRate(new DelayQueueConsumer.ConsumeThread(), 1, 1, TimeUnit.SECONDS);
}

private class ConsumeThread implements Runnable {

@Override
public void run() {
topicList.forEach(topic -> {
try {
List<String> topicKeyList = redisDataStore.lRange(topic, 0, -1);
topicKeyList.forEach(msgKey -> {
System.out.println("执行时间到了, msgKey=" + msgKey);
DelayQueueSubscriber subscriber = subscriberMap.get(topic);
DelayMessage delayMessage = redisDataStore.get(msgKey);
subscriber.consume(delayMessage);
redisDataStore.lRem(topic, msgKey);
});
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}

业务消费接口:

1
2
3
4
5
    // 延迟消息消费业务逻辑实现接口
public interface DelayQueueSubscriber {

void consume(DelayMessage delayMessage);
}

Producer 生产实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DelayQueueSender {

private RedisDataStore redisDataStore;

public DelayQueueSender(RedisDataStore redisDataStore){
this.redisDataStore = redisDataStore;
}

public void add(DelayMessage delayMessage){
redisDataStore.set(delayMessage);
redisDataStore.zAdd(delayMessage);
}
}

数据存储层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class RedisDataStore extends AbstractDataStore {

private static final String MSG_KEY_PREFIX = "MSG_";

private static final String ZSET_KEY_PREFIX = "ZSET_";

private static final String LIST_KEY_PREFIX = "LIST_";

private final RedisTemplate<String, String> redisTemplate;

/**
* 延迟过期时间.
*/
private final long DELAY_EXPIRED_MILL_SECONDS = 5 * 60 * 1000;

public RedisDataStore(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}

public boolean set(DelayMessage message) {
redisTemplate.opsForValue().set(MSG_KEY_PREFIX + message.getKey(), JSON.toJSONString(message),
message.getTimestamp() - System.currentTimeMillis() + DELAY_EXPIRED_MILL_SECONDS, TimeUnit.MILLISECONDS);
return true;
}

public DelayMessage get(String key) {
String value = redisTemplate.opsForValue().get(MSG_KEY_PREFIX + key);
if (StringUtils.isNotEmpty(value)) {
return JSON.parseObject(value, DelayMessage.class);
}
return null;
}

public Boolean zAdd(DelayMessage message) {
return redisTemplate.opsForZSet().add(ZSET_KEY_PREFIX + message.getTopic(), message.getKey(), message.getTimestamp());
}

public List<String> zRangeByScore(String key, long timestamp) {
Set<ZSetOperations.TypedTuple<String>> tupleSet = redisTemplate.opsForZSet()
.rangeByScoreWithScores(ZSET_KEY_PREFIX + key, 0, timestamp);
List<String> msgKeyList = new ArrayList<>();
if (tupleSet != null) {
tupleSet.forEach(item -> {
String val = item.getValue();
Double score = item.getScore();
if (score != null && score.longValue() < timestamp) {
msgKeyList.add(val);
}
});
}
return msgKeyList;
}

public boolean zRem(String key, List<String> values) {
Long result = redisTemplate.opsForZSet().remove(ZSET_KEY_PREFIX + key, values.toArray(new Object[0]));
return result != null && result.intValue() == values.size();
}

public boolean lPushAll(String key, List<String> values) {
Long result = redisTemplate.opsForList().leftPushAll(LIST_KEY_PREFIX + key, values);
return true;
}

public boolean lRem(String key, String value){
Long result = redisTemplate.opsForList().remove(LIST_KEY_PREFIX + key, 0, value);
return result != null && result.intValue() == 1;
}

public List<String> lRange(String key, int start, int end) {
return redisTemplate.opsForList().range(LIST_KEY_PREFIX + key, start, end);

}
}

分别启动运行 Broker,Consumer,最后就可以往延迟队列里面添加消息了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {

RedisDataStore redisDataStore = new RedisDataStore(stringRedisTemplate);

List<String> topicList = Arrays.asList("DELAY_QUEUE_TOPIC1", "DELAY_QUEUE_TOPIC2");
DelayQueueBroker broker = new DelayQueueBroker(topicList, redisDataStore);
broker.start();

Map<String, DelayQueueSubscriber> subscriberMap = new HashMap<>();
subscriberMap.put("DELAY_QUEUE_TOPIC1", new Topic1MessageSubscriber());
subscriberMap.put("DELAY_QUEUE_TOPIC2", new Topic2MessageSubscriber());

DelayQueueConsumer consumer = new DelayQueueConsumer(topicList, subscriberMap, redisDataStore);
consumer.start();

DelayQueueSender sender = new DelayQueueSender(redisDataStore);
DelayMessage delayMessage = new DelayMessage();
delayMessage.setKey("TOPIC1_KEY_" + i);
delayMessage.setTopic("DELAY_QUEUE_TOPIC1");
delayMessage.setTimestamp(System.currentTimeMillis() + i * 1000);
delayMessage.setContent("TOPIC2_CONTENT_" + i);
sender.add(delayMessage);

}

通过 redis 来实现延迟队列的大致原理就如上面所描述,但这样肯定是还没办法用到实际生产业务中去,首先 redis 执行命令的原子性就没有做保证,比如说将消息 lpush 放到 List 中成功了,而 Zset 中 zrem 失败了就会出现消息多次消费的情况,上面整个的描述和一些简单实现示例就是为了把实现原理介绍清楚,距离真正应用到业务生产中还有很多需要优化的地方,比如说引入 Lua 脚本来做到批量命令执行的原子性,加入消息消费的重试策略,Broker 和 Consumer 定时任务优化,避免在没有消息要处理的时候存在很多无效的 redis 请求。

 wechat
扫描上面图中二维码关注微信公众号