科普文:微服务之SpringBoot性能优化器动态线程池【Dynamic-Tp】特性和源码解读

一、简述

gitee地址:https://gitee.com/yanhom/dynamic-tp

github地址:https://github.com/lyh200/dynamic-tp

dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,目前支持的配置中心有ApolloNacosZookeeper,同时dynamic-tp支持线程池的监控和报警,具体特性如下:

  • 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可使用
  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,已支持NacosApolloZookeeper,同时也提供SPI接口可自定义扩展实现
  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供SPI接口可自定义扩展实现
  • 内置线程池指标采集功能,支持通过MicroMeterJsonLog日志输出、Endpoint三种方式,可通过SPI接口自定义扩展实现
  • 集成管理常用第三方组件的线程池,已集成SpringBoot内置WebServer(tomcatundertowjetty)的线程池管理
  • Builder提供TTL包装功能,生成的线程池支持上下文信息传递

代码结构

代码结构

代码结构

1.adapter模块:主要是适配一些第三方组件的线程池管理,目前已经实现的有SpringBoot内置的三大web容器(Tomcat、Jetty、Undertow)的线程池管理,后续可能接入其他常用组件的线程池管理。

2.common模块:主要是一些各个模板都会用到的类,解耦依赖,复用代码,大家日常开发中可能也经常会这样做。

3.core模块:该框架的核心代码都在这个模块里,包括动态调整参数,监控报警,以及串联整个项目流程都在此。

4.example模块:提供一个简单使用示例,方便使用者参照

5.logging模块:用于配置框架内部日志的输出,目前主要用于输出线程池监控指标数据到指定文件

6.starter模块:集成各个配置中心实现动态更新配置,目前已经集成Nacos、Apollo两个主流配置中心,使用者也可以参照扩展其他配置中心实现,客户端使用也只需引入相应starter依赖就行。

二、源码分析

在分析源码之前,我们先来思考下如果是我们来实现动态线程池组件应该如何设计。

  1. 首先不论是硬编码的线程池还是通过配置化动态生成的线程池都是一类线程池(同一基类),而这一类线程池的参数可以抽象成配置,这个配置既可以是本项目中的文件;也可以是任意远程端口的文件,例如包括业界的配置中心们例如nacoszookeeperapolloetcd等;当然它甚至可以不依赖配置中心,通过前端管理系统页面配置,走DB,通过刷新API接口中的String类型的文本配置,进而刷新线程池配置,这也是一种实现思路。
  2. 提供一个功能入口可以将配置构造成一个线程池对象,内部维护一个线程池注册表,将配置对应的线程池添加至注册表中。
  3. 实例化线程池对象,Spring环境则注入依赖Bean,以供IOC容器使用。
  4. 项目启动时首先先加载配置实例化线程池对象
  5. 如果配置指向的是远端配置中心,则注册监听器,当远端注册配置中心刷新时回调,当前系统监听到回调刷新配置,刷新线程池(动态调参),刷新本地线程池注册表。

至此我们设计出来的简易动态线程池组件应该可以基本使用了。

其实简易动态线程池组件还有很多进步的空间,例如线程池调参监控,异常报警等。

当然以上说的这些基础功能以及额外的高级功能,dynamic-tp都已经实现了,不过它目前没有提供支持我们刚刚所说通过管理系统页面配置走DB通过接口刷新的官方实现,且不支持除配置中心应用外的选择,也就是说无配置中心应用,目前不支持线程池动态调参(但支持监控),但事实上你可以根据它提供的SPI自行实现。
这可能dynamic-tp定位是轻量级动态线程池组件,且配置中心是现在大多数互联网系统都会使用的组件有关。

2.1 配置

dynamic-tp通过DtpProperties来做配置的统一收口,这个配置包括本地文件或者配置中心中的文件(propertiesjsonymltxtxml),可以看到目前已支持NacosApolloZookeeperConsulEtcd配置中心

@Slf4j
@Data
@ConfigurationProperties(prefix = DynamicTpConst.MAIN_PROPERTIES_PREFIX)
public class DtpProperties {/*** If enabled DynamicTp.*/private boolean enabled = true;/*** If print banner.*/private boolean enabledBanner = true;/*** Nacos config.*/private Nacos nacos;/*** Apollo config.*/private Apollo apollo;/*** Zookeeper config.*/private Zookeeper zookeeper;/*** Etcd config.*/private Etcd etcd;/*** Config file type.*/private String configType = "yml";/*** If enabled metrics collect.*/private boolean enabledCollect = false;/*** Metrics collector types, default is logging.*/private List<String> collectorTypes = Lists.newArrayList(MICROMETER.name());/*** Metrics log storage path, just for "logging" type.*/private String logPath;/*** Monitor interval, time unit(s)*/private int monitorInterval = 5;/*** ThreadPoolExecutor configs.*/private List<ThreadPoolProperties> executors;/*** Tomcat worker thread pool.*/private SimpleTpProperties tomcatTp;/*** Jetty thread pool.*/private SimpleTpProperties jettyTp;/*** Undertow thread pool.*/private SimpleTpProperties undertowTp;/*** Dubbo thread pools.*/private List<SimpleTpProperties> dubboTp;/*** Hystrix thread pools.*/private List<SimpleTpProperties> hystrixTp;/*** RocketMq thread pools.*/private List<SimpleTpProperties> rocketMqTp;/*** Grpc thread pools.*/private List<SimpleTpProperties> grpcTp;/*** Motan server thread pools.*/private List<SimpleTpProperties> motanTp;/*** Okhttp3 thread pools.*/private List<SimpleTpProperties> okhttp3Tp;/*** Brpc thread pools.*/private List<SimpleTpProperties> brpcTp;/*** Tars thread pools.*/private List<SimpleTpProperties> tarsTp;/*** Sofa thread pools.*/private List<SimpleTpProperties> sofaTp;/*** Notify platform configs.*/private List<NotifyPlatform> platforms;@Datapublic static class Nacos {private String dataId;private String group;private String namespace;}@Datapublic static class Apollo {private String namespace;}@Datapublic static class Zookeeper {private String zkConnectStr;private String configVersion;private String rootNode;private String node;private String configKey;}/*** Etcd config.*/@Datapublic static class Etcd {private String endpoints;private String user;private String password;private String charset = "UTF-8";private Boolean authEnable = false;private String authority = "ssl";private String key;}
}

项目中提供了一个配置解析接口ConfigParser

public interface ConfigParser {// 是否支持配置解析boolean supports(ConfigFileTypeEnum type);// 解析支持的类型List<ConfigFileTypeEnum> types();// 解析Map<Object, Object> doParse(String content) throws IOException;// 解析指定前缀Map<Object, Object> doParse(String content, String prefix) throws IOException;
}

ConfigFileTypeEnum如下,覆盖了主流文件类型

@Getter
public enum ConfigFileTypeEnum {PROPERTIES("properties"),XML("xml"),JSON("json"),YML("yml"),YAML("yaml"),TXT("txt");
}

项目中实现了配置解析基类,以及默认提供了3中文件类型配置解析类,jsonproperties以及yaml,使用者完全可以通过继承AbstractConfigParser来补充配置解析模式。

AbstractConfigParser代码如下,模板方法由子类实现具体的解析逻辑。

public abstract class AbstractConfigParser implements ConfigParser {@Overridepublic boolean supports(ConfigFileTypeEnum type) {return this.types().contains(type);}@Overridepublic Map<Object, Object> doParse(String content, String prefix) throws IOException {return doParse(content);}
}

子类的实现这里就不看了,大差不差就是通过读取文件,解析每一行配置项,最后将结果封装成Map<Object, Object> result返回。

接着通过Spring-bind提供的解析方法将Map<Object, Object> result绑定到DtpProperties配置类上

实现代码如下:

public class PropertiesBinder {private PropertiesBinder() { }public static void bindDtpProperties(Map<?, Object> properties, DtpProperties dtpProperties) {ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);Binder binder = new Binder(sources);ResolvableType type = ResolvableType.forClass(DtpProperties.class);Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);binder.bind(MAIN_PROPERTIES_PREFIX, target);}public static void bindDtpProperties(Environment environment, DtpProperties dtpProperties) {Binder binder = Binder.get(environment);ResolvableType type = ResolvableType.forClass(DtpProperties.class);Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);binder.bind(MAIN_PROPERTIES_PREFIX, target);}
}

到这里已经拿到了配置,我们来看接下来的流程。

2.2 注册线程池

DtpBeanDefinitionRegistrar实现了ConfigurationClassPostProcessor利用Spring的动态注册bean机制,在bean初始化之前注册BeanDefinition以达到注入bean的目的

ps:最终被Spring ConfigurationClassPostProcessor执行出来对这块不熟悉的小伙伴可以去翻看Spring源码。

来看下DtpBeanDefinitionRegistrar具体做了什么吧

@Slf4j
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {private Environment environment;@Overridepublic void setEnvironment(Environment environment) {this.environment = environment;}@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,BeanDefinitionRegistry registry) {DtpProperties dtpProperties = new DtpProperties();// 从 Environment 读取配置信息绑定到 DtpPropertiesPropertiesBinder.bindDtpProperties(environment, dtpProperties);// 获取配置文件中配置的线程池val executors = dtpProperties.getExecutors();if (CollectionUtils.isEmpty(executors)) {log.warn("DynamicTp registrar, no executors are configured.");return;}// 遍历并注册线程池 BeanDefinitionexecutors.forEach(x -> {// 类型选择,common->DtpExecutor,eager->EagerDtpExecutor Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());// 通过 ThreadPoolProperties 来构造线程池所需要的属性Map<String, Object> properties = buildPropertyValues(x);Object[] args = buildConstructorArgs(executorTypeClass, x);// 工具类 BeanDefinition 注册 Bean 相当于手动用 @Bean 声明线程池对象BeanUtil.registerIfAbsent(registry, x.getThreadPoolName(), executorTypeClass,properties, args);});}
}

registerBeanDefinitions方法中主要做了这么几件事

  1. Environment读取配置信息绑定到DtpProperties
  2. 获取配置文件中配置的线程池,如果没有则结束
  3. 遍历线程池,绑定配置构造线程池所需要的属性,根据配置中的executorType注册不同类型的线程池Bean(下面会说)
  4. BeanUtil#registerIfAbsent()注册Bean

ExecutorType目前项目支持3种类型,分别对应3个线程池,这里先跳过,我们下文详细介绍

回到刚才的步骤,接下来通过ThreadPoolProperties来构造线程池所需要的属性

private Map<String, Object> buildPropertyValues(ThreadPoolProperties tpp) {Map<String, Object> properties = Maps.newHashMap();properties.put(THREAD_POOL_NAME, tpp.getThreadPoolName());properties.put(THREAD_POOL_ALIAS_NAME, tpp.getThreadPoolAliasName());properties.put(ALLOW_CORE_THREAD_TIMEOUT, tpp.isAllowCoreThreadTimeOut());properties.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, tpp.isWaitForTasksToCompleteOnShutdown());properties.put(AWAIT_TERMINATION_SECONDS, tpp.getAwaitTerminationSeconds());properties.put(PRE_START_ALL_CORE_THREADS, tpp.isPreStartAllCoreThreads());properties.put(RUN_TIMEOUT, tpp.getRunTimeout());properties.put(QUEUE_TIMEOUT, tpp.getQueueTimeout());val notifyItems = mergeAllNotifyItems(tpp.getNotifyItems());properties.put(NOTIFY_ITEMS, notifyItems);properties.put(NOTIFY_ENABLED, tpp.isNotifyEnabled());val taskWrappers = TaskWrappers.getInstance().getByNames(tpp.getTaskWrapperNames());properties.put(TASK_WRAPPERS, taskWrappers);return properties;
}

选择阻塞队列,这里针对EagerDtpExecutor做了单独处理,选择了TaskQueue作为阻塞队列(下文说明)

private Object[] buildConstructorArgs(Class<?> clazz, ThreadPoolProperties tpp) {BlockingQueue<Runnable> taskQueue;// 如果是 EagerDtpExecutor 的话,对工作队列就是 TaskQueueif (clazz.equals(EagerDtpExecutor.class)) {taskQueue = new TaskQueue(tpp.getQueueCapacity());} else {// 不是 EagerDtpExecutor的话,就根据配置中的 queueType 来选择阻塞的队列taskQueue = buildLbq(tpp.getQueueType(), tpp.getQueueCapacity(), tpp.isFair(),tpp.getMaxFreeMemory());}return new Object[]{tpp.getCorePoolSize(),tpp.getMaximumPoolSize(),tpp.getKeepAliveTime(),tpp.getUnit(),taskQueue,new NamedThreadFactory(tpp.getThreadNamePrefix()),RejectHandlerGetter.buildRejectedHandler(tpp.getRejectedHandlerType())};
}

EagerDtpExecutor则根据配置中的queueType来选择阻塞的队列

public static BlockingQueue<Runnable> buildLbq(String name, int capacity, boolean fair,int maxFreeMemory) {BlockingQueue<Runnable> blockingQueue = null;if (Objects.equals(name, ARRAY_BLOCKING_QUEUE.getName())) {blockingQueue = new ArrayBlockingQueue<>(capacity);} else if (Objects.equals(name, LINKED_BLOCKING_QUEUE.getName())) {blockingQueue = new LinkedBlockingQueue<>(capacity);} else if (Objects.equals(name, PRIORITY_BLOCKING_QUEUE.getName())) {blockingQueue = new PriorityBlockingQueue<>(capacity);} else if (Objects.equals(name, DELAY_QUEUE.getName())) {blockingQueue = new DelayQueue();} else if (Objects.equals(name, SYNCHRONOUS_QUEUE.getName())) {blockingQueue = new SynchronousQueue<>(fair);} else if (Objects.equals(name, LINKED_TRANSFER_QUEUE.getName())) {blockingQueue = new LinkedTransferQueue<>();} else if (Objects.equals(name, LINKED_BLOCKING_DEQUE.getName())) {blockingQueue = new LinkedBlockingDeque<>(capacity);} else if (Objects.equals(name, VARIABLE_LINKED_BLOCKING_QUEUE.getName())) {blockingQueue = new VariableLinkedBlockingQueue<>(capacity);} else if (Objects.equals(name, MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName())) {blockingQueue = new MemorySafeLinkedBlockingQueue<>(capacity, maxFreeMemory * M_1);}if (blockingQueue != null) {return blockingQueue;}log.error("Cannot find specified BlockingQueue {}", name);throw new DtpException("Cannot find specified BlockingQueue " + name);
}

到这里我们已经构造好了创建一个线程池需要的所有参数

调用BeanUtil#registerIfAbsent(),先判断是否同名bean,如果同名先删除后注入。

@Slf4j
public final class BeanUtil {private BeanUtil() { }public static void registerIfAbsent(BeanDefinitionRegistry registry,String beanName,Class<?> clazz,Map<String, Object> properties,Object... constructorArgs) {// 如果存在同名bean,先删除后重新注入beanif (ifPresent(registry, beanName, clazz) || registry.containsBeanDefinition(beanName)) {log.warn("DynamicTp registrar, bean definition already exists," +" overrides with remote config, beanName: {}", beanName);registry.removeBeanDefinition(beanName);}doRegister(registry, beanName, clazz, properties, constructorArgs);}/*** 注册Bean 相当于手动用 @Bean 声明线程池对象* @param registry* @param beanName* @param clazz* @param properties* @param constructorArgs*/public static void doRegister(BeanDefinitionRegistry registry,String beanName,Class<?> clazz,Map<String, Object> properties,Object... constructorArgs) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);for (Object constructorArg : constructorArgs) {builder.addConstructorArgValue(constructorArg);}if (MapUtils.isNotEmpty(properties)) {properties.forEach(builder::addPropertyValue);}// 注册 Bean registry.registerBeanDefinition(beanName, builder.getBeanDefinition());}
}

至此线程池对象已经交由IOC容器管理了。

我们的线程池对象总不能无脑塞入IOC容器就不管了吧,肯定是要留根的,也就是需要一个线程池注册表,记录有哪些线程池是受dynamic-tp托管的,这样除了可以进行统计外,也就可以实现通知报警了。

下面我们来看下项目是如何实现注册表的

2.3 注册表

DtpPostProcessor利用了Spring容器启动BeanPostProcessor机制增强机制,在bean初始化的时候调用postProcessAfterInitialization,它实现了获取被IOC容器托管的线程池bean然后注册到本地的注册表中。

代码实现如下:

@Slf4j
public class DtpPostProcessor implements BeanPostProcessor {@Overridepublic Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {// 只增强线程池相关的类if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {return bean;}// 如果是 DtpExecutor 类型注册到注册表 DTP_REGISTRYif (bean instanceof DtpExecutor) {DtpExecutor dtpExecutor = (DtpExecutor) bean;if (bean instanceof EagerDtpExecutor) {((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);}registerDtp(dtpExecutor);return dtpExecutor;}// 获取上下文ApplicationContext applicationContext = ApplicationContextHolder.getInstance();String dtpAnnotationVal;try {// 读取标注 @DynamicTp 注解的 bean 则为基本线程池,但受组件管理监控DynamicTp dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);if (Objects.nonNull(dynamicTp)) {dtpAnnotationVal = dynamicTp.value();} else {BeanDefinitionRegistry registry = (BeanDefinitionRegistry) applicationContext;AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) registry.getBeanDefinition(beanName);MethodMetadata methodMetadata = (MethodMetadata) annotatedBeanDefinition.getSource();if (Objects.isNull(methodMetadata) || !methodMetadata.isAnnotated(DynamicTp.class.getName())) {return bean;}dtpAnnotationVal = Optional.ofNullable(methodMetadata.getAnnotationAttributes(DynamicTp.class.getName())).orElse(Collections.emptyMap()).getOrDefault("value", "").toString();}} catch (NoSuchBeanDefinitionException e) {log.error("There is no bean with the given name {}", beanName, e);return bean;}// 如果说bean上面的DynamicTp注解,使用注解的值作为线程池的名称,没有的话就使用bean的名称String poolName = StringUtils.isNotBlank(dtpAnnotationVal) ? dtpAnnotationVal : beanName;if (bean instanceof ThreadPoolTaskExecutor) {// 注册到注册表 COMMON_REGISTRYThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) bean;registerCommon(poolName, taskExecutor.getThreadPoolExecutor());} else {registerCommon(poolName, (ThreadPoolExecutor) bean);}return bean;}/*** 动态线程池注册 向Map集合put元素** @param executor*/private void registerDtp(DtpExecutor executor) {DtpRegistry.registerDtp(executor, "beanPostProcessor");}/*** 非动态线程池注册 向Map集合put元素** @param poolName* @param executor*/private void registerCommon(String poolName, ThreadPoolExecutor executor) {ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);DtpRegistry.registerCommon(wrapper, "beanPostProcessor");}
}

简单总结下,和刚刚我们分析完全一致

  1. 获取到bean后,如果是非线程池类型则结束。
  2. 如果是DtpExecutor则注册到DTP_REGISTRY注册表中
  3. 如果是非动态线程池且标注了@DynamicTp注解则注册到COMMON_REGISTRY注册表中
  4. 如果是非动态线程池且未标注@DynamicTp注解则结束不做增强

DtpRegistry主要负责注册、获取、刷新某个动态线程池(刷新线程池我们会下文分析)

@Slf4j
public class DtpRegistry implements ApplicationRunner, Ordered {/*** Maintain all automatically registered and manually registered DtpExecutors.* 动态线程池 key为线程池name* DtpExecutor ThreadPoolExecutor加强版*/private static final Map<String, DtpExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();/*** Maintain all automatically registered and manually registered JUC ThreadPoolExecutors.* <p>* 标有DynamicTp注解的线程池*/private static final Map<String, ExecutorWrapper> COMMON_REGISTRY = new ConcurrentHashMap<>();private static final Equator EQUATOR = new GetterBaseEquator();/*** 配置文件映射*/private static DtpProperties dtpProperties;public static List<String> listAllDtpNames() {return Lists.newArrayList(DTP_REGISTRY.keySet());}public static List<String> listAllCommonNames() {return Lists.newArrayList(COMMON_REGISTRY.keySet());}public static void registerDtp(DtpExecutor executor, String source) {log.info("DynamicTp register dtpExecutor, source: {}, executor: {}",source, ExecutorConverter.convert(executor));DTP_REGISTRY.putIfAbsent(executor.getThreadPoolName(), executor);}public static void registerCommon(ExecutorWrapper wrapper, String source) {log.info("DynamicTp register commonExecutor, source: {}, name: {}",source, wrapper.getThreadPoolName());COMMON_REGISTRY.putIfAbsent(wrapper.getThreadPoolName(), wrapper);}public static DtpExecutor getDtpExecutor(final String name) {val executor = DTP_REGISTRY.get(name);if (Objects.isNull(executor)) {log.error("Cannot find a specified dtpExecutor, name: {}", name);throw new DtpException("Cannot find a specified dtpExecutor, name: " + name);}return executor;}public static ExecutorWrapper getCommonExecutor(final String name) {val executor = COMMON_REGISTRY.get(name);if (Objects.isNull(executor)) {log.error("Cannot find a specified commonExecutor, name: {}", name);throw new DtpException("Cannot find a specified commonExecutor, name: " + name);}return executor;}@Autowiredpublic void setDtpProperties(DtpProperties dtpProperties) {DtpRegistry.dtpProperties = dtpProperties;}@Overridepublic void run(ApplicationArguments args) {// 线程池名称Set<String> remoteExecutors = Collections.emptySet();// 获取配置文件中配置的线程池if (CollectionUtils.isNotEmpty(dtpProperties.getExecutors())) {remoteExecutors = dtpProperties.getExecutors().stream().map(ThreadPoolProperties::getThreadPoolName).collect(Collectors.toSet());}// DTP_REGISTRY中已经注册的线程池val registeredDtpExecutors = Sets.newHashSet(DTP_REGISTRY.keySet());// 找出所有线程池中没有在配置文件中配置的线程池val localDtpExecutors = CollectionUtils.subtract(registeredDtpExecutors, remoteExecutors);// 日志log.info("DtpRegistry initialization is complete, remote dtpExecutors: {}," +" local dtpExecutors: {}, local commonExecutors: {}",remoteExecutors, localDtpExecutors, COMMON_REGISTRY.keySet());}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 1;}
}

代码比较简单,这里不再说明了。

流程至此,动态线程池,标注了@DynamicTp注解的线程池,都已经准备就绪了。

你可能会问那配置刷新配置刷新动态调参是如何实现的呢,别急,我们继续分析。

2.4 配置刷新 动态调参

Dynamic-tp提供了配置刷新接口Refresher,和基类AbstractRefresher,支持不同配置中心的刷新基类,甚至完全可以自行扩展,其原理其实就是当配置中心监听到配置文件的变动后,解析配置文件,刷新配置文件,最后通过Spring ApplicationListener机制发送RefreshEvent刷新事件,由对应的Adapter来处理。

public interface Refresher {/*** Refresh with specify content.** @param content content* @param fileType file type*/void refresh(String content, ConfigFileTypeEnum fileType);
}

@Slf4j
public abstract class AbstractRefresher implements Refresher {@Resourceprotected DtpProperties dtpProperties;@Overridepublic void refresh(String content, ConfigFileTypeEnum fileType) {if (StringUtils.isBlank(content) || Objects.isNull(fileType)) {log.warn("DynamicTp refresh, empty content or null fileType.");return;}try {val configHandler = ConfigHandler.getInstance();val properties = configHandler.parseConfig(content, fileType);doRefresh(properties);} catch (IOException e) {log.error("DynamicTp refresh error, content: {}, fileType: {}", content, fileType, e);}}protected void doRefresh(Map<Object, Object> properties) {if (MapUtils.isEmpty(properties)) {log.warn("DynamicTp refresh, empty properties.");return;}// 将发生变化的属性绑定到DtpProperties对象上PropertiesBinder.bindDtpProperties(properties, dtpProperties);// 更新线程池属性doRefresh(dtpProperties);}protected void doRefresh(DtpProperties dtpProperties) {DtpRegistry.refresh(dtpProperties);publishEvent(dtpProperties);}private void publishEvent(DtpProperties dtpProperties) {RefreshEvent event = new RefreshEvent(this, dtpProperties);ApplicationContextHolder.publishEvent(event);}
}

接下来我们以Zookeeper为配置中心举例说明,代码如下。

@Slf4j
public class ZookeeperRefresher extends AbstractRefresher implements EnvironmentAware, InitializingBean {@Overridepublic void afterPropertiesSet() {final ConnectionStateListener connectionStateListener = (client, newState) -> {// 连接变更if (newState == ConnectionState.RECONNECTED) {loadAndRefresh();}};final CuratorListener curatorListener = (client, curatorEvent) -> {final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();if (null != watchedEvent) {switch (watchedEvent.getType()) {// 监听节点变更case NodeChildrenChanged:case NodeDataChanged:// 刷新loadAndRefresh();break;default:break;}}};CuratorFramework curatorFramework = CuratorUtil.getCuratorFramework(dtpProperties);String nodePath = CuratorUtil.nodePath(dtpProperties);curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);curatorFramework.getCuratorListenable().addListener(curatorListener);log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);}/*** load config and refresh*/private void loadAndRefresh() {doRefresh(CuratorUtil.genPropertiesMap(dtpProperties));}@Overridepublic void setEnvironment(Environment environment) {ConfigurableEnvironment env = ((ConfigurableEnvironment) environment);env.getPropertySources().remove(ZK_PROPERTY_SOURCE_NAME);}
}

利用了Spring机制,实现了InitializingBean并重写afterPropertiesSet,在Bean实例化完成之后会被自动调用,在这期间针对Zookeeper连接,节点变更监听器进行注册,监听连接变更和节点变更后执行刷新操作。

doRefresh(CuratorUtil.genPropertiesMap(dtpProperties));实现由基类统一处理,解析配置并绑定DtpProperties上,执行DtpRegistry#refresh()刷新后发布一个RefreshEvent事件。

protected void doRefresh(Map<Object, Object> properties) {if (MapUtils.isEmpty(properties)) {log.warn("DynamicTp refresh, empty properties.");return;}// 解析配置并绑定 DtpProperties 上PropertiesBinder.bindDtpProperties(properties, dtpProperties);// 更新线程池属性doRefresh(dtpProperties);
}protected void doRefresh(DtpProperties dtpProperties) {DtpRegistry.refresh(dtpProperties);publishEvent(dtpProperties);
}private void publishEvent(DtpProperties dtpProperties) {RefreshEvent event = new RefreshEvent(this, dtpProperties);ApplicationContextHolder.publishEvent(event);
}

来看DtpRegistry#refresh()的实现,代码如下:

public static void refresh(DtpProperties properties) {if (Objects.isNull(properties) || CollectionUtils.isEmpty(properties.getExecutors())) {log.warn("DynamicTp refresh, empty threadPoolProperties.");return;}// 属性不为空 从属性中拿到所有的线程池属性配置properties.getExecutors().forEach(x -> {if (StringUtils.isBlank(x.getThreadPoolName())) {log.warn("DynamicTp refresh, threadPoolName must not be empty.");return;}// 从 DTP_REGISTRY 线程注册池表中拿到对应的线程池对象val dtpExecutor = DTP_REGISTRY.get(x.getThreadPoolName());if (Objects.isNull(dtpExecutor)) {log.warn("DynamicTp refresh, cannot find specified dtpExecutor, name: {}.",x.getThreadPoolName());return;}// 刷新 更新线程池对象refresh(dtpExecutor, x);});
}
private static void refresh(DtpExecutor executor, ThreadPoolProperties properties) {// 参数合法校验if (properties.getCorePoolSize() < 0|| properties.getMaximumPoolSize() <= 0|| properties.getMaximumPoolSize() < properties.getCorePoolSize()|| properties.getKeepAliveTime() < 0) {log.error("DynamicTp refresh, invalid parameters exist, properties: {}", properties);return;}// 线程池旧配置DtpMainProp oldProp = ExecutorConverter.convert(executor);// 真正开始刷新doRefresh(executor, properties);// 线程池新配置DtpMainProp newProp = ExecutorConverter.convert(executor);// 相等不作处理if (oldProp.equals(newProp)) {log.warn("DynamicTp refresh, main properties of [{}] have not changed.",executor.getThreadPoolName());return;}List<FieldInfo> diffFields = EQUATOR.getDiffFields(oldProp, newProp);List<String> diffKeys = diffFields.stream().map(FieldInfo::getFieldName).collect(toList());// 线程池参数变更 平台提醒NoticeManager.doNoticeAsync(new ExecutorWrapper(executor), oldProp, diffKeys);// 更新参数 日志打印log.info("DynamicTp refresh, name: [{}], changed keys: {}, corePoolSize: [{}]," +" maxPoolSize: [{}], queueType: [{}], queueCapacity: [{}], keepAliveTime: [{}], " +"rejectedType: [{}], allowsCoreThreadTimeOut: [{}]",executor.getThreadPoolName(),diffKeys,String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.getCorePoolSize(), newProp.getCorePoolSize()),String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.getMaxPoolSize(), newProp.getMaxPoolSize()),String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.getQueueType(), newProp.getQueueType()),String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.getQueueCapacity(), newProp.getQueueCapacity()),String.format("%ss => %ss", oldProp.getKeepAliveTime(), newProp.getKeepAliveTime()),String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.getRejectType(), newProp.getRejectType()),String.format(PROPERTIES_CHANGE_SHOW_STYLE, oldProp.isAllowCoreThreadTimeOut(),newProp.isAllowCoreThreadTimeOut()));
}

总结一下上述代码无非做了这么几件事

  1. 参数合法校验
  2. 获取到线程池旧配置
  3. 执行刷新
  4. 获取到线程池新配置
  5. 如果新旧配置相同,则证明没有改动,不做处理
  6. 否则线程池变更发送通知,并记录变更日志(ps:通知相关处理下文会说,这里先跳过)

doRefresh真正执行线程池的刷新,也依靠于JUC原生线程池支持动态属性变更。

private static void doRefresh(DtpExecutor dtpExecutor, ThreadPoolProperties properties) {// 调用相应的setXXX方法更新线程池参数doRefreshPoolSize(dtpExecutor, properties);if (!Objects.equals(dtpExecutor.getKeepAliveTime(properties.getUnit()),properties.getKeepAliveTime())) {dtpExecutor.setKeepAliveTime(properties.getKeepAliveTime(), properties.getUnit());}if (!Objects.equals(dtpExecutor.allowsCoreThreadTimeOut(),properties.isAllowCoreThreadTimeOut())) {dtpExecutor.allowCoreThreadTimeOut(properties.isAllowCoreThreadTimeOut());}// update reject handlerif (!Objects.equals(dtpExecutor.getRejectHandlerName(),properties.getRejectedHandlerType())) {dtpExecutor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(properties.getRejectedHandlerType()));dtpExecutor.setRejectHandlerName(properties.getRejectedHandlerType());}// update Alias Nameif (!Objects.equals(dtpExecutor.getThreadPoolAliasName(), properties.getThreadPoolAliasName())) {dtpExecutor.setThreadPoolAliasName(properties.getThreadPoolAliasName());}updateQueueProp(properties, dtpExecutor);dtpExecutor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutdown());dtpExecutor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());dtpExecutor.setPreStartAllCoreThreads(properties.isPreStartAllCoreThreads());dtpExecutor.setRunTimeout(properties.getRunTimeout());dtpExecutor.setQueueTimeout(properties.getQueueTimeout());List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(properties.getTaskWrapperNames());dtpExecutor.setTaskWrappers(taskWrappers);// update notify itemsval allNotifyItems = mergeAllNotifyItems(properties.getNotifyItems());// 刷新通知平台NotifyHelper.refreshNotify(dtpExecutor.getThreadPoolName(), dtpProperties.getPlatforms(),dtpExecutor.getNotifyItems(), allNotifyItems);dtpExecutor.setNotifyItems(allNotifyItems);dtpExecutor.setNotifyEnabled(properties.isNotifyEnabled());
}

doRefreshPoolSize调用ThreadPoolExecutor原生setXXX方法支持在运行时动态修改

private static void doRefreshPoolSize(ThreadPoolExecutor dtpExecutor, ThreadPoolProperties properties) {// 调用ThreadPoolExecutor原生setXXX方法 支持在运行时动态修改if (properties.getMaximumPoolSize() < dtpExecutor.getMaximumPoolSize()) {if (!Objects.equals(dtpExecutor.getCorePoolSize(), properties.getCorePoolSize())) {dtpExecutor.setCorePoolSize(properties.getCorePoolSize());}if (!Objects.equals(dtpExecutor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {dtpExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());}return;}if (!Objects.equals(dtpExecutor.getMaximumPoolSize(), properties.getMaximumPoolSize())) {dtpExecutor.setMaximumPoolSize(properties.getMaximumPoolSize());}if (!Objects.equals(dtpExecutor.getCorePoolSize(), properties.getCorePoolSize())) {dtpExecutor.setCorePoolSize(properties.getCorePoolSize());}
}

updateQueueProp更新线程池阻塞队列大小

private static void updateQueueProp(ThreadPoolProperties properties, DtpExecutor dtpExecutor) {// queueType 非 VariableLinkedBlockingQueue MemorySafeLinkedBlockingQueue// 且executorType为EagerDtpExecutor 不刷新if (!canModifyQueueProp(properties)) {return;}// 获取到线程池原来的队列val blockingQueue = dtpExecutor.getQueue();// 如果原来的队列容量和现在的不一样if (!Objects.equals(dtpExecutor.getQueueCapacity(), properties.getQueueCapacity())) {// 并且原来的队列是 VariableLinkedBlockingQueue 类型的,那么就设置队列的容量if (blockingQueue instanceof VariableLinkedBlockingQueue) {((VariableLinkedBlockingQueue<Runnable>) blockingQueue).setCapacity(properties.getQueueCapacity());} else {// 否则不设置log.error("DynamicTp refresh, the blockingqueue capacity cannot be reset, dtpName: {}," +" queueType {}", dtpExecutor.getThreadPoolName(), dtpExecutor.getQueueName());}}// 如果队列是 MemorySafeLinkedBlockingQueue,那么设置最大内存if (blockingQueue instanceof MemorySafeLinkedBlockingQueue) {((MemorySafeLinkedBlockingQueue<Runnable>) blockingQueue).setMaxFreeMemory(properties.getMaxFreeMemory() * M_1);}
}

上述代码提到了几个眼生的队列,他们都是dynamic-tp自行实现的阻塞队列,我们来看下

VariableLinkedBlockingQueue: 可以设置队列容量,且支持变更队列容量
MemorySafeLinkedBlockingQueue: 继承VariableLinkedBlockingQueue,可以通过maxFreeMemory设置队列容量,在构造器中对容量有默认的大小限制

首先我们思考一下,为什么dynamic-tp要自行实现的阻塞队列?

当你翻看Java原生LinkedBlockingQueue队列时你就会发现,队列容量被定义为private final类型的,不能修改,那肯定是不符合我们修改阻塞队列大小还能实现刷新线程池的效果。

其中着重说明下MemorySafeLinkedBlockingQueue队列,LinkedBlockingQueue的容量默认是Integer.MAX_VALUE,所以当我们不对其进行限制时,就有可能导致OOM问题,所以MemorySafeLinkedBlockingQueue构造函数设置了默认队列大小

当我们往队列添加元素的时候,会先判断有没有足够的空间

public class MemorySafeLinkedBlockingQueue<E> extends VariableLinkedBlockingQueue<E> {private static final long serialVersionUID = 8032578371739960142L;public static final int THE_256_MB = 256 * 1024 * 1024;/*** 队列的容量*/private int maxFreeMemory;public MemorySafeLinkedBlockingQueue() {// 默认256MBthis(THE_256_MB);}public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {super(Integer.MAX_VALUE);this.maxFreeMemory = maxFreeMemory;}public MemorySafeLinkedBlockingQueue(final int capacity, final int maxFreeMemory) {super(capacity);this.maxFreeMemory = maxFreeMemory;}public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c, final int maxFreeMemory) {super(c);this.maxFreeMemory = maxFreeMemory;}/*** set the max free memory.** @param maxFreeMemory the max free memory*/public void setMaxFreeMemory(final int maxFreeMemory) {this.maxFreeMemory = maxFreeMemory;}/*** get the max free memory.** @return the max free memory limit*/public int getMaxFreeMemory() {return maxFreeMemory;}/*** determine if there is any remaining free memory.** @return true if has free memory*/public boolean hasRemainedMemory() {if (MemoryLimitCalculator.maxAvailable() > maxFreeMemory) {return true;}throw new RejectedExecutionException("No more memory can be used.");}@Overridepublic void put(final E e) throws InterruptedException {// 我们往队列添加元素的时候,会先判断有没有足够的空间if (hasRemainedMemory()) {super.put(e);}}@Overridepublic boolean offer(final E e, final long timeout, final TimeUnit unit)throws InterruptedException {return hasRemainedMemory() && super.offer(e, timeout, unit);}@Overridepublic boolean offer(final E e) {return hasRemainedMemory() && super.offer(e);}
}

回到上文我们说刷新完线程池后,发送异步事件RefreshEvent,来继续看下

DtpAdapterListener处于adapter模块,该模块主要是对些三方组件中的线程池进行管理(例如TomcatJetty等),通过spring的事件发布监听机制来实现与核心流程解耦

@Slf4j
public class DtpAdapterListener implements GenericApplicationListener {@Overridepublic boolean supportsEventType(ResolvableType resolvableType) {Class<?> type = resolvableType.getRawClass();if (type != null) {return RefreshEvent.class.isAssignableFrom(type)|| CollectEvent.class.isAssignableFrom(type)|| AlarmCheckEvent.class.isAssignableFrom(type);}return false;}@Overridepublic void onApplicationEvent(@NonNull ApplicationEvent event) {try {if (event instanceof RefreshEvent) {doRefresh(((RefreshEvent) event).getDtpProperties());} else if (event instanceof CollectEvent) {doCollect(((CollectEvent) event).getDtpProperties());} else if (event instanceof AlarmCheckEvent) {doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());}} catch (Exception e) {log.error("DynamicTp adapter, event handle failed.", e);}}}
/*** Do refresh.** @param dtpProperties dtpProperties*/
protected void doRefresh(DtpProperties dtpProperties) {val handlerMap = ApplicationContextHolder.getBeansOfType(DtpAdapter.class);if (CollectionUtils.isEmpty(handlerMap)) {return;}handlerMap.forEach((k, v) -> v.refresh(dtpProperties));
}

2.3 线程池类型

DtpLifecycleSupport

DtpLifecycleSupport继承了JUC ThreadPoolExecutor,对原生线程池进行了增强

@Slf4j
public abstract class DtpLifecycleSupport extends ThreadPoolExecutorimplements InitializingBean, DisposableBean {protected String threadPoolName;/*** Whether to wait for scheduled tasks to complete on shutdown,* not interrupting running tasks and executing all tasks in the queue.* <p>* 在关闭线程池的时候是否等待任务执行完毕,不会打断运行中的任务,并且会执行队列中的所有任务*/protected boolean waitForTasksToCompleteOnShutdown = false;/*** The maximum number of seconds that this executor is supposed to block* on shutdown in order to wait for remaining tasks to complete their execution* before the rest of the container continues to shut down.* <p>* 在线程池关闭时等待的最大时间,目的就是等待线程池中的任务运行完毕。*/protected int awaitTerminationSeconds = 0;public DtpLifecycleSupport(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);}@Overridepublic void afterPropertiesSet() {DtpProperties dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class);// 子类实现initialize(dtpProperties);}
}

提供了两个增强字段waitForTasksToCompleteOnShutdownawaitTerminationSeconds

我们以此来看下

waitForTasksToCompleteOnShutdown作用在线程池销毁阶段

public void internalShutdown() {if (log.isInfoEnabled()) {log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);}// 如果需要等待任务执行完毕,则调用 shutdown()会执行先前已提交的任务,拒绝新任务提交,线程池状态变成 SHUTDOWNif (this.waitForTasksToCompleteOnShutdown) {this.shutdown();} else {// 如果不需要等待任务执行完毕,则直接调用shutdownNow()方法,尝试中断正在执行的任务,// 返回所有未执行的任务,线程池状态变成 STOP, 然后调用 Future 的 cancel 方法取消for (Runnable remainingTask : this.shutdownNow()) {cancelRemainingTask(remainingTask);}}awaitTerminationIfNecessary();
}

总结下它的作用就是在关闭线程池的时候看是否等待任务执行完毕,如果需要等待则会拒绝新任务的提交,执行先前已提交的任务,否则中断正在执行的任务。

awaitTerminationSeconds字段主要是配合shutdown使用,阻塞当前线程,等待已提交的任务执行完毕或者超时的最大时间,等待线程池中的任务运行结束。

DtpExecutor

DtpExecutor也就是我们项目中横贯整个流程的动态线程池,它继承自DtpLifecycleSupport,主要是也是实现对基本线程池的增强。

@Slf4j
public class DtpExecutor extends DtpLifecycleSupport implements SpringExecutor {/*** Simple Business alias Name of Dynamic ThreadPool. Use for notify.*/private String threadPoolAliasName;/*** RejectHandler name.*/private String rejectHandlerName;/*** If enable notify.*/private boolean notifyEnabled;/*** Notify items, see {@link NotifyItemEnum}.* <p>* 需要提醒的平台*/private List<NotifyItem> notifyItems;/*** Task wrappers, do sth enhanced.*/private List<TaskWrapper> taskWrappers = Lists.newArrayList();/*** If pre start all core threads.* <p>* 线程是否需要提前预热,真正调用的还是ThreadPoolExecutor的对应方法*/private boolean preStartAllCoreThreads;/*** Task execute timeout, unit (ms), just for statistics.*/private long runTimeout;/*** Task queue wait timeout, unit (ms), just for statistics.*/private long queueTimeout;/*** Total reject count.*/private final LongAdder rejectCount = new LongAdder();/*** Count run timeout tasks.*/private final LongAdder runTimeoutCount = new LongAdder();/*** Count queue wait timeout tasks.*/private final LongAdder queueTimeoutCount = new LongAdder();public DtpExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);this.rejectHandlerName = handler.getClass().getSimpleName();setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler));}@Overridepublic void execute(Runnable task, long startTimeout) {execute(task);}/*** 增强方法** @param command the runnable task*/@Overridepublic void execute(Runnable command) {String taskName = null;if (command instanceof NamedRunnable) {taskName = ((NamedRunnable) command).getName();}if (CollectionUtils.isNotEmpty(taskWrappers)) {for (TaskWrapper t : taskWrappers) {command = t.wrap(command);}}if (runTimeout > 0 || queueTimeout > 0) {command = new DtpRunnable(command, taskName);}super.execute(command);}/*** 增强方法** @param t the thread that will run task {@code r}* @param r the task that will be executed*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {if (!(r instanceof DtpRunnable)) {super.beforeExecute(t, r);return;}DtpRunnable runnable = (DtpRunnable) r;long currTime = TimeUtil.currentTimeMillis();if (runTimeout > 0) {runnable.setStartTime(currTime);}if (queueTimeout > 0) {long waitTime = currTime - runnable.getSubmitTime();if (waitTime > queueTimeout) {queueTimeoutCount.increment();AlarmManager.doAlarmAsync(this, QUEUE_TIMEOUT);if (StringUtils.isNotBlank(runnable.getTaskName())) {log.warn("DynamicTp execute, queue timeout, poolName: {}, " +"taskName: {}, waitTime: {}ms", this.getThreadPoolName(),runnable.getTaskName(), waitTime);}}}super.beforeExecute(t, r);}/*** 增强方法** @param r the runnable that has completed* @param t the exception that caused termination, or null if*          execution completed normally*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {if (runTimeout > 0) {DtpRunnable runnable = (DtpRunnable) r;long runTime = TimeUtil.currentTimeMillis() - runnable.getStartTime();if (runTime > runTimeout) {runTimeoutCount.increment();AlarmManager.doAlarmAsync(this, RUN_TIMEOUT);if (StringUtils.isNotBlank(runnable.getTaskName())) {log.warn("DynamicTp execute, run timeout, poolName: {}," +" taskName: {}, runTime: {}ms", this.getThreadPoolName(), runnable.getTaskName(), runTime);}}}super.afterExecute(r, t);}@Overrideprotected void initialize(DtpProperties dtpProperties) {NotifyHelper.initNotify(this, dtpProperties.getPlatforms());if (preStartAllCoreThreads) {// 在没有任务到来之前就创建corePoolSize个线程或一个线程 因为在默认线程池启动的时候是不会启动核心线程的,// 只有来了新的任务时才会启动线程prestartAllCoreThreads();}}
}

EagerDtpExecutor

EagerDtpExecutor继承了DtpExecutor,专为IO密集场景提供,为什么这么说呢,请看下文分析

public class EagerDtpExecutor extends DtpExecutor {/*** The number of tasks submitted but not yet finished.* 已经提交的但还没有完成的任务数量*/private final AtomicInteger submittedTaskCount = new AtomicInteger(0);public EagerDtpExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}public int getSubmittedTaskCount() {return submittedTaskCount.get();}@Overrideprotected void afterExecute(Runnable r, Throwable t) {submittedTaskCount.decrementAndGet();super.afterExecute(r, t);}@Overridepublic void execute(Runnable command) {if (command == null) {throw new NullPointerException();}submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {// 被拒绝时if (getQueue() instanceof TaskQueue) {// If the Executor is close to maximum pool size, concurrent// calls to execute() may result (due to use of TaskQueue) in// some tasks being rejected rather than queued.// If this happens, add them to the queue.final TaskQueue queue = (TaskQueue) getQueue();try {// 加入队列中if (!queue.force(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedTaskCount.decrementAndGet();throw rx;}}}
}

来看execute执行方法,当捕获住拒绝异常时,说明线程池队列已满且大于最大线程数,如果当前队列是

TaskQueue则重新将拒绝任务加入队列中,加入失败则抛出任务拒绝异常。

来看TaskQueue代码实现

public class TaskQueue extends VariableLinkedBlockingQueue<Runnable> {private static final long serialVersionUID = -1L;private transient EagerDtpExecutor executor;public TaskQueue(int queueCapacity) {super(queueCapacity);}public void setExecutor(EagerDtpExecutor exec) {executor = exec;}@Overridepublic boolean offer(@NonNull Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor.");}int currentPoolThreadSize = executor.getPoolSize();// 线程池中的线程数等于最大线程数的时候,就将任务放进队列等待工作线程处理if (currentPoolThreadSize == executor.getMaximumPoolSize()) {return super.offer(runnable);}// 如果当前未执行的任务数量小于等于当前线程数,还有剩余的worker线程,就将任务放进队列等待工作线程处理if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {return super.offer(runnable);}// 如果当前线程数大于核心线程,但小于最大线程数量,则直接返回false,外层逻辑线程池创建新的线程来执行任务if (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}// currentPoolThreadSize >= maxreturn super.offer(runnable);}
}

上述代码我们看到currentPoolThreadSize < executor.getMaximumPoolSize()会返回false

底层实现还是JUCThreadPoolExecutor,来看execute方法,当前线程数大于核心线程,但小于最大线程数量,则执行addWorker(command, false),创建新的线程来执行任务。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 线程池状态和线程数的整数int c = ctl.get();// 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了 结果会包装到 FutureTask 中if (addWorker(command, true)) return;c = ctl.get();}// 要么当前线程数大于等于核心线程数,要么刚刚addWorker失败了,如果线程池处于RUNNING状态,// 把这个任务添加到任务队列 workQueue 中if (isRunning(c) && workQueue.offer(command)) {// 二次状态检查int recheck = ctl.get(); // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略if (! isRunning(recheck) && remove(command)) reject(command);// 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了else if (workerCountOf(recheck) == 0) // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行addWorker(null, false);}// workQueue.offer(command)返回false,以 maximumPoolSize 为界创建新的 worker线程并启动线程,// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略else if (!addWorker(command, false)) reject(command);
}

一看这不就是Tomcat线程池处理流程吗,对比于原生JUC线程池提交任务流程

看下原生JUC线程池提交任务的流程

  • 当前线程数小于核心线程数,则创建一个新的线程来执行任务
  • 当前线程数大于等于核心线程数,且阻塞队列未满,则将任务添加到队列中
  • 如果阻塞队列已满,当前线程数大于等于核心线程数,当前线程数小于最大线程数,则创建并启动一个线程来执行新提交的任务
  • 若当前线程数大于等于最大线程数,且阻塞队列已满,此时会执行拒绝策略

来看下原生JUC线程池提交流程,引用美团线程池篇中的图

原生JUC线程池核心思想就是就是先让核心线程数的线程工作,多余的任务统统塞到阻塞队列,阻塞队列塞不下才再多创建线程来工作,这种情况下当大量请求提交时,大量的请求很有可能都会被阻塞在队列中,而线程还没有创建到最大线程数,导致用户请求处理很慢用户体验很差。

那如何解决呢?

重写了execute()方法,当抛出拒绝策略了尝试一次往阻塞队列里插入任务,尽最大努力的去执行任务,新增阻塞队列继承了LinkedBlockingQueue,重写了offer()方法,重写了offer()方法,每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败。进而让线程池创建新线程来处理任务。

如下图所示:

总结:知识是相通的,要学以致用

2.4 报警通知

关于分析报警通知,可以从AlarmManagerNoticeManager这两个类入手,实际就是分别构造了一个报警通知责任链,在需要报警通知的时候,调用责任链执行。

先来看AlarmManager的代码实现

@Slf4j
public class AlarmManager {private static final ExecutorService ALARM_EXECUTOR = ThreadPoolBuilder.newBuilder().threadPoolName("dtp-alarm").threadFactory("dtp-alarm").corePoolSize(2).maximumPoolSize(4).workQueue(LINKED_BLOCKING_QUEUE.getName(), 2000, false, null).rejectedExecutionHandler(RejectedTypeEnum.DISCARD_OLDEST_POLICY.getName()).buildCommon();private static final InvokerChain<BaseNotifyCtx> ALARM_INVOKER_CHAIN;static {// 构造责任链ALARM_INVOKER_CHAIN = NotifyFilterBuilder.getAlarmInvokerChain();}private AlarmManager() {}}

责任链的构造

public class NotifyFilterBuilder {private NotifyFilterBuilder() { }public static InvokerChain<BaseNotifyCtx> getAlarmInvokerChain() {val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);Collection<NotifyFilter> alarmFilters = Lists.newArrayList(filters.values());alarmFilters.add(new AlarmBaseFilter());alarmFilters = alarmFilters.stream().filter(x -> x.supports(NotifyTypeEnum.ALARM)).sorted(Comparator.comparing(Filter::getOrder)).collect(Collectors.toList());// 构造ALARM_FILTER_CHAIN链return InvokerChainFactory.buildInvokerChain(new AlarmInvoker(),alarmFilters.toArray(new NotifyFilter[0]));}public static InvokerChain<BaseNotifyCtx> getCommonInvokerChain() {val filters = ApplicationContextHolder.getBeansOfType(NotifyFilter.class);Collection<NotifyFilter> noticeFilters = Lists.newArrayList(filters.values());noticeFilters.add(new NoticeBaseFilter());noticeFilters = noticeFilters.stream().filter(x -> x.supports(NotifyTypeEnum.COMMON)).sorted(Comparator.comparing(Filter::getOrder)).collect(Collectors.toList());return InvokerChainFactory.buildInvokerChain(new NoticeInvoker(),noticeFilters.toArray(new NotifyFilter[0]));}
}
public final class InvokerChainFactory {private InvokerChainFactory() { }@SafeVarargspublic static<T> InvokerChain<T> buildInvokerChain(Invoker<T> target, Filter<T>... filters) {InvokerChain<T> invokerChain = new InvokerChain<>();Invoker<T> last = target;for (int i = filters.length - 1; i >= 0; i--) {Invoker<T> next = last;Filter<T> filter = filters[i];last = context -> filter.doFilter(context, next);}invokerChain.setHead(last);return invokerChain;}
}

执行报警方法调用如下

public static void doAlarm(ExecutorWrapper executorWrapper, NotifyItemEnum notifyItemEnum) {// 根据告警类型获取告警项配置,一个线程池可以配置多个NotifyItem,这里需要过滤NotifyHelper.getNotifyItem(executorWrapper, notifyItemEnum).ifPresent(notifyItem -> {// 执行责任链val alarmCtx = new AlarmCtx(executorWrapper, notifyItem);ALARM_INVOKER_CHAIN.proceed(alarmCtx);});
}

执行责任链,真正执行报警通知的代码如下

public class AlarmInvoker implements Invoker<BaseNotifyCtx> {@Overridepublic void invoke(BaseNotifyCtx context) {val alarmCtx = (AlarmCtx) context;val executorWrapper = alarmCtx.getExecutorWrapper();val notifyItem = alarmCtx.getNotifyItem();val alarmInfo = AlarmCounter.getAlarmInfo(executorWrapper.getThreadPoolName(),notifyItem.getType());alarmCtx.setAlarmInfo(alarmInfo);DtpNotifyCtxHolder.set(context);// 真正的发送告警的逻辑NotifierHandler.getInstance().sendAlarm(NotifyItemEnum.of(notifyItem.getType()));AlarmCounter.reset(executorWrapper.getThreadPoolName(), notifyItem.getType());}
}

调用NotifierHandler#sendAlarm()

@Slf4j
public final class NotifierHandler {private static final Map<String, DtpNotifier> NOTIFIERS = new HashMap<>();private NotifierHandler() {ServiceLoader<DtpNotifier> loader = ServiceLoader.load(DtpNotifier.class);for (DtpNotifier notifier : loader) {NOTIFIERS.put(notifier.platform(), notifier);}DtpNotifier dingNotifier = new DtpDingNotifier(new DingNotifier());DtpNotifier wechatNotifier = new DtpWechatNotifier(new WechatNotifier());DtpNotifier larkNotifier = new DtpLarkNotifier(new LarkNotifier());NOTIFIERS.put(dingNotifier.platform(), dingNotifier);NOTIFIERS.put(wechatNotifier.platform(), wechatNotifier);NOTIFIERS.put(larkNotifier.platform(), larkNotifier);}public void sendAlarm(NotifyItemEnum notifyItemEnum) {try {NotifyItem notifyItem = DtpNotifyCtxHolder.get().getNotifyItem();for (String platform : notifyItem.getPlatforms()) {DtpNotifier notifier = NOTIFIERS.get(platform.toLowerCase());if (notifier != null) {notifier.sendAlarmMsg(notifyItemEnum);}}} finally {DtpNotifyCtxHolder.remove();}}
}

最后调用notifier.sendAlarmMsg(notifyItemEnum)发送消息

@Override
public void sendAlarmMsg(NotifyItemEnum notifyItemEnum) {NotifyHelper.getPlatform(platform()).ifPresent(platform -> {// 构建报警信息String content = buildAlarmContent(platform, notifyItemEnum);if (StringUtils.isBlank(content)) {return;}// 发送notifier.send(platform, content);});
}

2.5 监控

入口DtpMonitor

@Slf4j
public class DtpMonitor implements ApplicationRunner, Ordered {private static final ScheduledExecutorService MONITOR_EXECUTOR = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("dtp-monitor", true));@Resourceprivate DtpProperties dtpProperties;/*** 每隔 monitorInterval(默认为5) 执行监控** @param args*/@Overridepublic void run(ApplicationArguments args) {MONITOR_EXECUTOR.scheduleWithFixedDelay(this::run,0, dtpProperties.getMonitorInterval(), TimeUnit.SECONDS);}private void run() {// 所有线程池的名称List<String> dtpNames = DtpRegistry.listAllDtpNames();// 所有标有DynamicTp注解的线程池List<String> commonNames = DtpRegistry.listAllCommonNames();// 检查告警checkAlarm(dtpNames);// 指标收集collect(dtpNames, commonNames);}private void collect(List<String> dtpNames, List<String> commonNames) {// 不收集指标if (!dtpProperties.isEnabledCollect()) {return;}// 拿到所有的线程池对象,获取到线程池的各种属性统计指标dtpNames.forEach(x -> {DtpExecutor executor = DtpRegistry.getDtpExecutor(x);ThreadPoolStats poolStats = MetricsConverter.convert(executor);// 指标收集doCollect(poolStats);});commonNames.forEach(x -> {ExecutorWrapper wrapper = DtpRegistry.getCommonExecutor(x);// 转换 ThreadPoolStatsThreadPoolStats poolStats = MetricsConverter.convert(wrapper);// 指标收集doCollect(poolStats);});// 发送一个CollectEvent事件publishCollectEvent();}/*** 针对每一个线程池,使用其名称从注册表中获取到线程池对象,然后触发告警** @param dtpNames*/private void checkAlarm(List<String> dtpNames) {dtpNames.forEach(x -> {DtpExecutor executor = DtpRegistry.getDtpExecutor(x);AlarmManager.doAlarmAsync(executor, SCHEDULE_NOTIFY_ITEMS);});// 发送告警AlarmCheckEvent事件publishAlarmCheckEvent();}private void doCollect(ThreadPoolStats threadPoolStats) {try {CollectorHandler.getInstance().collect(threadPoolStats, dtpProperties.getCollectorTypes());} catch (Exception e) {log.error("DynamicTp monitor, metrics collect error.", e);}}private void publishCollectEvent() {CollectEvent event = new CollectEvent(this, dtpProperties);ApplicationContextHolder.publishEvent(event);}private void publishAlarmCheckEvent() {AlarmCheckEvent event = new AlarmCheckEvent(this, dtpProperties);ApplicationContextHolder.publishEvent(event);}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 2;}
}

代码比较易懂,这里就不在叙述了。

三、总结

dynamic-tp设计巧妙,代码中设计模式先行,结构清晰易懂,代码规整,同时提供了很多扩展点,通过利用了Spring的扩展,和JUC原生线程池优势,功能强大。

参考文章

  • dynamic-tp官网
  • 手写精简版动态线程池
  • 基于Nacos的简单动态化线程池实现
  • Nacos配置中心实现一个动态线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/390931.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++中lambda使用mutable关键字详解

C中lambda使用mutable关键字详解 在《C初学者指南-5.标准库(第二部分)–更改元素算法》中&#xff0c;讲“generate”算法时有下面这段代码&#xff1a; auto gen [i0]() mutable { i 2; return i; }; std::vector<int> v; v.resize(7,0); generate(begin(v)1, begin…

C++ STL在算法题中的常用语法

Vector 1.将vector<int>中的元素全部置换为0 fill(vec.begin(), vec.end(), 0); 2.vector容器是可以直接用比较是否值等的&#xff01; Unordered_set 1. unordered_set的删除&#xff08;count的值也会减少&#xff09; 2.unordered_map中的int默认值是0&#xff0c;…

在Jira中使用AI

Jira已经可以使用AI功能了。 如果您使用的是Jira Cloud&#xff0c;您需要请管理员在管理页面中打开AI功能开关。&#xff08;AI功能在Standard版中未提供&#xff0c;请使用Premium或更高级的版本&#xff09;如果您使用的是自己部署的Jira Data Center&#xff0c;您需要请管…

算法学习day28

一、寻找右区间(二分法) 题意&#xff1a;题目很容易理解 但是转换为二分法有点晦涩 给你一个区间数组 intervals &#xff0c;其中 intervals[i] [starti, endi] &#xff0c;且每个 starti 都 不同 。区间 i 的 右侧区间 可以记作区间 j &#xff0c;并满足 startj > e…

OpenCV||超详细的灰度变换和直方图修正

一、点运算 概念&#xff1a;点运算&#xff08;也称为像素级运算或单像素操作&#xff09;是指对图像中每一个像素点进行独立、相同的操作&#xff0c;而这些操作不会考虑像素点之间的空间关系。点处理优势也称对比度拉伸、对比度增强或灰度变换等。 目的&#xff1a;点运算…

【EtherCAT】Windows+Visual Studio配置SOEM主站——静态库配置+部署

目录 一、准备工作 1. Visual Studio 2022 2. Npcap 1.79 3. SOEM源码 二、静态库配置 1. 修改SOEM源码配置 2. 编译SOEM源码 3. 测试 三、静态库部署 1. 新建Visual Studio工程 2. 创建文件夹 3. 创建主函数 4. 复制静态库 5. 复制头文件 6. 配置头文件…

链接、装载和库——1 简介

前言 关于个人的读书笔记 第一章 温故而知新 1.1 从hello&#xff0c;world说起 ​计算机在执行hello&#xff0c;world的时候发生了什么&#xff1f; 1.2 万变不离其宗 ​在计算机多如牛毛的硬件设备中。有三个部件最为关键&#xff0c;它们分别是 CPU、内存和 I/O 控制芯…

一次多波束和浅地层处理的经历—信标机出问题?

最近处理多波束和浅地层时&#xff0c;一个从来没有过的问题出现了。 多波束数据(.pds)是由PDS2000采集的&#xff0c;使用设备型号为T50P。浅地层数据(.raw)是有SESWIN采集的&#xff0c;使用设备型号为SES2000 Standard。 1、多波束处理 多波束数据采用CARIS11.3处理的。船…

开源LivePortrait,快速实现表情包自定义

最近可灵AI很火&#xff0c;看到网上生成的效果也很赞啊&#xff0c;之前发现快手可灵开源了LivePortrait&#xff0c;今天去玩了一下&#xff0c;很有意思。 比如下图官方展示效果&#xff1a; 这些图片开始自带表情了&#xff0c;主要就是通过LivePortrait来实现。 LivePor…

[E二叉树] lc572. 另一棵树的子树(dfs+前中序判断+树哈希+树上KMP+好题)

文章目录 1. 题目来源2. 题目解析 1. 题目来源 链接&#xff1a;572. 另一棵树的子树 2. 题目解析 看到这个题目就感觉不简单&#xff0c;因为写了写 dfs 版本的&#xff0c;发现好像不太会… 还是简单粗暴一点&#xff0c;直接搞一个 前序中序&#xff0c;进行判断即可。我…

【PyTorch】神经风格迁移项目

神经风格迁移中&#xff0c;取一个内容图像和一个风格图像&#xff0c;综合内容图像的内容和风格图像的艺术风格生成新的图像。 目录 准备数据 处理数据 神经风格迁移模型 加载预训练模型 定义损失函数 定义优化器 运行模型 准备数据 创建data文件夹&#xff0c;放入…

数据恢复软件:电脑丢失文件,及时使用数据恢复软件恢复!

数据恢复软件什么时候会用到&#xff1f; 答&#xff1a;如果真的不小心删除文件&#xff0c;清空回收站&#xff0c;电脑重装系统等情况发生&#xff0c;我们要懂的及时停止使用电子设备&#xff0c;使用可靠的数据恢复软件&#xff0c;帮助我们恢复这些电子设备的数据&#…

二进制搭建 Kubernetes v1.20(上)

目录 一、操作系统初始化配置 二、升级Liunx内核 三、部署docker引擎 四、部署etcd集群 五、部署Master组件 六、部署Worker Node组件 hostnameip需要部署k8s集群master0120.0.0.100kube-apiserver kube-controller-manager kube-scheduler etcdk8s集群master0220.0.0.1…

CookieMaker工作室合作开发C++项目十一:拟态病毒

&#xff08;注&#xff1a;本文章使用了“无标题技术”&#xff09; 一天&#xff0c;我和几个同事&#xff0c;平台出了点BUG&#xff0c;居然给我刷出了千年杀&#xff0c;同事看得瑕疵欲裂&#xff0c;发誓要将我挫骨扬灰—— &#xff08;游戏入口&#xff1a;和平精英31.…

【数据脱敏】数据交换平台数据脱敏建设方案

1 概述 1.1 数据脱敏定义 1.2 数据脱敏原则 1.2.1基本原则 1.2.2技术原则 1.2.3管理原则 1.3 数据脱敏常用方法 3.1.1泛化技术 3.1.2抑制技术 3.1.3扰乱技术 3.1.4有损技术 1.4 数据脱敏全生命周期 2 制定数据脱敏规程 3 发现敏感数据 4 定义脱敏规则 5 执…

[Unity] ShaderGraph实现DeBuff污染 溶解叠加效果

本篇是在之前的基础上&#xff0c;继续做的功能衍生。 [Unity] ShaderGraph实现Sprite消散及受击变色 完整连连看如下所示&#xff1a;

TypeError: ‘float’ object is not iterable 深度解析

TypeError: ‘float’ object is not iterable 深度解析与实战指南 在Python编程中&#xff0c;TypeError: float object is not iterable是一个常见的错误&#xff0c;通常发生在尝试对浮点数&#xff08;float&#xff09;进行迭代操作时。这个错误表明代码中存在类型使用不…

C基础项目(学生成绩管理系统)

目录 一、项目要求 二、完整代码实例 三、分文件编写代码实例 一、项目要求 1.系统运行&#xff0c;打开如下界面。列出系统帮助菜单&#xff08;即命令菜单&#xff09;&#xff0c;提示输入命令 2.开始时还没有录入成绩&#xff0c;所以输入命令 L 也无法列出成绩。应提…

嵌入式Linux系统中pinictrl框架基本实现

1. 回顾Pinctrl的三大作用 记住pinctrl的三大作用,有助于理解所涉及的数据结构: * 引脚枚举与命名(Enumerating and naming) * 单个引脚 * 各组引脚 * 引脚复用(Multiplexing):比如用作GPIO、I2C或其他功能 * 引脚配置(Configuration):比如上拉、下拉、open drain、驱…

从零入门 AI for Science(AI+药物) 笔记 #Datawhale AI 夏令营

&#x1f496;使用平台 我的Notebook 魔搭社区 https://modelscope.cn/my/mynotebook/preset . 魔搭高峰期打不开Task3又换回飞桨了 吧torch 架构换成了 飞桨的paddle 飞桨AI Studio星河社区-人工智能学习与实训社区 https://aistudio.baidu.com/projectdetail/8191835?cont…