本文将记录和分析日志中的ConcurrentModificationException关键字报警,还有一些我的思考,希望对大家有帮助。
一、背景
近期,在日常的日志关键字报警分析时,发现我负责的一个电商核心系统在某时段存在较多ConcurrentModificationException异常日志,遂进行分析和改进,下面是我的一些思考。
1.1 系统架构
一直以来,无状态的服务都被当作分布式服务设计的最佳实践。因为无状态的服务对于扩展性和运维方面有着得天独厚的优势,可以随意地增加和减少节点。本系统的整体架构可以认为是由一个MQ应用、一个RPC应用和底层存储组成。
RPC应用是无状态服务,对外提供常用的查询和操作接口;采用双机房部署,每个机房10*8C16G;
MQ应用是无状态服务,负责消费MQ消息,在消费过程中会调用该RPC应用提供方法;采用双机房部署,每个机房5*8C16G;
底层存储用的是数据库集群和缓存集群,大概如图所示:
1.2 关键代码
MyRpcService
对外提供RPC服务,getList
方法可以根据入参中的状态进行查询,由于业务需要,需要对入参的状态进行排序,实现部分关键代码如下:
public class MyRpcServiceImpl implements MyRpcService{@Overridepublic BaseResult getList(ListParam listParam) {BaseResult baseResult = new BaseResult();List<Integer> states = listParam.getStateList();// 省略大段代码KeyUtil.getKeyString(states);// 省略大段代码baseResult.setSuccess(true);return baseResult;}}
KeyUtil
是一个工具类,getKeyString
方法对入参的itemList
进行排序使用的是Java集合框架内置的sort 方法,代码如下:
public class KeyUtil {public static String getKeyString(List<Integer> itemList) {String result = "";//省略代码Collections.sort(itemList);//省略代码return result;}}
MyMqConsumer
是MQ消费者,负责监听消息进行消费。在消费逻辑中,会调用MyRpcService
的getList()
方法进行状态查询,因为查询的状态是固定的,所以在Consumer
类中定义了static final
类型的stateList
,关键代码如下:
public class MyMqConsumer implements MessageListener{public static final List<Integer> stateList = Stream.of(1).collect(Collectors.toList());@Resourceprivate MyRpcService myRpcService;@Overridepublic void onMessage(List<Message> messageList) {// 省略代码for (Message message : messageList) {// 省略其他代码ListParam listParam = new ListParam();listParam.setStateList(stateList);BaseResult result = myRpcService.getList(listParam);// 省略其他代码}}}
二、 原因分析
看了上面的系统架构和关键代码,不知道你有没有发现问题?可以先抛开设计和代码实现方面的问题不谈,只看这样的代码能不能正常执行,得到正确的业务结果。
既然这么问了,当然会有问题:在高并发环境下,MQ应用在消费消息时,调用RPC服务查询时可能会抛出异常,从而触发MQ异常重试,至于对业务有没有影响,得具体问题具体分析了。
ERROR 执行流程时出错
java.util.ConcurrentModificationException:null
at java.util.ArrayList.forEach(ArrayList.java:1260)~[:?1.8.0_192]
at com.shangguan.test.util.KeyUtil.getKeyString(KeyUtil.java:10)
...
2.1 分析1-ArrayList源码
从日志中可以看到,ConcurrentModificationException
是java.util.ArrayList
类里面的forEach
方法抛出来的,源码如下:
@Overridepublic void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);final int expectedModCount = modCount;@SuppressWarnings("unchecked")final E[] elementData = (E[]) this.elementData;final int size = this.size;for (int i=0; modCount == expectedModCount && i < size; i++) {action.accept(elementData[i]);}if (modCount != expectedModCount) {throw new ConcurrentModificationException();}}
在该方法中,内部会维护一个expectedModCount
变量,赋值为modCount
,在每次迭代过程中,迭代器会检查expectedModCount
是否等于当前的modCount
。如果不等,说明在迭代过程中ArrayList
的结构发生了修改,迭代器会抛出ConcurrentModificationException
异常。这种设计可以确保在多线程环境下,当一个线程修改ArrayList
时,其他线程在迭代过程中可以立即发现这种修改,从而避免潜在的数据不一致问题。
再可以看下源码中modCount
的注释,大意是:
modCount
表示ArrayList
自从创建以来结构上发生的修改次数。结构修改是指改变列表大小的修改,或者以其他方式扰乱列表,使正在进行的迭代可能产生不正确的结果。
modCount
字段用于iterator
和listIterator
方法返回的迭代器(或列表迭代器)。如果这个字段的值在迭代过程中发生意外的变化,迭代器(或列表迭代器)将在next、remove、previous、set或add
操作时抛出ConcurrentModificationException
异常。这提供了fail-fast(快速失败)行为,而不是在迭代过程中遇到并发修改时具有不确定性。
子类可以选择使用这个字段。如果子类希望提供fail-fast迭代器(和列表迭代器),那么它只需在其add(int, E)
和remove(int)
方法(以及覆盖的任何其他导致列表结构修改的方法)中递增此字段。单次调用add(int, E)
或remove(int)
应该在此字段上增加不超过1次,否则迭代器(和列表迭代器)将抛出虚假的ConcurrentModificationException
。如果实现不希望提供fail-fast迭代器,可以忽略此字段。
2.2 分析2-线程安全问题
有个有趣的现象是,这个异常日志仅存在MQ应用中,这是为什么呢?
这其实是一个多线程问题。我们知道,static对象是在类加载时创建的全局对象,它们的生命周期与类的生命周期相同。static对象在程序启动时创建,在程序结束时销毁。这意味着static对象在多个线程之间共享的,可能存在线程安全问题。
翻回去仔细看下代码,可以看到MyMqConsumer
定义的stateList是static类型的,是否是否存在线程安全问题呢?
在流量较低的情况下,多个消息不在同一时刻到达,每个线程处理消息将不会争夺static对象,所以不会有问题;
当流量较大情况下,有多个消息可能在同一时刻到达,每个线程处理过程中都会对stateList进行赋值,调用远程RPC接口,它们之间将会争夺static对象,可能存在问题。例如上图中右半部分,线程1还没有处理完消息1时,线程2就开始争抢,那么就可能使ArrayList中modCount != expectedModCount条件满足,从而抛出异常。
三、改进思考
3.1 本问题的优化
经过上述分析,已经清楚问题的产生原因了。对于本问题的优化,其实也比较简单。有如下两种方式可供选择:
-
在
MyMqConsumer
调用RPC查询的入参,使用new List来替代原来的类中定义好的static对象; -
修改
KeyUtil
代码,浅拷贝传入的itemList,再进行排序
3.2 类似问题的发现和改进
本问题已经修复,那类似的问题是否可以避免或者减少,将是接下来值得思考的一个问题。为了减少这类问题发生,我结合平时工作过程中的几个阶段,认为可以从以下几个方面进行改进:
- 开发
开发过程中,开发人员需要提升认知和水平,注意代码中可能存在的线程问题;注意编写单元测试,可以通过模拟多线程环境来检测潜在的问题。
- 代码评审
开发完成的代码一定需要进行代码评审,评审过程中架构师需要发挥自己丰富的开发经验和较强的代码直觉,“火眼金睛”,发现代码中的漏洞;当然这对评审人员的要求很高,因为仅通过改动的几行代码发现问题确实是一件很有挑战的事情。如果要有一些自动化工具或者插件,则可以起到事半功倍的效果。这里其实我还没有调研相关的工具,如果有大佬有相关经验欢迎评论交流。
- 测试
测试阶段除了验证正常的业务功能,还需要进行集成测试和性能测试。在集成测试中,将多个模块组合在一起,测试整个系统在多线程环境中的行为,有助于发现模块之间的交互问题。除了继承测试,有时还需要性能测试,性能测试可以发现潜在的竞争条件、死锁、资源争用等多线程问题。
四、小结
最后,我简单总结一下本文内容。本文主要记录和分析日志中的ConcurrentModificationException关键字报警,首先介绍了系统整体架构和关键代码;然后从ArrayList源码和线程安全两个方面分析问题产生原因,最后我提出了修复该问题的方案和类似问题的思考,希望对大家有帮助。