一.业务场景
最近业务需要,做了性能优化操作。百万级消息在kafka中秒级传输。cpu密集计算分钟级完成,然后在mysql中秒级落库.模型cpu计算提高了1倍,落表速度提高了5倍,2分钟内完成.
如下序列图:
业务系统A发送千级别消息到中转系统B.B收到消息后拆解为百万级的消息发送到kafka.模型C收到kafka消息后需要大量的cpu计算,计算完后返回百万级消息给系统B,B合并结果消息后,写入批次的数据到缓存redis.B返回给A批次成功通知,A读取redis的百万级数据后插入数据库.
二.业务问题
处理百万级消息,碰到如下问题.
1.通过kafa发送接收,如何加快速度.
2.通过多个系统传递,如何存储.
3.计算完,百万条记录存储数据库,如何快速写表.
三.优化方案
(一).kafka消息优化
1.扩大topic的partition数量,两位数级别.
2.springboot的注解@KafkaListener,修改concurrency,根据机器数计算,比如2台机器,单台设置为partition数量/服务器数.
3.接收到消息后,建立动态线程池,异步处理消息.线程池队列大小在千级别,核心和最大线程数在百级别,策略为CallerRunsPolicy.具体大小配置在压力测试中根据cpu和内存大小使用情况获得。支持配置中心的动态推送,服务器压力处于高峰时,可以推送修改线程池的corePoolSize,maximumPoolSize,减小压力.
4.cpu密集计算型.模型端接收kafka消息处理时,使用线程池处理,核心线程数改为cpu核数的至少2倍.
(二).服务间数据传输
1.服务间传输消息时,只传输批次号。约定redis缓存key,数据存入redis,全部从缓存获取.
(三).百万条记录写库优化
1.mysql优化
1.可以使用mysql提供的load指令加载大量数据,查看@@local_infile开关是否生效.我们没有使用这个.
2.关闭唯一性校验,查看 @@UNIQUE_CHECKS配置,涉及数据正确性。没有使用这个.
3.修改max_allowed_packet为500mb,它代表MySQL服务端或者客户端接收一次传送数据包的最大大小。
4.修改innodb_log_buffer_size为256mb,这个代表innodb 数据库引擎写日志缓存区。
5.修改innodb_log_file_size,innodb 数据库引擎UNDO日志的大小,减少数据库checkpoint操作。
2.java代码优化
1.业务系统接收到批次处理消息后,做两层分发。第一层提交到线程池处理,比如百万级数据数据,分到10个线程。每个线程处理十万数据.第二层单线程内分批提交,比如一次提交5000条数据,10万条数据,每个线程只需要提交20次.这个具体大小设置需要根据压测结果得到.这里最复杂的是解决多线程的事务一致性问题.
2.第一层提交到多线程时,需要自己统计提交到每个线程的数据量,主线程可以使用CountDownLatch等待子线程处理完成。主线程根据总数据量,线程数计算提交到每个子线程的数据量.
3.第二层子线程处理分为两种情况,不处理事务一致,insert语句执行时,事务直接提交了,这个比较简单.如果处理多线程事务一致,事务在主线程中一起提交,需要在子线程中新建事务,拷贝主线程事务到子线程,返回主线程zi子事务对象,在主线程中遍历事务,一起commit或者rollback.模拟两阶段提交,类似分布式事务,只是在主线程中统一控制事务状态.
java端的优化代码,@我索取.