文章目录
- 问题现象
- 问题定位
- 问题代码
- 根因分析
- 现象剖析
- newCachedThreadPool 源码
- SynchronousQueue
- 特点
- 构造方法
- 主要方法
- 应用场景
- Code Demo
- 运行结果
- 问题修复
问题现象
时不时有报警提示线程数过多,超过2000 个,收到报警后查看监控发现,瞬时线程数比较多但过一会儿又会降下来,线程数抖动很厉害,而应用的访问量变化不大。
问题定位
为了定位问题,在线程数比较高的时候进行线程栈抓取,抓取后发现内存中有 1000 多个自定义线程池。
一般而言,线程池肯定是复用的, 1000 多个线程池肯定不正常啊。。。。。。。。
问题代码
在项目代码里,我们没有搜到声明线程池的地方,搜索 execute 关键字后定位到,原来是业务代码调用了一个类库来获得线程池,类似如下的业务代码:调用 ThreadPoolHelper
的 getThreadPool
方法来获得线程池,然后提交数个任务到线程池处理,并没有看出什么异常
public String smiulate() throws InterruptedException {ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();IntStream.rangeClosed(1, 10).forEach(i -> {threadPool.execute(() -> {String payload = IntStream.rangeClosed(1, 1000000).mapToObj(__ -> "a").collect(Collectors.joining("")) + UUID.randomUUID().toString();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}log.debug(payload);});});return "OK";}
继续看 ThreadPoolHelper
的getThreadPool
方法居然是每次都使用 Executors.newCachedThreadPool
来创建一个线程池
public static ThreadPoolExecutor getThreadPool() {// 线程池没有复用return (ThreadPoolExecutor) Executors.newCachedThreadPool();}
根因分析
现象剖析
可以想到 newCachedThreadPool
会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程。如果业务操作并发量较大的话,的确有可能一下子开启几千个线程。
为什么我们能在监控中看到线程数量会下降,而不会撑爆内存呢?回到 newCachedThreadPool
的定义就会发现,它的核心线程数是 0,而 keepAliveTime
是 60 秒,也就是在 60 秒之后所有的线程都是可以回收的。好吧,就因这个特性,业务程序并灭有死得没太难看。。。。。。
newCachedThreadPool 源码
看下 newCachedThreadPool
的源码
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
该线程池特性:
- 线程池会根据需要创建新线程,但会重用已有的空闲线程。
- 如果没有可用的空闲线程,则会创建新的线程并添加到池中。
- 空闲超过60秒的线程会被终止并从池中移除。
参数说明:
- 0:核心线程数为0,即初始时不会创建任何线程。
- Integer.MAX_VALUE:最大线程数为整型的最大值,表示可以创建大量线程。
- 60L:线程空闲时间,单位为秒。
- TimeUnit.SECONDS:时间单位为秒。
new SynchronousQueue<Runnable>()
:任务队列,使用同步队列,确保每个任务都会立即被线程处理。
SynchronousQueue
SynchronousQueue 是 Java 并发包 java.util.concurrent 中的一个特殊的阻塞队列。它的设计目的是为了实现线程之间的直接传递,而不是存储元素。
特点
-
无缓冲:
SynchronousQueue 不存储元素,它只是一个直接传递的机制。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。 -
一对一传递:
每个插入操作(put)必须等待一个对应的移除操作(take),反之亦然。这意味着生产者线程必须等待消费者线程准备好接收元素,消费者线程也必须等待生产者线程提供元素。
-
高效:
由于 SynchronousQueue 不存储元素,因此在某些场景下可以提供更高的性能,特别是在需要快速传递数据的情况下。 -
公平性选项:
SynchronousQueue 提供了公平性和非公平性两种模式。公平模式下,线程按照 FIFO 顺序进行匹配;非公平模式下,线程可能会抢占匹配机会。 -
线程安全:
SynchronousQueue 是线程安全的,内部使用锁和条件变量来确保线程间的同步。
构造方法
SynchronousQueue
提供了两个构造方法:
SynchronousQueue()
:默认构造方法,使用非公平模式。SynchronousQueue(boolean fair)
:指定是否使用公平模式
主要方法
void put(E e)
:将指定的元素插入队列,如果当前没有消费者线程在等待,则阻塞直到有消费者线程。E take()
:获取并移除队列中的一个元素,如果当前没有生产者线程在等待,则阻塞直到有生产者线程。- b
oolean offer(E e)
:尝试将指定的元素插入队列,如果当前有消费者线程在等待,则立即返回 true,否则返回 false。 boolean offer(E e, long timeout, TimeUnit unit)
:尝试将指定的元素插入队列,如果当前有消费者线程在等待,则立即返回 true,否则等待指定的时间,如果超时则返回 false。E poll()
:尝试从队列中获取并移除一个元素,如果当前有生产者线程在等待,则立即返回该元素,否则返回 null。E poll(long timeout, TimeUnit unit)
:尝试从队列中获取并移除一个元素,如果当前有生产者线程在等待,则立即返回该元素,否则等待指定的时间,如果超时则返回 null。
应用场景
适用于需要快速传递数据且不需要缓冲的场景
-
工作窃取(Work Stealing):
在多线程环境中,工作窃取算法通过让空闲线程从其他线程的任务队列中“窃取”任务来提高负载均衡。SynchronousQueue 可以用于实现这种任务传递。 -
事件驱动系统:
在事件驱动系统中,事件处理器之间需要快速传递事件,SynchronousQueue 可以用于实现这种快速传递。 -
管道通信:
在管道通信中,生产者和消费者之间需要直接传递数据,SynchronousQueue 可以用于实现这种直接传递。
Code Demo
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();// 生产者线程Thread producer = new Thread(() -> {try {System.out.println("生产者线程准备插入数据");queue.put("Hello");System.out.println("生产者线程插入数据完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {System.out.println("消费者线程准备获取数据");String data = queue.take();System.out.println("消费者线程获取到数据: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();}
}
运行结果
生产者线程准备插入数据
消费者线程准备获取数据
生产者线程插入数据完成
消费者线程获取到数据: Hello
问题修复
使用一个静态字段来存放线程池的引用,返回线程池的代码直接返回这个静态字段即可。
最佳实践,手动创建线程池
import com.google.common.util.concurrent.ThreadFactoryBuilder;static class ThreadPoolHelper {private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build());static ThreadPoolExecutor getThreadPool() {return threadPoolExecutor;}}