Fork me on GitHub
0%

异步接口实践之 DeferredResult

前两篇文章主要讨论了关于异步接口在 SrpingMVC 体系下的实现过程,通过查看源码的方式大概梳理了异步走过的整个流程,简要的总结就是:

  1. 接口的返回值是 Callable,WebAsyncTask,DeferredResult 等类型。
  2. SpringMVC 在解析接口的返回值的时如果发现是上面的类型,找到能处理该返回值类型的 returnValueHandler 处理器。
  3. 在相应的 returnValueHandler 里面有一个处理返回值的方法,在这个方法中开启异步的方式来执行接口返回值里面真正的获取处理结果的方法。
  4. 第三步里面以异步的方式执行,容器中处理请求的线程继续往下执行,然后容器线程被释放,response 还是处于打开状态。
  5. 第三步里面异步执行结束得到结果后再次请求容器线程,继续处理真正结果,由于真正的结果不是上面的类型,一般匹配到 RequestResponseBodyMethodProcessor,这时会将 Callable,WebAsyncTask,DeferredResult 异步执行的结果返回给客户端,前端拿到结果之后,本次请求结束。

至于 Callable 和 WebAsyncTask,DeferredResult 这三种类型作为接口的返回值有什么区别呢,之前的文章中也已经大概说了。其中 DeferredResult 是三者中最灵活的,因为他的异步执行完全是由你自己来控制的,接下来我就用它来模拟一个异步购买商品的接口。

这里假设我们商品购买和支付的流程是这样的:

  1. 用户在前端点击购买,请求订单服务购买接口,接口的返回值就是 DeferredResult
  2. 订单服务的购买接口里面首先创建订单信息
  3. 往支付请求队列里面发送一个支付请求消息,同时保存要返回的 DeferredResult 实例
  4. 支付服务监听支付请求队列,收到消息后根据订单金额往对应的网关发起支付请求
  5. 支付成功之后,支付服务将支付结果发送到支付响应队列
  6. 订单服务监听支付响应队列,收到支付结果消息后,更新订单状态信息,取出第三步中保存的 DeferredResult,将支付结果设置给 DeferredResult
  7. 前端获取到支付结果,整个购买请求结束

上面算是一个简化版的商品购买支付流程,由于支付的那一步是相对要耗时一点的,因为要调用第三方网关发起支付请求,所以购买的接口我返回了 DeferredResult。

当然在这个接口你也可以直接先给前端返回一个待支付订单信息的临时结果,然后前端再通过订单 id 轮询的请求后台接口支付结果。

这种方式其实不是很好,首先有一个是结果返回不及时,因为后端在产生结果之后,前端不会立马得到,需要在下一次轮询中才能得到。

还一个就是,如果同一时间下单的用户比较多,就会有大量的轮询请求,后端容器会频繁的创建线程来处理轮询请求,在结果产生前,这些轮询请求都是无意义的,有点浪费资源。

那么接下来看下通过 DeferredResult 怎么实现,大概流程我上面已经描述了,下面我们看下主要的代码实现。

1
2
3
4
5
6
7
8
9
10
11
12
@PostMapping("purchase")
public DeferredResult<PurchaseResultVO> purchase(@PathVariable("productId") Long productId){
DeferredResult<PurchaseResultVO> deferredResult = new DeferredResult<>();
Order order = this.orderService.purchase(productId, deferredResult);
deferredResult.onError((throwable) -> {
log.error("purchase error", throwable);
this.deferredResultStore.remove(order.getUserId(), order.getId());
});
deferredResult.onTimeout(() -> this.deferredResultStore.remove(order.getUserId(), order.getId()));
deferredResult.onCompletion(() -> this.deferredResultStore.remove(order.getUserId(), order.getId()));
return deferredResult;
}

首先,提供一个商品购买的接口,接口的返回值是 DeferredResult,接口调用 orderService 处理购买流程,同时配置 DeferredResult 的三个回调,在三个回调中都调用了 deferredResultStore 中的 remove 方法,主要是在发生错误,超时,支付完成时将保存在内存中的 DeferredResult 移除掉,标志本次请求结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final Map<String, DeferredResult> DEFERRED_RESULTS = new ConcurrentHashMap<>();

public void put(Long userId, Long orderId, DeferredResult<PurchaseResultVO> deferredResult) {
DEFERRED_RESULTS.putIfAbsent(assembleKey(userId, orderId), deferredResult);
}

public DeferredResult get(Long userId, Long orderId) {
return DEFERRED_RESULTS.get(assembleKey(userId, orderId));
}

public void remove(Long userId, Long orderId) {
DEFERRED_RESULTS.remove(assembleKey(userId, orderId));
}

private String assembleKey(Long userId, Long orderId){
return "purchase:" + userId + ":" + orderId;
}

deferredResultStore 这个主要是通过一个 map 来存储购买接口在请求完成之前产生的 DeferredResult,当支付完成时再从 map 里面取出来设置支付结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Order purchase(Long productId, DeferredResult<PurchaseResultVO> deferredResult) {
// FIXME 获取当前用户 id
Long currentUserId = 1L;
// FIXME 模拟查询商品,得到价格信息
// 构建订单信息
Order order = new Order();
order.setId(1L);
order.setStatus(0);
order.setProductId(productId);
order.setAmount(50L);
order.setUserId(currentUserId);
// FIXME 保存订单数据到数据库
// 构建支付请求
PayRequest payRequest = new PayRequest();
payRequest.setGateway("we_chat");
payRequest.setProfileId(1L);
payRequest.setAmount(order.getAmount());
payRequest.setOrderDTO(OrderConverter.INSTANCE.domain2DTO(order));
// 发送支付请求到队列请求支付
this.requestRabbitMQSender.send(payRequest);
// 临时保存待响应的结果
this.deferredResultStore.put(currentUserId, order.getId(), deferredResult);
return order;
}

orderService 里面的 purchase 方法处理订单的持久化,向支付队列发送支付请求消息,保存接口生成的 deferredResult。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RabbitListener(queues = "${spring.rabbitmq.listener-request-queue}")
public void onMessage(PayRequest message) {
log.info("=============== request queue received message ->{}", message.toString());
// 根据支付请求模拟网关支付
log.info("发起网关支付 -> {}", message.toString());
TimeUnit.SECONDS.sleep(3);
// 然后将支付结果放到队列
PayResponse response = new PayResponse();
response.setSuccess(true);
response.setPayId(1L);
response.setOrderDTO(message.getOrderDTO());
log.info("=============== 网关支付成功,将结果放到结果队列 ->{}", message.toString());
this.responseRabbitMQSender.send(response);

}

同时支付服务在监听支付请求消息队列,这里只是模拟,所以都在同一个工程里面。收到支付请求消息之后,向网关发起扣款,扣款成功后再将结果塞回支付响应队列里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RabbitListener(queues = "${spring.rabbitmq.listener-response-queue}")
public void onMessage(PayResponse message) {
log.info("=============== response queue received message ->{}", message.toString());
// 根据支付结果,查询并且更新数据库订单状态
OrderDTO orderDTO = message.getOrderDTO();
log.info("查询数据库订单信息 -> {}", orderDTO.toString());
Order order = OrderConverter.INSTANCE.dto2Domain(orderDTO);
boolean success = message.getSuccess();
if(success){
order.setStatus(1);
order.setPayId(message.getPayId());
}else{
order.setStatus(-1);
order.setErrorMsg(message.getErrorMsg());
}
log.info("保存订单最新状态信息 -> {}", order.toString());
// 设置支付返回结果
DeferredResult<PurchaseResultVO> deferredResult = this.deferredResultStore.get(order.getUserId(), order.getId());
if(deferredResult == null){
return;
}
PurchaseResultVO resultVO = new PurchaseResultVO();
resultVO.setSuccess(success);
resultVO.setProductId(order.getProductId());
resultVO.setErrorMsg(message.getErrorMsg());
log.info("设置支付结果返回 -> {}", resultVO.toString());
deferredResult.setResult(resultVO);
}

同样的,订单服务在监听支付响应的消息队列,收到支付结果消息后更新订单状态,从 deferredResultStore 中取出刚才保存的 deferredResult,设置支付结果,然后前端收到购买结果,刷新页面信息,整个购买请求结束。

在整个过程中,我们可以看到一旦订单服务收到支付结果消息,就会立马放到 deferredResult 中,前端也能够立马得到响应,前端不需要做额外的轮询操作,同样后台也不用再提供轮询的接口,而且支付结果并不会像轮询一样有延迟。

还有就是避免了大量的轮询请求,而且在后台处理支付的过程中,接收购买请求的线程已经被释放了,并不会占额外的资源,在收到结果后才重新请求容器线程来返回支付结果给前端。

但是这个其实也有一个问题,因为是异步的,必然有线程之前的切换的过程,首先,前端请求到后台容器线程,容器线程开启异步执行处理,容器线程释放,异步执行结束,又再重新请求到容器线程,容器线程返回结果给前端。

这里面有好几次的线程切换,这其实相对来说也是比较耗时的。这就看你的异步执行的过程是不是需要一定时间才能执行完,如果是需要一定时间的话,相对于线程间的切换这一点时间的话还是可以接受的,毕竟这样也带来我上面说到的这些好处。

上面就是使用 DeferredResult 来实现了异步接口模拟了商品购买支付的一个流程,虽然说里面没有更详细的数据库操作以及购买的时候的真正扣款,扣库存的那些操作,但整体流程已经很清晰了,通过这个例子应该能对异步接口有进一步的了解。

上面例子的代码我也已经提交到 GitHub 了,感兴趣的可以去下载查看。地址是:blog sample code

这里面也包含了之前的 JdbcTemplate 操作的样例代码,之后我也会把文章中的样例代码上传到这个工程中,建议 star。

同时,没有看过前面两篇异步解析的文章也强烈建议去看下,这样有助于理解。

 wechat
扫描上面图中二维码关注微信公众号