# RocketMQ 实战:模拟电商网站场景综合案例(八)

RocketMQ 实战:模拟电商网站场景综合案例(八)

一、RocketMQ 实战:模拟电商网站场景综合案例–下单异常问题演示

1.png

1、如果订单在扣减库存、扣减优惠券、扣减余额后,在 未 确认订单 前,出现了异常,如何处理异常问题。

2 失败补偿机制

2.1、消息发送方
  • 配置 RocketMQ 属性值

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
  • 注入模板类和属性值信息
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${mq.order.topic}")private String topic;@Value("${mq.order.tag.cancel}")private String cancelTag;
  • 发送下单失败消息

@Override
public Result confirmOrder(TradeOrder order) {//1.校验订单//2.生成预订try {//3.扣减库存//4.扣减优惠券//5.使用余额//6.确认订单} catch (Exception e) {//确认订单失败,发送消息CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();cancelOrderMQ.setOrderId(order.getOrderId());cancelOrderMQ.setCouponId(order.getCouponId());cancelOrderMQ.setGoodsId(order.getGoodsId());cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());cancelOrderMQ.setUserId(order.getUserId());cancelOrderMQ.setUserMoney(order.getMoneyPaid());try {sendMessage(topic, cancelTag, cancelOrderMQ.getOrderId().toString(), JSON.toJSONString(cancelOrderMQ));} catch (Exception e1) {e1.printStackTrace();CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);}return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());}
}

private void sendMessage(String topic, String tags, String keys, String body) throws Exception {//判断Topic是否为空if (StringUtils.isEmpty(topic)) {CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);}//判断消息内容是否为空if (StringUtils.isEmpty(body)) {CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);}//消息体Message message = new Message(topic, tags, keys, body.getBytes());//发送消息rocketMQTemplate.getProducer().send(message);
}
2.2、消费接收方
  • 配置RocketMQ属性值

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
  • 创建监听类,消费消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{@Overridepublic void onMessage(MessageExt messageExt) {...}
}

二、RocketMQ 实战:模拟电商网站场景综合案例-- 发送确认订单失败消息

1、在 shop-pojo 工程模块中,创建 发送消息的 POJO 类 MQEntity.java


/***  shop\shop-pojo\src\main\java\com\itheima\entity\MQEntity.java**  2024-6-11  创建 发送消息的 POJO 类 MQEntity.java*/
package com.itheima.entity;import java.math.BigDecimal;public class MQEntity {private Long orderId;private Long couponId;private Long userId;private BigDecimal userMoney;private Long goodsId;private Integer goodsNum;public Long getOrderId() {return orderId;}public void setOrderId(Long orderId) {this.orderId = orderId;}public Long getCouponId() {return couponId;}public void setCouponId(Long couponId) {this.couponId = couponId;}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public BigDecimal getUserMoney() {return userMoney;}public void setUserMoney(BigDecimal userMoney) {this.userMoney = userMoney;}public Long getGoodsId() {return goodsId;}public void setGoodsId(Long goodsId) {this.goodsId = goodsId;}public Integer getGoodsNum() {return goodsNum;}public void setGoodsNum(Integer goodsNum) {this.goodsNum = goodsNum;}
}

2、在 shop-coupon-service 工程模块中,修改 订单实现类(下单业务)OrderServiceImpl.java 添加 发送订单确认失败消息 和 确认订单 两个方法。


/***  shop\shop-order-service\src\main\java\com\itheima\shop\service\OrderServiceImpl.java**  2024-6-11  创建 订单实现类(下单业务)OrderServiceImpl.java*/
package com.itheima.shop.service;import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.ICouponService;
import com.itheima.api.IGoodsService;
import com.itheima.api.IOrderService;
import com.itheima.api.IUserService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.MQEntity;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeOrderMapper;
import com.itheima.shop.pojo.*;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.math.BigDecimal;
import java.util.Date;@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {@Referenceprivate IGoodsService goodsService;@Referenceprivate IUserService userService;@Referenceprivate ICouponService couponService;@Value("${mq.order.topic}")private String topic;@Value("${mq.order.tag.cancel}")private String tag;@Autowiredprivate TradeOrderMapper orderMapper;@Autowiredprivate IDWorker idWorker;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic Result confirmOrder(TradeOrder order) {//1.校验订单checkOrder(order);//2.生成预订单Long orderId = savePreOrder(order);try {//3.扣减库存reduceGoodsNum(order);//4.扣减优惠券updateCouponStatus(order);//5.使用余额reduceMoneyPaid(order);//模拟异常抛出//CastException.cast(ShopCode.SHOP_FAIL);//6.确认订单updateOrderStatus(order);//7.返回成功状态return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());} catch (Exception e) {//1.确认订单失败,发送消息MQEntity mqEntity = new MQEntity();mqEntity.setOrderId(orderId);mqEntity.setUserId(order.getUserId());mqEntity.setUserMoney(order.getMoneyPaid());mqEntity.setGoodsId(order.getGoodsId());mqEntity.setGoodsNum(order.getGoodsNumber());mqEntity.setCouponId(order.getCouponId());//2.返回订单确认失败消息try {sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));} catch (Exception e1) {e1.printStackTrace();}return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());}}/*** 发送订单确认失败消息* @param topic* @param tag* @param keys* @param body*/private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {Message message = new Message(topic,tag,keys,body.getBytes());rocketMQTemplate.getProducer().send(message);}/*** 确认订单* @param order*/private void updateOrderStatus(TradeOrder order) {order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());order.setConfirmTime(new Date());int r = orderMapper.updateByPrimaryKey(order);if(r<=0){CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);}log.info("订单:"+order.getOrderId()+"确认订单成功");}/*** 扣减余额* @param order*/private void reduceMoneyPaid(TradeOrder order) {if(order.getMoneyPaid()!=null && order.getMoneyPaid().compareTo(BigDecimal.ZERO)==1){TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();userMoneyLog.setOrderId(order.getOrderId());userMoneyLog.setUserId(order.getUserId());userMoneyLog.setUseMoney(order.getMoneyPaid());userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());Result result = userService.updateMoneyPaid(userMoneyLog);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);}log.info("订单:"+order.getOrderId()+",扣减余额成功");}}/*** 使用优惠券* @param order*/private void updateCouponStatus(TradeOrder order) {if(order.getCouponId()!=null){TradeCoupon coupon = couponService.findOne(order.getCouponId());coupon.setOrderId(order.getOrderId());coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());coupon.setUsedTime(new Date());//更新优惠券状态Result result =  couponService.updateCouponStatus(coupon);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);}log.info("订单:"+order.getOrderId()+",使用优惠券");}}/*** 扣减库存* @param order*/private void reduceGoodsNum(TradeOrder order) {TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();goodsNumberLog.setOrderId(order.getOrderId());goodsNumberLog.setGoodsId(order.getGoodsId());goodsNumberLog.setGoodsNumber(order.getGoodsNumber());Result result = goodsService.reduceGoodsNum(goodsNumberLog);if(result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())){CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);}log.info("订单:"+order.getOrderId()+"扣减库存成功");}/*** 生成预订单** @param order* @return*/private Long savePreOrder(TradeOrder order) {//1. 设置订单状态为不可见order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());//2. 设置订单IDlong orderId = idWorker.nextId();order.setOrderId(orderId);//3. 核算订单运费BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());if(order.getShippingFee().compareTo(shippingFee)!=0){CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);}//4. 核算订单总金额是否合法BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));orderAmount.add(shippingFee);if(order.getOrderAmount().compareTo(orderAmount)!=0){CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);}//5.判断用户是否使用余额BigDecimal moneyPaid = order.getMoneyPaid();if(moneyPaid!=null){//5.1 订单中余额是否合法int r = moneyPaid.compareTo(BigDecimal.ZERO);//余额小于0if(r==-1){CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);}//余额大于0if(r==1){TradeUser user = userService.findOne(order.getUserId());if(moneyPaid.compareTo(new BigDecimal(user.getUserMoney()))==1){CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);}}}else{order.setMoneyPaid(BigDecimal.ZERO);}//6.判断用户是否使用优惠券Long couponId = order.getCouponId();if(couponId!=null){TradeCoupon coupon = couponService.findOne(couponId);//6.1 判断优惠券是否存在if(coupon==null){CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);}//6.2 判断优惠券是否已经被使用if(coupon.getIsUsed().intValue()==ShopCode.SHOP_COUPON_ISUSED.getCode().intValue()){CastException.cast(ShopCode.SHOP_COUPON_ISUSED);}order.setCouponPaid(coupon.getCouponPrice());}else{order.setCouponPaid(BigDecimal.ZERO);}//7.核算订单支付金额    订单总金额-余额-优惠券金额BigDecimal payAmount = order.getOrderAmount().subtract(order.getMoneyPaid()).subtract(order.getCouponPaid());order.setPayAmount(payAmount);//8.设置下单时间order.setAddTime(new Date());//9.保存订单到数据库orderMapper.insert(order);//10.返回订单IDreturn orderId;}/*** 核算运费* @param orderAmount* @return*/private BigDecimal calculateShippingFee(BigDecimal orderAmount) {if(orderAmount.compareTo(new BigDecimal(100))==1){return BigDecimal.ZERO;}else{return new BigDecimal(10);}}/*** 校验订单** @param order*/private void checkOrder(TradeOrder order) {//1.校验订单是否存在if (order == null) {CastException.cast(ShopCode.SHOP_ORDER_INVALID);}//2.校验订单中的商品是否存在TradeGoods goods = goodsService.findOne(order.getGoodsId());if (goods == null) {CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);}//3.校验下单用户是否存在TradeUser user = userService.findOne(order.getUserId());if (user == null) {CastException.cast(ShopCode.SHOP_USER_NO_EXIST);}//4.校验商品单价是否合法if (order.getGoodsPrice().compareTo(goods.getGoodsPrice()) != 0) {CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);}//5.校验订单商品数量是否合法if (order.getGoodsNumber() >= goods.getGoodsNumber()) {CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);}log.info("校验订单通过");}
}

三、RocketMQ 实战:模拟电商网站场景综合案例-- 发送确认订单失败消息演示

运行测试,查看消息发送是否成功。

2.png

四、RocketMQ 实战:模拟电商网站场景综合案例-- 消息消费方准备工作

1、在各 service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

1.1、在 shop-coupon-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

# shop\shop-coupon-service\src\main\resources\application.propertiesspring.application.name=dubbo-coupon-provider
spring.dubbo.application.id=dubbo-coupon-provider
spring.dubbo.application.name=dubbo-coupon-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20881# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
1.2、在 shop-goods-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

#  shop\shop-goods-service\src\main\resources\application.propertiesspring.application.name=dubbo-goods-provider
spring.dubbo.application.id=dubbo-goods-provider
spring.dubbo.application.name=dubbo-goods-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20882# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
1.3、在 shop-order-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

#  shop\shop-order-service\src\main\resources\application.properties# dubbo
spring.application.name=dubbo-order-provider
spring.dubbo.application.id=dubbo-order-provider
spring.dubbo.application.name=dubbo-order-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20884# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml# RocketMQ
# 下单失败消息发送组
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancelmq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
1.4、在 shop-pay-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

#  shop\shop-pay-service\src\main\resources\application.properties# dubbo
spring.application.name=dubbo-pay-provider
spring.dubbo.application.id=dubbo-pay-provider
spring.dubbo.application.name=dubbo-pay-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20885# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=payProducerGroupmq.topic=payTopic
mq.pay.tag=paid
1.5、在 shop-user-service 工程模块中,修改 application.properties 配置文件,添加 配置 RocketMQ 属性值。

#  shop\shop-user-service\src\main\resources\application.properties# dubbo
spring.application.name=dubbo-user-provider
spring.dubbo.application.id=dubbo-user-provider
spring.dubbo.application.name=dubbo-user-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20883# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root#spring集成Mybatis环境
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic

2、在各 service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java 。

2.1、在 shop-coupon-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java

/***   shop\shop-coupon-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java**   2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelMQListener implements RocketMQListener<MessageExt> {}
2.2、在 shop-goods-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java

/***   shop\shop-goods-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java**   2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}
2.3、在 shop-order-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java

/***   shop\shop-order-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java**   2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}
2.4、在 shop-user-service 工程模块中,创建 消费消息的 监听类 CancelMQListener.java

/***   shop\shop-user-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java**   2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{}

五、RocketMQ 实战:模拟电商网站场景综合案例-- 回退库存流程分析

在这里插入图片描述

六、RocketMQ 实战:模拟电商网站场景综合案例-- 回退库存 处理

1、回退库存 处理 – 消息消费者


@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Value("${mq.order.consumer.group.name}")private String groupName;@Autowiredprivate TradeGoodsMapper goodsMapper;@Autowiredprivate TradeMqConsumerLogMapper mqConsumerLogMapper;@Autowiredprivate TradeGoodsNumberLogMapper goodsNumberLogMapper;@Overridepublic void onMessage(MessageExt messageExt) {String msgId=null;String tags=null;String keys=null;String body=null;try {//1. 解析消息内容msgId = messageExt.getMsgId();tags= messageExt.getTags();keys= messageExt.getKeys();body= new String(messageExt.getBody(),"UTF-8");log.info("接受消息成功");//2. 查询消息消费记录TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog!=null){//3. 判断如果消费过...//3.1 获得消息处理状态Integer status = mqConsumerLog.getConsumerStatus();//处理过...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",已经处理过");return;}//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",正在处理");return;}//处理失败if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){//获得消息处理次数Integer times = mqConsumerLog.getConsumerTimes();if(times>3){log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");return;}mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());//使用数据库乐观锁更新TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());criteria.andGroupNameEqualTo(groupName);criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);if(r<=0){//未修改成功,其他线程并发修改log.info("并发修改,稍后处理");}}}else{//4. 判断如果没有消费过...mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(0);//将消息处理信息添加到数据库mqConsumerLogMapper.insert(mqConsumerLog);}//5. 回退库存MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);Long goodsId = mqEntity.getGoodsId();TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());goodsMapper.updateByPrimaryKey(goods);//记录库存操作日志TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();goodsNumberLog.setOrderId(mqEntity.getOrderId());goodsNumberLog.setGoodsId(goodsId);goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());goodsNumberLog.setLogTime(new Date());goodsNumberLogMapper.insert(goodsNumberLog);//6. 将消息的处理状态改为成功mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());mqConsumerLog.setConsumerTimestamp(new Date());mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);log.info("回退库存成功");} catch (Exception e) {e.printStackTrace();TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog==null){//数据库未有记录mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(1);mqConsumerLogMapper.insert(mqConsumerLog);}else{mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);}}}
}

2、在 shop-goods-service 工程模块中,完成 消费消息的 监听类 CancelMQListener.java 代码实现。


/***   shop\shop-goods-service\src\main\java\com\itheima\shop\mq\CancelMQListener.java**   2024-6-11 创建 消费消息的 监听类 CancelMQListener.java*/
package com.itheima.shop.mq;import com.alibaba.fastjson.JSON;
import com.itheima.constant.ShopCode;
import com.itheima.entity.MQEntity;
import com.itheima.shop.mapper.TradeGoodsMapper;
import com.itheima.shop.mapper.TradeGoodsNumberLogMapper;
import com.itheima.shop.mapper.TradeMqConsumerLogMapper;
import com.itheima.shop.pojo.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Date;@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Value("${mq.order.consumer.group.name}")private String groupName;@Autowiredprivate TradeGoodsMapper goodsMapper;@Autowiredprivate TradeMqConsumerLogMapper mqConsumerLogMapper;@Autowiredprivate TradeGoodsNumberLogMapper goodsNumberLogMapper;@Overridepublic void onMessage(MessageExt messageExt) {String msgId=null;String tags=null;String keys=null;String body=null;try {//1. 解析消息内容msgId = messageExt.getMsgId();tags= messageExt.getTags();keys= messageExt.getKeys();body= new String(messageExt.getBody(),"UTF-8");log.info("接受消息成功");//2. 查询消息消费记录TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog!=null){//3. 判断如果消费过...//3.1 获得消息处理状态Integer status = mqConsumerLog.getConsumerStatus();//处理过...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",已经处理过");return;}//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",正在处理");return;}//处理失败if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){//获得消息处理次数Integer times = mqConsumerLog.getConsumerTimes();if(times>3){log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");return;}mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());//使用数据库乐观锁更新TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());criteria.andGroupNameEqualTo(groupName);criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);if(r<=0){//未修改成功,其他线程并发修改log.info("并发修改,稍后处理");}}}else{//4. 判断如果没有消费过...mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setGroupName(groupName);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(0);//将消息处理信息添加到数据库mqConsumerLogMapper.insert(mqConsumerLog);}//5. 回退库存MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);Long goodsId = mqEntity.getGoodsId();TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());goodsMapper.updateByPrimaryKey(goods);//6. 将消息的处理状态改为成功mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());mqConsumerLog.setConsumerTimestamp(new Date());mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);log.info("回退库存成功");} catch (Exception e) {e.printStackTrace();TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog==null){//数据库未有记录mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setGroupName(groupName);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(1);mqConsumerLogMapper.insert(mqConsumerLog);}else{mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);}}}
}

上一节关联链接请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(七)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/347410.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue+element el-select动态加减框数量及验证下拉框选项动态置灰(选中行的下拉框换个值后,原值没办法监控这个问题也解决了)

1效果: 2部分主要(HTML): 1:这个位置主要就是看看方法什么的吧,还有大概的结构 2:change"sort_Change(item,tablelists.orderbyList)这两个参数(都有大用): (1)item:代表每次你操作的这个数据 (2)tablelists.orderbyList:代表你这一共有几行数据(上边这个例子就会得到一个…

LCD电子广告牌课程设计

概述 1.1课程设计简介 亮丽实用的广告牌可以给我们的生活添加光彩、可以给店铺招揽生意。传统的广告牌都是固定的汉字&#xff0c;并且时间长了会掉色&#xff0c;使汉字模糊难认&#xff0c;这就给我的生活带来很多的不便。尤其到了晚上传统广告牌就会失去其该有的作用。所以在…

2024年安全现状报告

2024 年安全现状报告有些矛盾。尽管安全专业人员的道路困难重重&#xff0c;比如说严格的合规要求、不断升级的地缘政治紧张局势和更复杂的威胁环境&#xff0c;但整个行业还是在取得进展。 许多组织表示&#xff0c;与前几年相比&#xff0c;网络安全变得更容易管理。组织之间…

AI赋能银行国际结算审单:合合信息抽取技术的实践与应用

官.网地址&#xff1a;合合TextIn - 合合信息旗下OCR云服务产品 时下&#xff0c;银行国际业务是金融体系的重要组成部分&#xff0c;涵盖了外汇交易、国际结算、贸易融资、跨境投资等领域&#xff0c;这些业务对于国际贸易和全球经济发展具有重要作用。国际业务部门单据、凭证…

OpenGL系列(五)纹理贴图

概述 OpenGL纹理是一种在三维图形中应用纹理映射的技术。纹理是一张图像&#xff0c;可以应用到三维模型的表面上&#xff0c;从而使得模型看起来更加真实和具有细节。通过纹理映射&#xff0c;可以将图像的像素值与三维模型的顶点进行匹配&#xff0c;从而为模型的表面增加细节…

验证码识别接口、多种样式验证码识别接口、中英文验证码识别接口

验证码识别接口、多种样式验证码识别接口、中英文验证码识别接口 本文提供一个基于OCR和机器学习的验证码识别接口&#xff0c;能够识别较复杂的中文、英文验证码&#xff0c;在OCR的基础上针对验证码进行算法优化。本接口是收费的&#xff08;最低0.5分1次调用&#xff0c;试…

23种设计模式之代理模式

代理模式 1、概念 代理模式&#xff1a;给某一个对象提供一个代理或占位符&#xff0c;并由代理对象来控制对原对象的访问 代理模式是常用的结构型设计模式之一&#xff0c;在Java RMI、Web Service、Spring AOP等技术和框架中都使用了代理模式 2、代理模式结构 Subject&a…

解析 Spring 框架中的三种 BeanName 生成策略

在 Spring 框架中&#xff0c;定义 Bean 时不一定需要指定名称&#xff0c;Spring 会智能生成默认名称。本文将介绍 Spring 的三种 BeanName 生成器&#xff0c;包括在 XML 配置、Java 注解和组件扫描中使用的情况&#xff0c;并解释它们如何自动创建和管理 Bean 名称。 1. Be…

Nodejs 第七十七章(MQ高级)

MQ介绍和基本使用在75章介绍过了&#xff0c;不再重复 MQ高级用法-延时消息 什么是延时消息? Producer 将消息发送到 MQ 服务端&#xff0c;但并不期望这条消息立马投递&#xff0c;而是延迟一定时间后才投递到 Consumer 进行消费&#xff0c;该消息即延时消息 插件安装 R…

业务安全蓝军测评标准解读—业务安全体系化

目录 1.前言 2.业务蓝军测评标准 2.1 业务安全脆弱性评分(ISVS) 2.2 ISVS评分的参考意义 2.3 纵向对比 2.4 横向对比 3.业务蓝军测评案例 3.1 APP虚假安装案例 3.1.1 定义测评对象和攻击目标 3.1.2 制定攻击方案 3.1.3 执行攻击并评估 3.2 人脸识别绕过案例 3.2.…

STM32硬件接口I2C应用(基于MP6050)

目录 概述 1 STM32Cube控制配置I2C 1.1 I2C参数配置 1.2 使用STM32Cube产生工程 2 HAL库函数介绍 2.1 初始化函数 2.2 写数据函数 2.3 读数据函数 3 认识MP6050 3.1 MP6050功能介绍 3.2 加速计测量寄存器 ​编辑3.3 温度计量寄存器 3.4 陀螺仪测量寄存器 4 MP60…

微信小程序组件传值

虽然微信小程序是比较轻量的&#xff0c;但是还是拥有组件的 这是文件的基本目录 我们的代码基本都在pages和components文件夹中 在component中创建组件 在component中 &#xff0c;创建一个目录 我创建了一个 head目录 用于配置头部信息 我在这里创建了 一个头部组件&…

linux-计划任务

作用&#xff1a;定时自动完成特定的工作 计划任务的分类 一次性的计划任务&#xff1a;例如下周三对文档的重要文件备份一次 周期性的计划任务&#xff1a;每天12:00创建一个文件 命令 一次性的任务计划 at batch 周期性计划任务 crontab anacron 一次性计划任务 …

Github 2024-06-12 C开源项目日报 Top10

根据Github Trendings的统计,今日(2024-06-12统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量C项目10PHP项目1PLpgSQL项目1C++项目1Ventoy: 100%开源的可启动USB解决方案 创建周期:1534 天开发语言:C协议类型:GNU General Public Licen…

品牌与产品:消费者决策的经济逻辑与品牌宣传的战略意义

在当今日益全球化的经济环境中&#xff0c;品牌与产品之间的关系对于企业的成功与否起着至关重要的作用。然而&#xff0c;在消费者做出购买决策时&#xff0c;他们到底是在选择产品本身&#xff0c;还是在选择附着在产品之上的品牌价值&#xff1f;同样&#xff0c;当客户选择…

载波相移CPS-SPWM调制方法的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 载波相移CPS-SPWM调制方法的simulink建模与仿真&#xff0c;载波相移PWM方法&#xff1a; 2.系统仿真结果 单极倍频 釆用 调制波 反相 法 &#xff0c; 基本调制原理为 &…

【总线】设计fpga系统时,为什么要使用总线?

目录 为什么用总线 为什么选择AMBA 总结 系列文章 【总线】AMBA总线架构的发展历程-CSDN博客 【总线】设计fpga系统时&#xff0c;为什么要使用总线&#xff1f;-CSDN博客 为什么用总线 在FPGA系统设计中&#xff0c;使用总线是为了实现组件间的高效互联与通信&#xff0c…

惠州惠城:可燃气体报警器定期校准检测,安全更放心

在惠州惠城这片繁华的土地上&#xff0c;工业发展日新月异&#xff0c;安全问题愈发受到重视。其中&#xff0c;可燃气体报警器作为预防火灾和爆炸事故的重要设备&#xff0c;正在越来越多的场所得到应用。 今天&#xff0c;佰德就来探讨一下可燃气体报警器在惠州惠城的重要性…

3.2 窗口滚动条

本节讲述窗口滚动条的简单使用方法。如果窗口客户区的内容太多&#xff0c;为了方便浏览窗口客户区的所有内容&#xff0c;就需要在创建窗口时添加窗口垂直或水平滚动条样式。窗口过程处理WM_CREATE消息时初始化滚动条的位置和滚动范围。窗口过程处理WM_VSCROLL或WM_HSCROLL消息…

idea自定义注释模板

1、打开配置 setting -> Editor -> Live Template 2、添加TemplateGroup&#xff0c;并在添加的TemplateGroup下加LiveTemplate 3、配置Live Template 内容&#xff1a; **** Description: * $param$* return $return$ * author $user$* date $date$ $time$**/变量…