1.问题说明
问题背景说明,在使用达梦数据库时,mybatis-plus的serviceImpl.saveBatch()方法或者updateBatchById()方法的时候,随着数据量、属性字段的增加,效率越发明显的慢。
serviceImpl.saveBatch();
serviceImpl.updateBatchById();
2.mysql的解决思路
如果你使用的是mysql的话,可以参考如下这个老哥的文章https://www.cnblogs.com/ajianbeyourself/p/18344695。改起来也简单,也就是配置参数加个属性。
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?rewriteBatchedStatements=true
总结下就是:在 MyBatis-Plus 中启用 rewriteBatchedStatements 主要是为了提高批量插入/更新操作的性能。rewriteBatchedStatements 是 MySQL JDBC 驱动程序中的一个参数,用于将批量操作转换为单个 SQL 语句,以提高执行效率。
mysql的rewriteBatchedStatements属性,可以将多个SQL语句转化为一个sql语句。
这里我就不在啰嗦mysql的优化了,主要是针对其他数据库驱动没有rewriteBatchedStatements支持的情况下,我们该怎么优化,并且代价最小。
3.性能演示
以下代码是一个示例,先调用remove清空所有数据,然后记录开始时间,等待saveBatch以后,然后记录消耗时间
String oldData = JsonUtils.readFile("D:\\xxxx\\restdata\\" +"NR_RES_CHANNELROUTENODE_M" + ".json");
List<NrResChannelroutenodeM> list = JsonUtils.strToListBean(oldData, NrResChannelroutenodeM.class);
channelroutenodeMService.remove(null);
long start = System.currentTimeMillis();
channelroutenodeMService.saveBatch(list);
long end = System.currentTimeMillis() - start;
log.info("保存:{},数据总量:{},消耗时间:{}秒","List<NrResChannelroutenodeM>", list.size() , end / 1000f);
不然发现,使用mybatis-plus的原始saveBatch,基于NrResChannelroutenodeM这个实体来说,数据量约46万,消耗时间,大概110秒。这是在我本地的测试情况,实际上在用户现场的开发测试机上,这里的保存的时间已经超过了15分钟(原因有很多, 客户现场电脑配置较低,客户现场的部署环境有大概120个这样的批量保存,我这里只单独测试这一个所以只用110秒)。
接下来我们注释掉saveBatch,改成我自己编写的批量保存。这接近46万的数据量,我们切割成459份,一份保存1000条,1个线程保存需要17秒。
提升效果:110秒 -> 17秒
接下来,我改成500条数据一份,切割成918分,可以发现,性能还能更快,大概提升了2.5秒钟。也就是说,针对这个实体的数据来说,每次更新500条,比更新1000条快那么一小丢。
提升效果:17.1秒 -> 14.6秒
接下来,我将线程数提升到5:即5个线程运行,可以发现。时间来到了4.6秒,从最开始的110秒,到现在的4.6秒,这个提升很夸张了
提升效果:14.6秒 -> 4.6秒
4.优化思路
这里提一下,之前说的mysql是如何优化的。
mysql的rewriteBatchedStatements属性,可以将多个SQL语句转化为一个sql语句。
所以我思路也一样,如果是其他的非mysql,那就是把多个sql拼接成一个sql。
简单说,mybatis-plus执行批量保存到了数据库的时候,是下面这样的,
INSERT INTO users (name, email, age) VALUES ('John Doe', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe1', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe2', 'johndoe@example.com', 18);
INSERT INTO users (name, email, age) VALUES ('John Doe3', 'johndoe@example.com', 18);
而我们要的批量保存到了数据库执行的时候应该是下面这样的,
INSERT INTO users (name, email, age) VALUES ('John Doe', 'johndoe@example.com', 18),VALUES ('John Doe1', 'johndoe@example.com', 18),VALUES ('John Doe2', 'johndoe@example.com', 18),VALUES ('John Doe3', 'johndoe@example.com', 18);
伪代码示意
StringBuilder insert = new StringBuilder();
insert.append(INSERT INTO).append(表名);
insert.append(表字段);
for insert.append(字段对应的值);
4.1.准备一个数据库实体类
任意实体类即可,例如如下这种实体类。说明下实体类的要求,我们需要从这个实体类中提取出哪些信息来:
表名:获取@TableName(“SG_PULL_CONFIG”)或者获取类名
字段名:获取@TableId(“TABLE_NAME”)或者获取属性名
@Data
@TableName("SG_PULL_CONFIG")
public class SgPullConfig implements Serializable {private static final long serialVersionUID = 1L;@TableId("TABLE_NAME")private String tableName;@TableField("DATA_URL")private String dataUrl;@TableField(value = "CREATE_TIME", fill = FieldFill.INSERT)private Date createTime;
}
4.2.获取实体类对应的表名
/*** @description:获取数据库实体类的的表名:TableName或者类名转驼峰* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年12月5日 上午11:10:01*/public static String getTableName(Object object) {String name ="";TableName annotation = object.getClass().getAnnotation(TableName.class);if(annotation != null) {name = annotation.value();}else {name = object.getClass().getSimpleName();name = QueryService.humpToLine(name);name = name.toUpperCase();}return name;}
4.3.获取数据表的属性字段
/*** @description:获取数据库实体类的字段名* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年12月5日 上午11:09:43*/public static List<String> getFields(Object object){Field[] fields = object.getClass().getDeclaredFields();List<String> list = new ArrayList<>(fields.length);for (Field field : fields) {field.setAccessible(true);try {//TableId修饰的主键排除自增TableId tableId = field.getAnnotation(TableId.class);if (tableId != null && !IdType.AUTO.equals(tableId.type())) {list.add(tableId.value());continue;}//TableField修饰的属性字段排除不存在的字段TableField tableField = field.getAnnotation(TableField.class);if (tableField != null && tableField.exist()) {list.add(tableField.value());continue;}//使用属性名和数据库字段名进行匹配的if(tableId == null && tableField == null && !"serialVersionUID".equals(field.getName())) {list.add(QueryService.humpToLine(field.getName()));}} catch (Exception e) {log.info("获取实体类的TableId和TableField异常");}}return list;}
4.4.获取数据表的属性值
这里需要注意一下:获取的属性值,需要对字符串和时间做特殊处理。如下图所示,看VALUES里面的部分,如果是字符串,我们需要添加单引号。
/*** @description:获取数据库实体类的属性值* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年12月5日 上午11:09:17*/public static List<Object> getdValues(Object object){Field[] fields = object.getClass().getDeclaredFields();List<Object> list = new ArrayList<>(fields.length);for (Field field : fields) {field.setAccessible(true);try {//TableId修饰的主键排除自增TableId tableId = field.getAnnotation(TableId.class);if ((tableId != null && !IdType.AUTO.equals(tableId.type()))) {list.add(getSqlValueByType(field.get(object), field));continue;}//TableField修饰的属性字段排除不存在的字段TableField tableField = field.getAnnotation(TableField.class);if (tableField != null && tableField.exist()) {list.add(getSqlValueByType(field.get(object), field));continue;}//使用属性名和数据库字段名进行匹配的if(tableId == null && tableField == null && !"serialVersionUID".equals(field.getName())) {list.add(getSqlValueByType(field.get(object), field));}} catch (Exception e) {log.info("获取实体类的TableId和TableField异常");}}return list;}private static Object getSqlValueByType(Object value, Field field) {if(value == null) {return null;}if(field.getType() == String.class) {return "'" + value + "'";}if(field.getType() == Date.class) {return "'" + DateUtils.getStrDate((Date)value, null) + "'";}return value;}
4.5.用:表名_字段名_字段值---->拼接SQL
public static String getBatchInsertSql(List<?> list) {StringBuilder insert = new StringBuilder();String tableName = getTableName(list.get(0));List<String> fields = getFields(list.get(0));insert.append("INSERT INTO ").append(tableName).append("(");fields.forEach(temp -> insert.append(temp).append(","));insert.deleteCharAt(insert.length() - 1);insert.append(") VALUES ");StringBuilder valueTemp = null;for (Object temp : list) {valueTemp = new StringBuilder(); insert.append("(");List<Object> values = getdValues(temp);for (Object value : values) {valueTemp.append(value).append(",");}valueTemp.deleteCharAt(valueTemp.length() - 1);insert.append(valueTemp.toString());insert.append("),");}insert.deleteCharAt(insert.length() - 1);return insert.toString();}
4.6.执行拼接的sql语句
//注入SqlRunner
@SpringBootConfiguration
@MapperScan(basePackages = "com.map.**.mapper")
@EnableTransactionManagement
public class MybatisConfig {@Beanpublic SqlRunner sqlRunner() {return new SqlRunner();}
}//SqlRunner 执行sql语句
String sql = getBatchInsertSql(list);
sqlRunner.insert(sql);
5.1多线程优化
目前为止我的代码如下,其中DB database无视就好了,这是我多数据源的时候切换数据源用的
- 先把数据切割成N份
- 创建线程池(最长线程数是)
- 现场池提交任务
- 主线程等待线程池的任务执行完毕
/*** @description:批量保存* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年12月9日 下午2:36:10*/public <T> void saveBatch(IService<T> service, List<T> list, DB database) {if(ObjectUtils.isEmpty(list)) {log.error("list数据为空!");}if(list.size() <= DEFAULT_BATCH_SIZE) {service.saveBatch(list);}else {//限制批量保存最大的线程为5int batchThread = Math.min(list.size() / DEFAULT_BATCH_SIZE, 5);bigDataSave(list, database , DEFAULT_BATCH_SIZE, batchThread);}}/*** @description:大数据量的数据保存* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年12月4日 下午4:43:37*/public void bigDataSave(List<?> list, DB database, int size ,int thread) {if(ObjectUtils.isEmpty(list)) {return;}List<List<?>> splitList = ArraysUtils.splitList(list, size);log.info("当前数据量:{},切割:{}份", list.size(), splitList.size());ExecutorService executor = threadPoolService.newFixedThreadPool(thread);for (int i = 0; i < splitList.size(); i++) {executor.submit(new BigDataTask(splitList.get(i), database, sqlRunner));}threadPoolService.shutdownAndWait(executor);}
线程池配置
@Component
public class ThreadPoolService {//最大线程数private int maximumPoolSize = 20;public ThreadPoolService threadPoolService() {return new ThreadPoolService();}public ExecutorService newFixedThreadPool(int nThreads) {//防止线程数太大,印制最大为20return new ThreadPoolExecutor(nThreads, Math.min(nThreads, maximumPoolSize),0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, maximumPoolSize,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public void shutdownAndWait(ExecutorService executor) {executor.shutdown();while (!executor.isTerminated()){};}}
多线程的子任务
@Log4j2
@AllArgsConstructor
public class BigDataTask implements Runnable {//以下属性使用构造方法注入进来的,因为自己new BigDataTask,BigDataTask不是spring托管,因此无法使用Spring注入进来private List<?> list;private DB database;private SqlRunner sqlRunner;@Overridepublic void run() {//这个代码是用来切换数据源的DataSourceContextHolder.setDataSource(database.getEnumId());try {String sql = SaveBatchSerive.getBatchInsertSql(list);sqlRunner.insert(sql);} catch (Exception e) {log.info("任务:BigDataTask,处理失败,失败原因:{}", e);}DataSourceContextHolder.removeDataSource();}
}
在批量保存的地方调用即可,如下所示
优点:
- 入门难度低,不需要对mybatis-plus做任何修改,减少对mybatis-plus技术的研究工作量
- 操作可控,仅针对性能有问题的地方将xxxxService.saveBatch(list)改为我们自己编写的saveBatchSerive.saveBatch()即可,是局部性,而不是全局的,不至于出现为了修改某个地方的saveBatch()导致所有的saveBatch()都出现问题
- 可以合理自己配置适合自己的线程数以提升效率(并不是线程数越多越好)详情可以看我以前的介绍:系统适合开启多少线程数量?
saveBatchSerive.saveBatch(channelroutenodeMService, list, DB.从库);
//saveBatchSerive.bigDataSave(list, DB.从库 , 500, 1);
//channelroutenodeMService.saveBatch(list);
缺点:
1.没有在底层修改,如果开发团队其他开发成员调用原生的mybatis-plus,saveBatch时,还会出现性能问题
2.无法对已经编写的代码进行优化,需要将历史代码中的saveBatch替换成自己的。
5其他优化方式-替换saveBatch
具体实现方式参考:我这里就不废话了,但是我并不推荐这种方式,
https://openatomworkshop.csdn.net/6645aa50b12a9d168eb6bd90.html
大概思路如下:
- 编写一个RootMapper/RootService来替换原来的BaseMapper/IService
- 自己编写批量保存代码
- 业务Mapper/业务Service继承(实现)时,用RootMapper、RootService
- 批量保存的时候,用的是RootMapper的批量保存,不是BaseMapper的批量保存