服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /*** Start the NIO endpoint, creating acceptor, poller threads.*/@Overridepublic void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}initializeConnectionLatch();// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}}

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {internalExecutor = true;TaskQueue taskqueue = new TaskQueue(); //无界队列TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());//提前启动核心线程prestartAllCoreThreads();}

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private volatile ThreadPoolExecutor parent = null;// No need to be volatile. This is written and read in a single thread// (when stopping a context and firing the  listeners)private Integer forcedRemainingCapacity = null;public TaskQueue() {super();}...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/*** {@inheritDoc}*/@Overridepublic void execute(Runnable command) {//重载 java.util.concurrent.ThreadPoolExecutor#executeexecute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}//强制入队public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the object//当前线程数达到最大,任务入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queue//如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new thread// 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queue//任务入队(当前线程数大于最大线程数)return super.offer(o);}

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueueif (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//强制入队if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else { //非 TaskQueue 直接触发拒绝策略submittedCount.decrementAndGet();throw rx;}

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) { //capacity是Integer最大值if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author dongguabai* @date 2023-11-18 22:04*/
public class Demo {public static void main(String[] args) {//无界队列TaskQueue taskqueue = new TaskQueue();TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);taskqueue.setParent(executor);observe(executor);while (true) {executor.execute(new Runnable() {public void run() {excuteForever();}});}}private static void observe(final ThreadPoolExecutor executor) {Runnable task = new Runnable() {public void run() {while (true) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());}}};new Thread(task).start();}public static void excuteForever() {while (true) {}}
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Overridepublic void startInternal() throws Exception {if (!running) {...if ( getExecutor() == null ) {createExecutor(); //如果没有自定义实现,就会使用默认实现}}...}

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/196847.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全框架springSecurity+Jwt+Vue-1(vue环境搭建、动态路由、动态标签页)

一、安装vue环境&#xff0c;并新建Vue项目 ①&#xff1a;安装node.js 官网(https://nodejs.org/zh-cn/) 2.安装完成之后检查下版本信息&#xff1a; ②&#xff1a;创建vue项目 1.接下来&#xff0c;我们安装vue的环境 # 安装淘宝npm npm install -g cnpm --registryhttps:/…

如何零基础自学AI人工智能

随着人工智能&#xff08;AI&#xff09;的快速发展&#xff0c;越来越多的有志之士被其强大的潜力所吸引&#xff0c;希望投身其中。然而&#xff0c;对于许多零基础的人来说&#xff0c;如何入门AI成了一个难题。本文将为你提供一份详尽的自学AI人工智能的攻略&#xff0c;帮…

SpringCloud微服务:Ribbon负载均衡

目录 负载均衡策略&#xff1a; 负载均衡的两种方式&#xff1a; 饥饿加载 1. Ribbon负载均衡规则 规则接口是IRule 默认实现是ZoneAvoidanceRule&#xff0c;根据zone选择服务列表&#xff0c;然后轮询 2&#xff0e;负载均衡自定义方式 代码方式:配置灵活&#xff0c;但修…

OpenCV C++ 图像 批处理 (批量调整尺寸、批量重命名)

文章目录 图像 批处理(调整尺寸、重命名)图像 批处理(调整尺寸、重命名) 拿着棋盘格,对着相机变换不同的方角度,采集十张以上(以10~20张为宜);或者棋盘格放到桌上,拿着相机从不同角度一通拍摄。 以棋盘格,第一个内焦点为坐标原点,便于计算世界坐标系下三维坐标; …

Pycharm之配置python虚拟环境

最近给身边的人写了脚本&#xff0c;在自己电脑可以正常运行。分享给我身边的人&#xff0c;却运行不起来&#xff0c;然后把报错的截图给我看了&#xff0c;所以难道不会利用pycharm搭建虚拟的环境&#xff1f;记录一下配置的过程。 第一步&#xff1a;右键要打开的python的代…

利用jquery对HTML中的名字进行替代

想法&#xff1a;将网页中经常要修改的名字放在一个以jquery编写的js文件中&#xff0c;如果需要修改名字&#xff0c;直接修改js文件中的名字即可。 新建name_07.html文件&#xff0c;写入下面的代码&#xff1a; <!DOCTYPE html> <html> <head><meta …

mp4视频批量截取!!!

mp4视频批量截取&#xff01;&#xff01;&#xff01; 问题&#xff1a;如果我们想截取一个mp4视频中的多个片段&#xff0c;一个一个截会很麻烦&#xff01; 可以将想要截取的开始时间和结束时间保存到 excel表 中&#xff0c;进行批量截取。 1、对一个视频&#xff0c;记…

给openlab搭建web网站

1.作业的要求 2.访问www.openlab.com网站 2.1先准备好相关的包和关闭防火墙等操作 mount /dev/sr0 /mnt/ //先挂载 yum install httpd -y //下载htppd systemctl stop firewalld //关闭防火墙 setenforce 02.2然后开始配置文件和仓库 这一步比较关键,之前改了接口…

【原创】java+swing+mysql鲜花购物商城设计与实现

前言&#xff1a; 本文主要介绍了鲜花购物商城的设计与实现。首先&#xff0c;通过市场需求&#xff0c;我们确定了鲜花商场的功能&#xff0c;通常的商城一般都是B/S架构&#xff0c;然而我们今天要用javaswing去开发一个C/S架构的鲜花商城&#xff0c;利用开发技术和工具&am…

C#WPF用户控件及自定义控件实例

本文演示C#WPF自定义控件实例 用户控件(UserControl)和自定义控件(CustomControl)都是对UI控件的一种封装方式,目的都是实现封装后控件的重用。 只不过各自封装的实现方式和使用的场景上存在差异。 1 基于UserControl 创建 创建控件最简单一个方法就是基于UserControl …

【观察】OpenHarmony:技术先进“创新局”,持续创新“谋新篇”

毫无疑问&#xff0c;开源作为今天整个软件产业的创新“原动力”&#xff0c;目前在软件产业发展中的重要性愈加凸显。根据Linux基金会的统计&#xff0c;现在全球软件产业中有70%以上的代码来源于开源软件。 从这个角度来看&#xff0c;开源技术已逐渐成为推动企业数字化转型和…

MATLAB中zticks函数用法

目录 语法 说明 示例 指定 z 轴刻度值和标签 指定非均匀 z 轴刻度值 以 2 为增量递增 z 轴刻度值 将 z 轴刻度值设置回默认值 指定特定坐标区的 z 轴刻度值 删除 z 轴刻度线 zticks函数的功能是设置或查询 z 轴刻度值。 语法 zticks(ticks) zt zticks zticks(auto)…

土地利用强度(LUI)综合指数

土地利用强度的概念可以解释为某一时间特定区域内人类活动对土地利用强度的干扰程度[1]&#xff0c;其不仅反映不同土地利用类型本身的自然属性&#xff0c;也体现了人类利用土地的深度和广度&#xff0c;进而揭示在人类社会系统干扰下土地资源自然综合体自然平衡的保持状态[2]…

jbase打印导出实现

上一篇实现了虚拟M层&#xff0c;这篇基于虚拟M实现打印导出。 首先对接打印层 using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; using System.Xml;namesp…

为什么Transformer模型中使用Layer Normalization(Layer Norm)而不是Batch Normalization(BN)

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

通过 Canal 将 MySQL 数据实时同步到 Easysearch

Canal 是阿里巴巴集团提供的一个开源产品&#xff0c;能够通过解析数据库的增量日志&#xff0c;提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave&#xff0c;实时接收 MySQL 的增量数据 binlog&#xff0c;然后通过 RESTful API 将数据写入到 Easysearch 中。…

Diagrams——制作短小精悍的流程图

今天为大家分享的是一款轻量级的流程图绘制软件——Diagrams。 以特定的图形符号加上说明&#xff0c;表示算法的图&#xff0c;称为流程图或框图。流程图是流经一个系统的信息流、观点流或部件流的图形代表。我们常用流程图来说明某一过程。 流程图使用一些标准符号代表某些类…

Vue 路由缓存 防止路由切换数据丢失 路由的生命周期

在切换路由的时候&#xff0c;如果写好了一丢数据在去切换路由在回到写好的数据的路由去将会丢失&#xff0c;这时可以使用路由缓存技术进行保存&#xff0c;这样两个界面来回换数据也不会丢失 在 < router-view >展示的内容都不会被销毁&#xff0c;路由来回切换数据也…

(c语言进阶)内存函数

一.memcpy(void* dest,void* src,int num) &#xff0c;操作单位为字节&#xff0c;完成复制且粘贴字符串 1.应用 #include <stdio.h> #include<string.h> int main() {int arr1[] { 1,2,3,4,5,6,7,8,9,10 };int arr2[20] { 0 };memcpy(arr2, arr1, 20);//从…

中移链共识机制介绍

01 为什么需要共识 共识是对某事达成的共同看法&#xff0c;它是区块链的灵魂&#xff0c;对确保区块链的完整性和安全性起着至关重要的作用。在传统的集中式系统中&#xff0c;单个实体或一组实体有权验证和记录交易。然而&#xff0c;区块链中的一个核心概念是去中心化&…