问题描述:
先说一下流程:后端保存前端提交的图表信息,然后发送异步消息到消息队列,由下游服务去处理图表信息。
部署项目到服务器,验证项目功能的时候,出现了以下错误:数据库存在数据。下游服务查不到数据库的数据
// service代码
@Override
@Retryable
@Transactional
public ChartVo genChartByAiAsyncMq(Long uid, MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest) {// ...省略Chart chart = Chart.builder().name(name).goal(goal).chartData(data).chartType(chartType).uid(uid).status("wait").build();boolean save = this.save(chart);if(!save){log.info("保存表单失败");throw new RuntimeException("保存表单失败");}List<Chart> list = this.list();log.info("chart长度:{}", list.size());// 发送消息,触发异步处理biMessageProducer.sendMessage(String.valueOf(chart.getId()));log.info("发送消息成功");// 省略
}
下游服务处理代码
@SneakyThrows
@RabbitListener(queues = {MQConstants.BI_QUEUE_NAME}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){log.info("receive message: {}", message);// 检查消息是否为空if(StringUtils.isBlank(message)){log.info("消息不能为空");channel.basicNack(deliveryTag, false, false);throw new RuntimeException("消息不能为空");}List<Chart> list = biService.list();log.info("图表列表长度:{}", list.size());// 尝试将消息解析为图表ID,并查询图表信息Long chartId = Long.parseLong(message);Chart chart = biService.getById(chartId);log.info("图表信息:{}", chart);// 图表不存在时的处理if(chart == null){log.info("图表不存在");channel.basicNack(deliveryTag, false, false);throw new RuntimeException("图表不存在");}// 省略
}
日志输出如下:
数据库信息:
解决过程
首先说明一下,这个错误之前没有出现过,下午出错,再次测试的时候,也会出现正常的情况,只不过错误占比有点高(10次有6次获取不到数据库消息)。
分析的过程:
步骤1、首先在上游服务和下游服务打印日志,查看数据库有多少条数据,上游服务显示有2条数据,下游服务显示有1条数据
步骤2、找错的时候,看见方法加了事务注解@Transactional,这个时候想到可能是事务影响(后面分析原因),然后取掉注解,重新验证,发现没有出错
原因分析
为什么会出现这个错误呢?
我觉得是由MySQL的事务和网络引起的,
MySQL事务+网络
我们都知道MySQL(8.x版本)的事务的隔离级别默认是可重复读(RR),那么一个事务在操作完成之前,对其他事务是不看见的,所以就说,方法中先保存图表信息到数据库,然后发送消息到消息队列,再执行方法的后续过程。发送消息到队列之后,可能数据库事务还没有提交,但是消息发送成功了,就立刻被消费者端消费,此时,消费者端查询数据库中的图表信息,当然查不到,因为生产者端的事务还没有提交。
之前没有出错,这次验证出错分析
服务端将消息发送消息队列,由于网络有延迟,导致事务提交之后,消息才被消费端消费。
解决方法
1、手动提交事务,不使用注解
2、设置延迟队列,但是这个延迟的时间具体是多少,我们无法确定,所以最后采用第一种方法解决此问题。
@Slf4j
@Service
public class BiServiceImpl extends ServiceImpl<ChartMapper, Chart> implements BiService {@Resourceprivate PlatformTransactionManager transactionManager;@Override@Retryablepublic ChartVo genChartByAiAsyncMq(Long uid, MultipartFile multipartFile, GenChartByAiRequest genChartByAiRequest) {TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try{// 省略...Chart chart = Chart.builder().name(name).goal(goal).chartData(data).chartType(chartType).uid(uid).status("wait").build();boolean save = this.save(chart);if(!save){log.info("保存表单失败");throw new RuntimeException("保存表单失败");}transactionManager.commit(transactionStatus);List<Chart> list = this.list();log.info("chart长度:{}", list.size());// 发送消息,触发异步处理biMessageProducer.sendMessage(String.valueOf(chart.getId()));log.info("发送消息成功");// 省略...}catch (Exception e){log.error("AI 异步调用失败", e);transactionManager.rollback(transactionStatus);throw new RuntimeException("AI 异步调用失败");}}
}