在文章开头,先和大家抛出两个问题:
- 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
- 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?
JUC 线程池
在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
即执行流程为:
- 收到提交任务
- 当前线程数小于核心线程数,创建一个新的线程来执行任务
- 当前线程数大于等于核心线程数,
- 如果阻塞队列未满,将任务存储到队列
- 如果阻塞队列已满
- 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
- 如果当前线程数大于等于最大线程数,执行拒绝策略
可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。
Tomcat 线程池分析
Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal
:
/*** Start the NIO endpoint, creating acceptor, poller threads.*/@Overridepublic void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}initializeConnectionLatch();// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}}
再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor
:
public void createExecutor() {internalExecutor = true;TaskQueue taskqueue = new TaskQueue(); //无界队列TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}
要注意这里的 ThreadPoolExecutor
不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor
,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor
,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor
:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {...
}
查看它的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());//提前启动核心线程prestartAllCoreThreads();}
可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor
则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue
:
public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private volatile ThreadPoolExecutor parent = null;// No need to be volatile. This is written and read in a single thread// (when stopping a context and firing the listeners)private Integer forcedRemainingCapacity = null;public TaskQueue() {super();}...
}
而在创建 org.apache.tomcat.util.threads.TaskQueue
的时候,并没有传递 capacity
,也就是说 Tomcat 的线程池使用的是无界队列。
接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)
:
/*** {@inheritDoc}*/@Overridepublic void execute(Runnable command) {//重载 java.util.concurrent.ThreadPoolExecutor#executeexecute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}
本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute
方法:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}//强制入队public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}
这里的 workQueue
是 org.apache.tomcat.util.threads.TaskQueue
, org.apache.tomcat.util.threads.TaskQueue#offer
:
@Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the object//当前线程数达到最大,任务入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queue//如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new thread// 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queue//任务入队(当前线程数大于最大线程数)return super.offer(o);}
再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute
方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker
返回 false
才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)
在触发了拒绝策略后还有一个特殊处理:
//如果是 TaskQueueif (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//强制入队if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else { //非 TaskQueue 直接触发拒绝策略submittedCount.decrementAndGet();throw rx;}
再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)
:
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}
说白了就是直接入队(无界队列):
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) { //capacity是Integer最大值if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}
这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:
package blog.dongguabai.others.tomcat_threadpool;import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author dongguabai* @date 2023-11-18 22:04*/
public class Demo {public static void main(String[] args) {//无界队列TaskQueue taskqueue = new TaskQueue();TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);taskqueue.setParent(executor);observe(executor);while (true) {executor.execute(new Runnable() {public void run() {excuteForever();}});}}private static void observe(final ThreadPoolExecutor executor) {Runnable task = new Runnable() {public void run() {while (true) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());}}};new Thread(task).start();}public static void excuteForever() {while (true) {}}
}
输出:
2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...
可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。
当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor
即可:
public Executor getExecutor() { return executor; }
org.apache.tomcat.util.net.NioEndpoint#startInternal
会进行判断:
@Overridepublic void startInternal() throws Exception {if (!running) {...if ( getExecutor() == null ) {createExecutor(); //如果没有自定义实现,就会使用默认实现}}...}
Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer
最大值,所以几乎不会触发拒绝策略。
最后
最后再回过头看文章开头的两个问题:
-
每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。
org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnections
从Connector
的角度设置,这块不在本文探讨范围之内。虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:
- 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
- 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
-
大家有遇到过 Tomcat 线程池触发了拒绝策略吗?
Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于
org.apache.tomcat.util.net.AbstractEndpoint#getExecutor
自定义线程池进行控制。
References
- 《Java 并发编程的艺术》
欢迎关注公众号: