分布式事务-扣减库存
一、最终一致性架构图
1、服务
左侧:创建订单服务Server1
右侧:扣减库存服务Server2
中间:独立消息服务Server3
2、中间件:
Kafka-MQ、MySQL-db
二、步骤
0、定义MQ,三个状态
- prepare
- confirm
- ack
1、Server1
- 生成单据号
- 调用Server3#updateMQ接口,状态为prepare
2、Server1
- 下单
3+5、Server1
- 下单结束后,调用Server3#updateMQ接口,状态为confirm
4、Server3
-
定时任务,捞取db中所有单据号(单据号id、消息id、业务类型、消息状态、消息体)
-
遍历所有状态为prepare的单据号
-
查询server1中#queryOrder接口
-
如果没有用查询到订单信息,说明server1服务下单流程失败了,没有建单
此时,直接删除db中此无用单据信息
-
如果查询到了订单信息,说明server1服务的confirm阶段失败了
此时,将db中此单据id对应的消息状态,更新为confirm、并sendMQ到消息队列
-
-
6、单据id对应的消息状态为confirm的MQ,被发送到MQ中了
7、server2
扣减库存服务,监听到MQ后,消费MQ
-
扣减库存成功,则ACK
- 调用server3#updateMQ接口,将单据id的状态改为ACK
-
扣减库存成功,则Kafka有重试机制,也有死信机制,保障消费一定成功
三、代码实现
server3服务
1)updateMQ接口入参定义
public class Message implements Serializable {private String orderId;/*** 业务类型:1-优选订单、2-买菜订单*/private Integer businessType;/*** MQ状态:1-prepare、2-confirm、3-ack*/private Integer status;private BizData bizData;
}
2)updateMQ方法内容
@Transactionalpublic void updateMQ(Message mq) {Integer status = mq.getStatus();if (status == 1) {// saveDb} else if (status == 2) {// sendMQ2Kafka// updateDb(将单据状态改为2)} else if (status == 3) {// updateDb(将单据状态改为3)}}
3)定时任务
@Scheduled(cron = "0 * * * * *")public void craneTask() {// 1、拉去db// 2、获取状态为prepare=1的单据// 3.查看server1服务查询订单接口// 3.1 订单不存在,则删除db此条数据// 3.2 订单存在, 则更新db此条数据状态为confirm、然后sendMQ2Kafka}
server1
- 提供查询订单接口服务
- 在步骤1,创建订单之前时,调用server3#updateMQ接口,入参状态为prepare=1
- 在步骤3和5,创建订单之后时,调用server3#updateMQ接口,入参状态为confirm=2
server2服务
- 监听kafka的消息,消费,扣减库存即可
erver3#updateMQ接口,入参状态为prepare=1 - 在步骤3和5,创建订单之后时,调用server3#updateMQ接口,入参状态为confirm=2
server2服务
- 监听kafka的消息,消费,扣减库存即可
- 消费消息成功后,调用updateMQ,入参状态为ACK=3