目录
一、前言
二、库存
三、订单
一、前言
上一篇使用springcloud-seata解决分布式事务问题-2PC模式我们说到了使用springcloud-seata解决分布式的缺点——不适用于高并发场景
因此我们使用延迟队列来解决分布式事务问题,即使用柔性事务-可靠消息-最终一致性方案(异步确保型)
以下是下订单的代码
// @GlobalTransactional 不使用seata@Transactional@Overridepublic SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {submitVoThreadLocal.set(vo);MemberResVo memberResVo = LoginUserInterceptor.loginUser.get();SubmitOrderResponseVo response = new SubmitOrderResponseVo();response.setCode(0);String redisToken = redisTemplate.opsForValue().get(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResVo.getId());String orderToken = vo.getOrderToken();// 成功返回1 失败返回0String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";// 保证原子性Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResVo.getId()), orderToken);if(result == 0L) {// 验证失败response.setCode(1);return response;} else {// 下单,创建订单,校验令牌,检验价格,锁库存// TODO 1、创建订单,订单项等信息OrderCreateTo order = createOrder();// TODO 2、验价BigDecimal payAmount = order.getOrder().getPayAmount();if(Math.abs(payAmount.subtract(vo.getPayPrice()).doubleValue()) < 0.01) {// 金额对比成功后保存订单// TODO 3、保存订单saveOrder(order);WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());List<OrderItemVo> collect = order.getOrderItems().stream().map(item -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setSkuId(item.getSkuId());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());wareSkuLockVo.setLocks(collect);// TODO 4、锁库存// 出异常后,因为远程锁库存成功,但是忘了原因超时了,订单回滚,库存不回滚// 为了保证高并发,库存服务自己要回滚,可以发消息给库存服务// 库存服务本身也可以使用自动解锁模式 即使用消息队列R r = wareFeignService.orderLockStock(wareSkuLockVo);if(r.getCode() == 0) {// 锁成功response.setOrder(order.getOrder());// TODO 5 出异常
// int i = 10/0;return response;} else {// 锁定失败// 抛异常才能使事务回滚response.setCode(3);throw new NoStockException((String)r.get("msg"));// return response;}} else {response.setCode(2); // 金额对比失败return response;}}}
二、库存
库存服务设计图,首先创建stock-event-exchange交换机,还有stock.release.stock.queue和stock.delay.queue(死信队列)两个队列,交换机和stock.release.stock.queue之间通过路由stock.release.#绑定,交换机和stock.delay.queue(死信队列)通过路由stock.locked绑定
流程解释:当库存锁定成功后,发消息给交换机,交换机通过路由发送到死信队列中,通过死信队列的延迟效果,在时间到期后再路由到交换机,交换机再放入普通队列(绿色),此时只要有方法监听这个队列,就可以拿到消息进行消费
向rabbitmq注册队列、交换机和绑定的代码如下:
@Configuration
public class MyRabbitConfig {@AutowiredRabbitTemplate template;/** 使用JSON序列化机制,进行消息转换*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// @RabbitListener(queues = "stock.release.stock.queue")
// public void handle(Message message) {
//
// }@Beanpublic Exchange stockEventExchange() {// String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsreturn new TopicExchange("stock-even-exchange", true, false, null);}@Beanpublic Queue stockReleaseStockQueue() {//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsreturn new Queue("stock.release.stock.queue",true, false,false, null );}@Beanpublic Queue stockDelayQueue() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "stock-even-exchange");arguments.put("x-dead-letter-routing-key", "stock.release");arguments.put("x-message-ttl", 120000); // 消息过期时间 1分钟return new Queue("stock.delay.queue", true, false, false, arguments);}@Beanpublic Binding stockLockBinding() {return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-even-exchange","stock.release.#",null);}@Beanpublic Binding stockReleaseBinding() {return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-even-exchange","stock.locked",null);}}
库存锁定方法,若锁定成功会发送消息到死信队列
@Transactional@Overridepublic Boolean orderLockStock(WareSkuLockVo vo) {// 先创建订单详情表WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();taskEntity.setOrderSn(vo.getOrderSn());orderTaskService.save(taskEntity);// 1、找到每个商品在哪个仓库都有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map(item -> {SkuWareHasStock stock = new SkuWareHasStock();Long skuId = item.getSkuId();stock.setSkuId(skuId);stock.setNum(item.getCount());// 查询这个商品在哪里有库存List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);stock.setWareId(wareIds);return stock;}).collect(Collectors.toList());// 2、锁定库存// 1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ// 2、锁定失败,前面保存的工作单信息就回滚了。即使要解锁记录,由于去数据库查不到id,所以就不用解锁了for (SkuWareHasStock hasStock : collect) {Boolean skuStocked = false;Long skuId = hasStock.getSkuId();List<Long> wareId = hasStock.getWareId();if(wareId == null || wareId.size() == 0) {// 没有任何仓库有这个商品的库存throw new NoStockException(skuId);}for(Long ware : wareId) {// 返回受影响的行数 成功返回1 失败返回0Long count = wareSkuDao.lockSkuStock(skuId, ware, hasStock.getNum());if(count == 1) {//成功skuStocked = true;// TODO 告诉MQ库存锁定成功WareOrderTaskDetailEntity orderTaskDetailEntity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), ware, 1);orderTaskDetailService.save(orderTaskDetailEntity);// 通知rabbitmq锁定成功StockLockedTo stockLockedTo = new StockLockedTo();StockDetailTo stockDetailTo = new StockDetailTo();BeanUtils.copyProperties(orderTaskDetailEntity, stockDetailTo);// 只发id不行,防止回滚以后详情表也被回滚了,而库存又被扣减了,此时就无法解锁了stockLockedTo.setDetail(stockDetailTo);stockLockedTo.setId(taskEntity.getId());rabbitTemplate.convertAndSend("stock-even-exchange", "stock.locked", stockLockedTo);break;} else {//当前仓库锁失败,重试下一个仓库}}if(skuStocked == false) {throw new NoStockException(skuId);}}return true;}
然后是监听普通队列的方法
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁** 2、订单失败* 库存锁定失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败** 该方法是处理库存自己发给自己的*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
wareSkuService的unlockStock方法如下,只有订单为取消状态或者订单不存在才解锁库存
/** 解锁的方法,如果报错就抛异常让StockReleaseListener去抓*/@Overridepublic void unlockStock(StockLockedTo to) {//库存工作单的idStockDetailTo detail = to.getDetail();Long detailId = detail.getId();/*** 解锁* 1、查询数据库关于这个订单锁定库存信息* 有:证明库存锁定成功了* 解锁:订单状况* 1、没有这个订单,必须解锁库存* 2、有这个订单,不一定解锁库存* 订单状态:已取消:解锁库存* 已支付:不能解锁库存*/WareOrderTaskDetailEntity taskDetailInfo = orderTaskDetailService.getById(detailId);if (taskDetailInfo != null) {//查出wms_ware_order_task工作单的信息Long id = to.getId();WareOrderTaskEntity orderTaskInfo = orderTaskService.getById(id);//获取订单号查询订单状态String orderSn = orderTaskInfo.getOrderSn();//远程查询订单信息R orderData = orderFeignService.getOrderStatus(orderSn);if (orderData.getCode() == 0) {//订单数据返回成功OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});//判断订单状态是否已取消或者支付或者订单不存在if (orderInfo == null || orderInfo.getStatus() == 4) {//订单已被取消,才能解锁库存if (taskDetailInfo.getLockStatus() == 1) {//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);}}} else {//消息拒绝以后重新放在队列里面,让别人继续消费解锁//远程调用服务失败throw new RuntimeException("远程调用服务失败");}} else {//无需解锁}}
unLockStock方法如下:
public void unLockStock(Long skuId, Long wareId, Integer num ,Long taskDetailId) {wareSkuDao.unLockStock(skuId, wareId, num);// 解锁后应该改变订单详情的状态WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setId(taskDetailId);wareOrderTaskDetailEntity.setLockStatus(2);orderTaskDetailService.updateById(wareOrderTaskDetailEntity);}
小结:即锁库存成功后,我们要搞一个定时任务一样,在一段时间(当然这里设计的时间需要比订单支付的时间长,比如pdd下单,没支付,30分钟后就会自动取消订单,所以要比30分钟长才可以)后检查一下订单是否支付,没有支付过期了或者说取消订单了,我们需要把锁住的库存恢复这里恢复我们使用得是wms_ware_order_task表(主要记录订单,即是哪一个订单的)和wms_ware_order_task_detail表(记录订单中的每一项商品,比如skuId为42的锁住了2件)
其结构如下:
接着我们需要考虑传递什么值到rabbitmq队列里,使得拿到值的方法可以有足够的信息恢复(解锁库存),因此我们封装了一个StockLockedTo,其中的StockDetailTo其实就是表ware_order_task的实体,只是为了传输重新封装了一个StockDetailTo,所以监听的方法拿到消息中的实体后就可以进行解锁库存操作了
@Data
public class StockLockedTo {private Long id; //库存工作单的idprivate StockDetailTo detail; // 工作详情
}
三、订单
那么如果订单被取消的时候直接发送消息给mq的库存队列,然后mq进行解锁,是不是一个双重保障呢?确实如此,就相当于订单是主体,而库存的队列相当于补偿
订单设计图如下
解读:创建一个order-event-exchange交换机和两个队列,死信队列order.delay.queue和普通队列order.release.order.queue,交换机和order.delaly.queue通过路由order.create.order绑定,交换机和order.release.order.queue通过路由order.release.order绑定。当订单创建成功后,发消息到死信队列,如果30分钟内没付款,就会通过路由器进入普通队列,此时监听普通队列的方法就可以取出消息更改订单状态,同时发送解锁消息给库存的队列
向rabbitmq注册队列和交换机的代码:
@Configuration
public class MyMQConfig {// 通过bean的形式向rabbitmq创建Queue、Exchange、Binding/*** 死信队列** @return*/@Beanpublic Queue orderDelayQueue() {/*Queue(String name, 队列名字boolean durable, 是否持久化boolean exclusive, 是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "order-event-exchange");arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false, arguments);return queue;}/*** 普通队列** @return*/@Beanpublic Queue orderReleaseQueue() {Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}/*** TopicExchange** @return*/@Beanpublic Exchange orderEventExchange() {/** String name,* boolean durable,* boolean autoDelete,* Map<String, Object> arguments* */return new TopicExchange("order-event-exchange", true, false);}@Beanpublic Binding orderCreateBinding() {/** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/** 订单释放直接发送消息到进行绑定*/@Beanpublic Binding orderReleaseOtherBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}
}
下订单的方法
@Transactional@Overridepublic SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {submitVoThreadLocal.set(vo);MemberResVo memberResVo = LoginUserInterceptor.loginUser.get();SubmitOrderResponseVo response = new SubmitOrderResponseVo();response.setCode(0);String redisToken = redisTemplate.opsForValue().get(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResVo.getId());String orderToken = vo.getOrderToken();// 成功返回1 失败返回0String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";// 保证原子性Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResVo.getId()), orderToken);if(result == 0L) {// 验证失败response.setCode(1);return response;} else {// 下单,创建订单,校验令牌,检验价格,锁库存// TODO 1、创建订单,订单项等信息OrderCreateTo order = createOrder();// TODO 2、验价BigDecimal payAmount = order.getOrder().getPayAmount();if(Math.abs(payAmount.subtract(vo.getPayPrice()).doubleValue()) < 0.01) {// 金额对比成功后保存订单// TODO 3、保存订单saveOrder(order);WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());List<OrderItemVo> collect = order.getOrderItems().stream().map(item -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setSkuId(item.getSkuId());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());wareSkuLockVo.setLocks(collect);// TODO 4、锁库存// 出异常后,因为远程锁库存成功,但是忘了原因超时了,订单回滚,库存不回滚// 为了保证高并发,库存服务自己要回滚,可以发消息给库存服务// 库存服务本身也可以使用自动解锁模式 即使用消息队列R r = wareFeignService.orderLockStock(wareSkuLockVo);if(r.getCode() == 0) {// 锁成功response.setOrder(order.getOrder());// TODO 5 出异常
// int i = 10/0;// TODO 订单创建成功发送消息给MQrabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());return response;} else {// 锁定失败// 抛异常才能使事务回滚response.setCode(3);throw new NoStockException((String)r.get("msg"));// return response;}} else {response.setCode(2); // 金额对比失败return response;}}}
监听普通队列的方法
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {@AutowiredOrderService orderService;@RabbitHandlerpublic void lisentner(OrderEntity entity, Channel channel, Message message) throws IOException {try {orderService.closeOrder(entity);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}System.out.println("收到订单信息,即将关闭订单" + entity);}}
closeOrder方法如下:
@Overridepublic void closeOrder(OrderEntity entity) {OrderEntity orderEntity = this.getById(entity.getId());// 订单的状态需要是新创建的if(orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {//不能使用上面的orderEntity,然后直接更新状态进行更新,因为在创建过程中经历了那么多// 步骤,可能一些属性已经发生改变了。OrderEntity update = new OrderEntity();update.setId(orderEntity.getId());update.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(update);OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderEntity, orderTo);// 立即发送消息给库存通知其解锁rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);}}
因为封装到rabbitmq进行传递的消息不一样,所以库存的监听方法需要增加
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁** 2、订单失败* 库存锁定失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败** 该方法是处理库存自己发给自己的*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}/** 该方法是库存处理订单服务发消息队列的消息* 为什么会有这个步骤* 1、当订单服务如果卡顿,然后还没取消订单* 2、此时库存消息队列里的时间到了,库存监听到然后一看订单不是取消状态,所以直接更改库存的详情状态* 3、而过后订单服务反应过来了,取消了订单,但库存永远也回不去了,被锁死了*/@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo to, Message message, Channel channel) throws IOException {log.info("******订单关闭准备解锁库存******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}
}
unlockStock方法如下:
/** 防止订单服务卡顿,导致订单状态消息一直改不了,库存消息优先到期,查订单状态新建状态,什么都不做就走了* 导致卡顿的订单永远不能解锁库存*/@Overridepublic void unlockStock(OrderTo to) {String orderSn = to.getOrderSn();WareOrderTaskEntity taskEntity = orderTaskService.getOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));List<WareOrderTaskDetailEntity> entities = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", taskEntity.getId()).eq("lock_status", 1));for (WareOrderTaskDetailEntity entity : entities) {unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());}}