项目场景:
在项目开发中常常会遇到在一个有数据库操作的方法中,发送MQ消息,如果这种情况消息队列效率比较快,就会出现数据库事务还没提交,消息队列已经执行业务,导致不一致问题。举个应用场景,我们提交一个订单,将流水号放在MQ里,MQ监听到后就会查询订单去做其它业务,如果这时候数据库事务还没提交,也就是没生成订单流水,MQ监听到消息就去执行业务,查询订单,肯定会出现业务不一致问题
问题描述
最近遇到一个业务场景,类似于下单过程,场景是用户注册消息,注册成功后,会发送MQ消息,MQ监听到消息后,会查询用户的信息,如何再做其它业务,但是遇到一个问题,就是mq消费消息的速度是快于数据库事务提交的,就是我们用户注册的信息还没写入数据库,mq已经提前消费了,所以会导致查询不到用户注册的信息
大致的代码:
@Transactional(rollbackFor = Exception.class)
public void register(){User user = User.builder().name("管理员").email("123456@qq.com").build();userMapper.insert(user);// 发送消息给MQsendMQMessage();
}
原因分析
MQ消息消费快于事务提交
解决方案
对于这种情况,下面给出两种处理方法,一种是借助于Spring框架提供的TransactionSynchronizationManager
来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener
来控制事务
- TransactionSynchronizationManager控制事务
@Transactional(rollbackFor = Exception.class)
public void register() {User user = User.builder().name("管理员").email("123456@qq.com").build();userMapper.insert(user);// after transaction commitTransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {// 发送消息给MQsendMQMessage();}});}
测试一下,通过日志可以看出事务已经提交了,然后发送mq,mq监听到消息,就会去读取用户信息,是可以获取到的
- @TransactionalEventListener控制事务
如果借助Spring框架提供的事件监听机制来实现,就需要用到@TransactionalEventListener
监听器,下面给出例子
创建一个Event,主要来做参数传送
package com.example.eventlistener.event;import org.springframework.context.ApplicationEvent;public class SendMsgEvent extends ApplicationEvent {private Long userId;private String userName;public SendMsgEvent(Object source){super(source);}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}
}
创建一个监听器,注意要加上@Component
,组件类才能被Spring容器管理
package com.example.eventlistener.listener;import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;import javax.annotation.Resource;@Component
@Slf4j
public class SendMsgListener {@Resourceprivate UserMapper userMapper;@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)public void sendMsg(SendMsgEvent sendMsgEvent) {log.info("sendMsg: {}" , JSONUtil.toJsonStr(sendMsgEvent));// 发送消息给MQsendMQMessage();}
}
业务类实现业务:
package com.example.eventlistener.service.impl;import cn.hutool.http.HttpRequest;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.event.UserRegisterEvent;
import com.example.eventlistener.mapper.UserMapper;
import com.example.eventlistener.model.User;
import com.example.eventlistener.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;import javax.annotation.Resource;@Service
@Slf4j
public class UserServiceImpl implements ApplicationEventPublisherAware , IUserService {private ApplicationEventPublisher applicationEventPublisher;@Resourceprivate UserMapper userMapper;@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.applicationEventPublisher = applicationEventPublisher;}@Override@Transactional(rollbackFor = Exception.class)public void sendMsgAfterRegisterByEvent() {User user = doRegister();// after transaction commitSendMsgEvent sendMsgEvent = new SendMsgEvent(this);sendMsgEvent.setUserId(user.getId());sendMsgEvent.setUserName(user.getName());applicationEventPublisher.publishEvent(sendMsgEvent);}private User doRegister() {User user = User.builder().name("管理员").email("123456@qq.com").build();userMapper.insert(user);log.info("save user info");return user;}}
经过测试,也可以实现同样的效果,控制数据库的事务提交后,才执行发送MQ消息
补充:
如果执行出现java.lang.IllegalStateException: Transaction synchronization is not active
,说明没加事务控制,加上@Transactional(rollbackFor = Exception.class)
即可