RocketMQ 实战:模拟电商网站场景综合案例(八)
一、RocketMQ 实战:模拟电商网站场景综合案例–下单异常问题演示
1.png
1、如果订单在扣减库存、扣减优惠券、扣减余额之后,在确认订单之前出现了异常,应该如何处理?
2、失败补偿机制
2.1、消息发送方
- 配置 RocketMQ 属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
- 注入模板类和属性值信息
// Template used to reach the underlying RocketMQ producer.
@Autowired
private RocketMQTemplate rocketMQTemplate;

// Topic name, injected from the mq.order.topic property.
@Value("${mq.order.topic}")
private String topic;

// Tag for cancel messages, injected from the mq.order.tag.cancel property.
@Value("${mq.order.tag.cancel}")
private String cancelTag;
- 发送下单失败消息
/**
 * Outline of the order-confirmation flow with failure compensation.
 *
 * <p>Steps 1-6 are placeholders in this outline. If anything inside the try block
 * throws, a cancel message describing the order is sent to {@code topic} with tag
 * {@code cancelTag}, and a failure result is returned.
 *
 * @param order the order being confirmed
 * @return a success result when every step completes, otherwise a failure result
 */
@Override
public Result confirmOrder(TradeOrder order) {
    // 1. Validate the order
    // 2. Create the pre-order
    try {
        // 3. Deduct stock
        // 4. Deduct the coupon
        // 5. Use the account balance
        // 6. Confirm the order
        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
        // Confirmation failed: publish a cancel message carrying the order details.
        CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
        cancelOrderMQ.setOrderId(order.getOrderId());
        cancelOrderMQ.setCouponId(order.getCouponId());
        cancelOrderMQ.setGoodsId(order.getGoodsId());
        cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
        cancelOrderMQ.setUserId(order.getUserId());
        cancelOrderMQ.setUserMoney(order.getMoneyPaid());
        try {
            sendMessage(topic, cancelTag, cancelOrderMQ.getOrderId().toString(),
                    JSON.toJSONString(cancelOrderMQ));
        } catch (Exception e1) {
            // NOTE(review): switch to the class logger if the enclosing class declares one.
            e1.printStackTrace();
            CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
        }
        return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
}
/**
 * Builds a RocketMQ message and sends it through the template's producer.
 *
 * @param topic destination topic; must not be empty
 * @param tags  message tag
 * @param keys  message key
 * @param body  message payload; must not be empty
 * @throws Exception if the producer fails to send the message
 */
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
    // Reject an empty topic.
    if (StringUtils.isEmpty(topic)) {
        CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    // Reject an empty message body.
    if (StringUtils.isEmpty(body)) {
        CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
    Message message = new Message(topic, tags, keys,
            body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    rocketMQTemplate.getProducer().send(message);
}
2.2、消费接收方
- 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
- 创建监听类,消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{@Overridepublic void onMessage(MessageExt messageExt) {...}
}
二、RocketMQ 实战:模拟电商网站场景综合案例-- 发送确认订单失败消息
1、在 shop-pojo 工程模块中,创建 发送消息的 POJO 类 MQEntity.java
/*** shop\shop-pojo\src\main\java\com\itheima\entity\MQEntity.java** 2024-6-11 创建 发送消息的 POJO 类 MQEntity.java*/
package com.itheima.entity;

import java.math.BigDecimal;

/**
 * Mutable carrier for the fields of an order message: order, coupon, user and goods
 * identifiers, the user's money amount and the goods quantity.
 */
public class MQEntity {

    /** Identifier of the order. */
    private Long orderId;

    /** Identifier of the coupon, if any. */
    private Long couponId;

    /** Identifier of the user. */
    private Long userId;

    /** Money amount associated with the user. */
    private BigDecimal userMoney;

    /** Identifier of the goods. */
    private Long goodsId;

    /** Quantity of goods. */
    private Integer goodsNum;

    /** @return the order id */
    public Long getOrderId() {
        return this.orderId;
    }

    /** @param orderId the order id to store */
    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    /** @return the coupon id */
    public Long getCouponId() {
        return this.couponId;
    }

    /** @param couponId the coupon id to store */
    public void setCouponId(Long couponId) {
        this.couponId = couponId;
    }

    /** @return the user id */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the user id to store */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the user money amount */
    public BigDecimal getUserMoney() {
        return this.userMoney;
    }

    /** @param userMoney the user money amount to store */
    public void setUserMoney(BigDecimal userMoney) {
        this.userMoney = userMoney;
    }

    /** @return the goods id */
    public Long getGoodsId() {
        return this.goodsId;
    }

    /** @param goodsId the goods id to store */
    public void setGoodsId(Long goodsId) {
        this.goodsId = goodsId;
    }

    /** @return the goods quantity */
    public Integer getGoodsNum() {
        return this.goodsNum;
    }

    /** @param goodsNum the goods quantity to store */
    public void setGoodsNum(Integer goodsNum) {
        this.goodsNum = goodsNum;
    }
}
2、在 shop-order-service 工程模块中,修改 订单实现类(下单业务)OrderServiceImpl.java 添加 发送订单确认失败消息 和 确认订单 两个方法。
/*** shop\shop-order-service\src\main\java\com\itheima\shop\service\OrderServiceImpl.java** 2024-6-11 创建 订单实现类(下单业务)OrderServiceImpl.java*/
package com.itheima.shop.service;import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.ICouponService;
import com.itheima.api.IGoodsService;
import com.itheima.api.IOrderService;
import com.itheima.api.IUserService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.MQEntity;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeOrderMapper;
import com.itheima.shop.pojo.*;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.math.BigDecimal;
import java.util.Date;@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {@Referenceprivate IGoodsService goodsService;@Referenceprivate IUserService userService;@Referenceprivate ICouponService couponService;@Value("${mq.order.topic}")private String topic;@Value("${mq.order.tag.cancel}")private String tag;@Autowiredprivate TradeOrderMapper orderMapper;@Autowiredprivate IDWorker idWorker;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic Result confirmOrder(TradeOrder order) {//1.校验订单checkOrder(order);//2.生成预订单Long orderId = savePreOrder(order);try {//3.扣减库存reduceGoodsNum(order);//4.扣减优惠券updateCouponStatus(order);//5.使用余额reduceMoneyPaid(order);//模拟异常抛出//CastException.cast(ShopCode.SHOP_FAIL);//6.确认订单updateOrderStatus(order);//7.返回成功状态return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());} catch (Exception e) {//1.确认订单失败,发送消息MQEntity mqEntity = new MQEntity();mqEntity.setOrderId(orderId);mqEntity.setUserId(order.getUserId());mqEntity.setUserMoney(order.getMoneyPaid());mqEntity.setGoodsId(order.getGoodsId());mqEntity.setGoodsNum(order.getGoodsNumber());mqEntity.setCouponId(order.getCouponId());//2.返回订单确认失败消息try {sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));} catch (Exception e1) {e1.printStackTrace();}return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());}}/*** 发送订单确认失败消息* @param topic* @param tag* @param keys* @param body*/private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {Message message = new Message(topic,tag,keys,body.getBytes());rocketMQTemplate.getProducer().send(message);}/*** 确认订单* @param order*/private void updateOrderStatus(TradeOrder order) {order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());order.setConfirmTime(new Date());int r = 
orderMapper.updateByPrimaryKey(order);if(r<=0){CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);}log.info("订单:"+order.getOrderId()+"确认订单成功");}/*** 扣减余额* @param order*/private void reduceMoneyPaid(TradeOrder order) {if(order.getMoneyPaid()!=null && order.getMoneyPaid().compareTo(BigDecimal.ZERO)==1){TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();userMoneyLog.setOrderId(order.getOrderId());userMoneyLog.setUserId(order.getUserId());userMoneyLog.setUseMoney(order.getMoneyPaid());userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());Result result = userService.updateMoneyPaid(userMoneyLog);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);}log.info("订单:"+order.getOrderId()+",扣减余额成功");}}/*** 使用优惠券* @param order*/private void updateCouponStatus(TradeOrder order) {if(order.getCouponId()!=null){TradeCoupon coupon = couponService.findOne(order.getCouponId());coupon.setOrderId(order.getOrderId());coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());coupon.setUsedTime(new Date());//更新优惠券状态Result result = couponService.updateCouponStatus(coupon);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);}log.info("订单:"+order.getOrderId()+",使用优惠券");}}/*** 扣减库存* @param order*/private void reduceGoodsNum(TradeOrder order) {TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();goodsNumberLog.setOrderId(order.getOrderId());goodsNumberLog.setGoodsId(order.getGoodsId());goodsNumberLog.setGoodsNumber(order.getGoodsNumber());Result result = goodsService.reduceGoodsNum(goodsNumberLog);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);}log.info("订单:"+order.getOrderId()+"扣减库存成功");}/*** 生成预订单** @param order* @return*/private Long savePreOrder(TradeOrder order) {//1. 设置订单状态为不可见order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());//2. 
设置订单IDlong orderId = idWorker.nextId();order.setOrderId(orderId);//3. 核算订单运费BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());if(order.getShippingFee().compareTo(shippingFee)!=0){CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);}//4. 核算订单总金额是否合法BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));orderAmount.add(shippingFee);if(order.getOrderAmount().compareTo(orderAmount)!=0){CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);}//5.判断用户是否使用余额BigDecimal moneyPaid = order.getMoneyPaid();if(moneyPaid!=null){//5.1 订单中余额是否合法int r = moneyPaid.compareTo(BigDecimal.ZERO);//余额小于0if(r==-1){CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);}//余额大于0if(r==1){TradeUser user = userService.findOne(order.getUserId());if(moneyPaid.compareTo(new BigDecimal(user.getUserMoney()))==1){CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);}}}else{order.setMoneyPaid(BigDecimal.ZERO);}//6.判断用户是否使用优惠券Long couponId = order.getCouponId();if(couponId!=null){TradeCoupon coupon = couponService.findOne(couponId);//6.1 判断优惠券是否存在if(coupon==null){CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);}//6.2 判断优惠券是否已经被使用if(coupon.getIsUsed().intValue()==ShopCode.SHOP_COUPON_ISUSED.getCode().intValue()){CastException.cast(ShopCode.SHOP_COUPON_ISUSED);}order.setCouponPaid(coupon.getCouponPrice());}else{order.setCouponPaid(BigDecimal.ZERO);}//7.核算订单支付金额 订单总金额-余额-优惠券金额BigDecimal payAmount = order.getOrderAmount().subtract(order.getMoneyPaid()).subtract(order.getCouponPaid());order.setPayAmount(payAmount);//8.设置下单时间order.setAddTime(new Date());//9.保存订单到数据库orderMapper.insert(order);//10.返回订单IDreturn orderId;}/*** 核算运费* @param orderAmount* @return*/private BigDecimal calculateShippingFee(BigDecimal orderAmount) {if(orderAmount.compareTo(new BigDecimal(100))==1){return BigDecimal.ZERO;}else{return new BigDecimal(10);}}/*** 校验订单** @param order*/private void checkOrder(TradeOrder order) {//1.校验订单是否存在if (order == null) 
{CastException.cast(ShopCode.SHOP_ORDER_INVALID);}//2.校验订单中的商品是否存在TradeGoods goods = goodsService.findOne(order.getGoodsId());if (goods == null) {CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);}//3.校验下单用户是否存在TradeUser user = userService.findOne(order.getUserId());if (user == null) {CastException.cast(ShopCode.SHOP_USER_NO_EXIST);}//4.校验商品单价是否合法if (order.getGoodsPrice().compareTo(goods.getGoodsPrice()) != 0) {CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);}//5.校验订单商品数量是否合法if (order.getGoodsNumber() >= goods.getGoodsNumber()) {CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);}log.info("校验订单通过");}
}
三、RocketMQ 实战:模拟电商网站场景综合案例-- 发送确认订单失败消息演示
运行测试,查看消息发送是否成功。
2.png
四、RocketMQ 实战:模拟电商网站场景综合案例-- 消息消费方准备工作
1、在各 service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
1.1、在 shop-coupon-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
# shop\shop-coupon-service\src\main\resources\application.properties
spring.application.name=dubbo-coupon-provider
spring.dubbo.application.id=dubbo-coupon-provider
spring.dubbo.application.name=dubbo-coupon-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20881

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
1.2、在 shop-goods-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
# shop\shop-goods-service\src\main\resources\application.properties
spring.application.name=dubbo-goods-provider
spring.dubbo.application.id=dubbo-goods-provider
spring.dubbo.application.name=dubbo-goods-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20882

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
1.3、在 shop-order-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
# shop\shop-order-service\src\main\resources\application.properties
# dubbo
spring.application.name=dubbo-order-provider
spring.dubbo.application.id=dubbo-order-provider
spring.dubbo.application.name=dubbo-order-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20884

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
# 下单失败消息发送组
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancel
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
1.4、在 shop-pay-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
# shop\shop-pay-service\src\main\resources\application.properties
# dubbo
spring.application.name=dubbo-pay-provider
spring.dubbo.application.id=dubbo-pay-provider
spring.dubbo.application.name=dubbo-pay-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20885

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=payProducerGroup
mq.topic=payTopic
mq.pay.tag=paid
1.5、在 shop-user-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。
# shop\shop-user-service\src\main\resources\application.properties
# dubbo
spring.application.name=dubbo-user-provider
spring.dubbo.application.id=dubbo-user-provider
spring.dubbo.application.name=dubbo-user-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20883

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
2、在各 service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java 。
2.1、在 shop-coupon-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java
/*** shop\shop-coupon-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java** 2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelMQListener implements RocketMQListener<MessageExt> {}
2.2、在 shop-goods-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java
/*** shop\shop-goods-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java** 2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}
2.3、在 shop-order-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java
/*** shop\shop-order-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java** 2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}
2.4、在 shop-user-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java
/*** shop\shop-user-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java** 2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}
五、RocketMQ 实战:模拟电商网站场景综合案例-- 回退库存流程分析
六、RocketMQ 实战:模拟电商网站场景综合案例-- 回退库存 处理
1、回退库存 处理 – 消息消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Value("${mq.order.consumer.group.name}")private String groupName;@Autowiredprivate TradeGoodsMapper goodsMapper;@Autowiredprivate TradeMqConsumerLogMapper mqConsumerLogMapper;@Autowiredprivate TradeGoodsNumberLogMapper goodsNumberLogMapper;@Overridepublic void onMessage(MessageExt messageExt) {String msgId=null;String tags=null;String keys=null;String body=null;try {//1. 解析消息内容msgId = messageExt.getMsgId();tags= messageExt.getTags();keys= messageExt.getKeys();body= new String(messageExt.getBody(),"UTF-8");log.info("接受消息成功");//2. 查询消息消费记录TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog!=null){//3. 判断如果消费过...//3.1 获得消息处理状态Integer status = mqConsumerLog.getConsumerStatus();//处理过...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",已经处理过");return;}//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",正在处理");return;}//处理失败if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){//获得消息处理次数Integer times = mqConsumerLog.getConsumerTimes();if(times>3){log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");return;}mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());//使用数据库乐观锁更新TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());criteria.andGroupNameEqualTo(groupName);criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, 
example);if(r<=0){//未修改成功,其他线程并发修改log.info("并发修改,稍后处理");}}}else{//4. 判断如果没有消费过...mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(0);//将消息处理信息添加到数据库mqConsumerLogMapper.insert(mqConsumerLog);}//5. 回退库存MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);Long goodsId = mqEntity.getGoodsId();TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());goodsMapper.updateByPrimaryKey(goods);//记录库存操作日志TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();goodsNumberLog.setOrderId(mqEntity.getOrderId());goodsNumberLog.setGoodsId(goodsId);goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());goodsNumberLog.setLogTime(new Date());goodsNumberLogMapper.insert(goodsNumberLog);//6. 将消息的处理状态改为成功mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());mqConsumerLog.setConsumerTimestamp(new Date());mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);log.info("回退库存成功");} catch (Exception e) {e.printStackTrace();TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog==null){//数据库未有记录mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(1);mqConsumerLogMapper.insert(mqConsumerLog);}else{mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);}}}
}
2、在 shop-goods-service 工程模块中,完成 消费消息的 监听类 CancelMQListener.java 代码实现。
/*** shop\shop-goods-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java** 2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import com.alibaba.fastjson.JSON;
import com.itheima.constant.ShopCode;
import com.itheima.entity.MQEntity;
import com.itheima.shop.mapper.TradeGoodsMapper;
import com.itheima.shop.mapper.TradeGoodsNumberLogMapper;
import com.itheima.shop.mapper.TradeMqConsumerLogMapper;
import com.itheima.shop.pojo.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Date;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Value("${mq.order.consumer.group.name}")private String groupName;@Autowiredprivate TradeGoodsMapper goodsMapper;@Autowiredprivate TradeMqConsumerLogMapper mqConsumerLogMapper;@Autowiredprivate TradeGoodsNumberLogMapper goodsNumberLogMapper;@Overridepublic void onMessage(MessageExt messageExt) {String msgId=null;String tags=null;String keys=null;String body=null;try {//1. 解析消息内容msgId = messageExt.getMsgId();tags= messageExt.getTags();keys= messageExt.getKeys();body= new String(messageExt.getBody(),"UTF-8");log.info("接受消息成功");//2. 查询消息消费记录TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog!=null){//3. 判断如果消费过...//3.1 获得消息处理状态Integer status = mqConsumerLog.getConsumerStatus();//处理过...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",已经处理过");return;}//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",正在处理");return;}//处理失败if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){//获得消息处理次数Integer times = mqConsumerLog.getConsumerTimes();if(times>3){log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");return;}mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());//使用数据库乐观锁更新TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());criteria.andGroupNameEqualTo(groupName);criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, 
example);if(r<=0){//未修改成功,其他线程并发修改log.info("并发修改,稍后处理");}}}else{//4. 判断如果没有消费过...mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setGroupName(groupName);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(0);//将消息处理信息添加到数据库mqConsumerLogMapper.insert(mqConsumerLog);}//5. 回退库存MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);Long goodsId = mqEntity.getGoodsId();TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());goodsMapper.updateByPrimaryKey(goods);//6. 将消息的处理状态改为成功mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());mqConsumerLog.setConsumerTimestamp(new Date());mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);log.info("回退库存成功");} catch (Exception e) {e.printStackTrace();TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog==null){//数据库未有记录mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setGroupName(groupName);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(1);mqConsumerLogMapper.insert(mqConsumerLog);}else{mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);}}}
}
上一节关联链接请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(七)