Flink 源码 TaskManagerRunner 启动 Akka Actor System 源码

TaskManagerRunner 启动 Akka Actor System 源码关键流程

private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem = RpcSystem.load(configuration);this.executor =Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(),new ExecutorThreadFactory("taskmanager-future"));highAvailabilityServices =HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,executor,AddressResolution.NO_ADDRESS_RESOLUTION,rpcSystem,this);JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);// ...
}// TaskManagerRunner
static RpcService createRpcService(final Configuration configuration,final HighAvailabilityServices haServices,final RpcSystem rpcSystem)throws Exception {checkNotNull(configuration);checkNotNull(haServices);
return RpcUtils.createRemoteRpcService(rpcSystem,configuration,determineTaskManagerBindAddress(configuration, haServices, rpcSystem),//...);}// RpcUtils
public static RpcService createRemoteRpcService(RpcSystem rpcSystem,Configuration configuration,@Nullable String externalAddress,String externalPortRange,@Nullable String bindAddress,@SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)throws Exception {RpcSystem.RpcServiceBuilder rpcServiceBuilder =rpcSystem.remoteServiceBuilder(configuration, externalAddress, externalPortRange);if (bindAddress != null) {rpcServiceBuilder = rpcServiceBuilder.withBindAddress(bindAddress);}if (bindPort.isPresent()) {rpcServiceBuilder = rpcServiceBuilder.withBindPort(bindPort.get());}return rpcServiceBuilder.createAndStart();
}// AkkaRpcServiceUtils
public AkkaRpcService createAndStart(TriFunction<ActorSystem, AkkaRpcServiceConfiguration, ClassLoader, AkkaRpcService>constructor)throws Exception {if (actorSystemExecutorConfiguration == null) {actorSystemExecutorConfiguration =AkkaUtils.getForkJoinExecutorConfig(AkkaBootstrapTools.getForkJoinExecutorConfiguration(configuration));}final ActorSystem actorSystem;// akka internally caches the context class loader// make sure it uses the plugin class loadertry (TemporaryClassLoaderContext ignored =TemporaryClassLoaderContext.of(getClass().getClassLoader())) {if (externalAddress == null) {// create local actor systemactorSystem =AkkaBootstrapTools.startLocalActorSystem(configuration,actorSystemName,logger,actorSystemExecutorConfiguration,customConfig);} else {// create remote actor systemactorSystem =AkkaBootstrapTools.startRemoteActorSystem(configuration,actorSystemName,externalAddress,externalPortRange,bindAddress,Optional.ofNullable(bindPort),logger,actorSystemExecutorConfiguration,customConfig);}}return constructor.apply(actorSystem,AkkaRpcServiceConfiguration.fromConfiguration(configuration),RpcService.class.getClassLoader());}
}// AkkaBootstrapTools
private static ActorSystem startRemoteActorSystem(Configuration configuration,String actorSystemName,String externalAddress,int externalPort,String bindAddress,int bindPort,Logger logger,Config actorSystemExecutorConfiguration,Config customConfig)throws Exception {String externalHostPortUrl =NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);String bindHostPortUrl =NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);logger.info("Trying to start actor system, external address {}, bind address {}.",externalHostPortUrl,bindHostPortUrl);try {Config akkaConfig =AkkaUtils.getAkkaConfig(configuration,new HostAndPort(externalAddress, externalPort),new HostAndPort(bindAddress, bindPort),actorSystemExecutorConfiguration);if (customConfig != null) {akkaConfig = customConfig.withFallback(akkaConfig);}return startActorSystem(akkaConfig, actorSystemName, logger);} catch (Throwable t) {if (t instanceof ChannelException) {Throwable cause = t.getCause();if (cause != null && t.getCause() instanceof BindException) {throw new IOException("Unable to create ActorSystem at address "+ bindHostPortUrl+ " : "+ cause.getMessage(),t);}}throw new Exception("Could not create actor system", t);}
}private static ActorSystem startActorSystem(Config akkaConfig, String actorSystemName, Logger logger) {logger.debug("Using akka configuration\n {}", akkaConfig);ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));return actorSystem;
}// AkkaUtils
public static ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) {// Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());return RobustActorSystem.create(actorSystemName, akkaConfig);
}public static RobustActorSystem create(String name, Config applicationConfig) {return create(name, applicationConfig, FatalExitExceptionHandler.INSTANCE);
}@VisibleForTesting
static RobustActorSystem create(String name,Config applicationConfig,Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {return create(name,ActorSystemSetup.create(BootstrapSetup.create(Optional.empty(),Optional.of(applicationConfig),Optional.empty())),Option.apply(uncaughtExceptionHandler));
}private static RobustActorSystem create(String name,ActorSystemSetup setup,Option<Thread.UncaughtExceptionHandler> uncaughtExceptionHandler) {final Optional<BootstrapSetup> bootstrapSettings = setup.get(BootstrapSetup.class);final ClassLoader classLoader = RobustActorSystem.class.getClassLoader();final Config appConfig =bootstrapSettings.map(BootstrapSetup::config).flatMap(RobustActorSystem::toJavaOptional).orElseGet(() -> ConfigFactory.load(classLoader));final Option<ExecutionContext> defaultEC =toScalaOption(bootstrapSettings.map(BootstrapSetup::defaultExecutionContext).flatMap(RobustActorSystem::toJavaOptional));final RobustActorSystem robustActorSystem =new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {@Overridepublic Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);}};robustActorSystem.start();return robustActorSystem;
}private static RobustActorSystem create(String name,ActorSystemSetup setup,Option<Thread.UncaughtExceptionHandler> uncaughtExceptionHandler) {final Optional<BootstrapSetup> bootstrapSettings = setup.get(BootstrapSetup.class);final ClassLoader classLoader = RobustActorSystem.class.getClassLoader();final Config appConfig =bootstrapSettings.map(BootstrapSetup::config).flatMap(RobustActorSystem::toJavaOptional).orElseGet(() -> ConfigFactory.load(classLoader));final Option<ExecutionContext> defaultEC =toScalaOption(bootstrapSettings.map(BootstrapSetup::defaultExecutionContext).flatMap(RobustActorSystem::toJavaOptional));final RobustActorSystem robustActorSystem =new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {@Overridepublic Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);}};robustActorSystem.start();return robustActorSystem;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/458600.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单反相机内存卡误删照片怎么办?别急,这里有恢复方法

在摄影的世界里&#xff0c;单反相机无疑是众多摄影爱好者与专业摄影师的首选工具。它不仅能够捕捉细腻丰富的画面细节&#xff0c;还提供了高度的操作灵活性和可扩展性。然而&#xff0c;在使用单反相机的过程中&#xff0c;我们难免会遇到一些技术上的困扰&#xff0c;其中之…

【网络面试篇】三次握⼿、四次挥手综述

目录 一、三次握手 1. 过程描述 2. 为什么不是四次握手&#xff1f;为什么不能两次握手&#xff1f; 二、四次挥手 1. 过程描述 2. 为什么是四次挥手&#xff1f; 一、三次握手 1. 过程描述 ① 客户端 向 服务器 发送 SYN 报文、初始化序列号 ISN&#xff08;seqx&…

自定义鼠标事件在拖拽中的使用

目标&#xff1a; 显示鼠标在容器元素中划过时经过的元素,但是容器内肯能会出现大量元素&#xff0c;所以直接给容器元素添加click事件&#xff0c;通过elementFromPoint的API模拟子元素被点击事件效果 看看效果吧 涉及的重要对象 MousEvent 参考 MDN 相关代码 operateCont…

[项目详解][boost搜索引擎#2] 建立index | 安装分词工具cppjieba | 实现倒排索引

目录 编写建立索引的模块 Index 1. 设计节点 2.基本结构 3.(难点) 构建索引 1. 构建正排索引&#xff08;BuildForwardIndex&#xff09; 2.❗构建倒排索引 3.1 cppjieba分词工具的安装和使用 3.2 引入cppjieba到项目中 倒排索引代码 本篇文章&#xff0c;我们将继续项…

C++《vector的模拟实现》

在之前《vector》章节当中我们学习了STL当中的vector基本的使用方法&#xff0c;了解了vector当中各个函数该如何使用&#xff0c;在学习当中我们发现了vector许多函数的使用是和我们之前学习过的string类的&#xff0c;但同时也发现vector当中一些函数以及接口是和string不同的…

在Postgresql中对空间数据进行表分区的实践

在数据库管理中&#xff0c;合理地对数据进行分区可以提高查询性能和数据管理效率。 本文将详细介绍在Postgresql中对空间数据进行表分区的实践过程。 测试计算机容量有限&#xff0c;测试最大数据量为1,000,000条。 关键字: Postgresql PostGIS 表分区 空间数据 测试计算…

Easy Excel合并单元格情况简单导入导出

需求 实现报表数据的导入导出&#xff0c;表格中部分数据是系统生成&#xff0c;部分数据是甲方填写&#xff0c;录入系统。 批号唯一 Maven <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.…

【modbus协议】libmodbus库移植基于linux平台

文章目录 下载库函数源码编译路径添加libmodbus 源码分析核心数据结构常用接口函数 开发 TCP Server 端开发TCP Client 端 下载库函数源码 编译路径添加 libmodbus 源码分析 核心数据结构 modbus_t结构体&#xff1a; 这是 libmodbus 的核心数据结构&#xff0c;代表一个 Mod…

机房巡检机器人有哪些功能和作用

随着数据量的爆炸式增长和业务的不断拓展&#xff0c;数据中心面临诸多挑战。一方面&#xff0c;设备数量庞大且复杂&#xff0c;数据中心内服务器、存储设备、网络设备等遍布&#xff0c;这些设备需时刻保持良好运行状态&#xff0c;因为任何一个环节出现问题都可能带来严重后…

从0到1学习node.js(express模块)

文章目录 Express框架1、初体验express2、什么是路由3、路由的使用3、获取请求参数4、电商项目商品详情场景配置路由占位符规则5、小练习&#xff0c;根据id参数返回对应歌手信息6、express和原生http模块设置响应体的一些方法7、其他响应设置8、express中间件8.1、什么是中间件…

如何搭建直播美颜SDK平台的最佳实践?美颜API的实现与集成详解

本篇文章&#xff0c;将从技术实现、平台搭建、API集成以及性能优化四个方面&#xff0c;为开发者详解如何搭建一个直播美颜SDK平台。 一、直播美颜SDK平台的技术架构 一般的美颜效果包括磨皮、亮肤、瘦脸、大眼等&#xff0c;这些效果的实现需要依赖图像增强和滤镜算法。核心…

【51单片机】第一个小程序 —— 点亮LED灯

学习使用的开发板&#xff1a;STC89C52RC/LE52RC 编程软件&#xff1a;Keil5 烧录软件&#xff1a;stc-isp 开发板实图&#xff1a; 文章目录 单片机介绍LED灯介绍练习创建第一个项目点亮LED灯LED周期闪烁 单片机介绍 单片机&#xff0c;英文Micro Controller Unit&#xff0…

创建ODBC数据源SQLConfigDataSource函数的用法

网络上没有这个函数能实际落地的用法说明&#xff0c;我实践后整理一下&#xff1a; 1.头文件与额外依赖库&#xff1a; #include <odbcinst.h> #pragma comment(lib, "legacy_stdio_definitions.lib") 2.调用函数&#xff1a; if (!SQLConfigDataSourceW(…

阿里云镜像源无法访问?使用 DaoCloud 镜像源加速 Docker 下载(Linux 和 Windows 配置指南)

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f343; vue-uniapp-template &#x1f33a; 仓库主页&#xff1a; GitCode&#x1f4ab; Gitee &#x1f…

java :String 类

在我们之前的讲解中我们已经了解了很多的Java知识&#xff0c;这节我们讲Java中字符如何定义以及关于String如何使用还有常见的string函数。 【本节目标】 1. 认识 String 类 2. 了解 String 类的基本用法 3. 熟练掌握 String 类的常见操作 4. 认识字符串常量池 5. 认识 …

江协科技STM32学习- P21 ADC模数转换器

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

基于SpringCloud的WMS管理系统源码

商品管理&#xff1a;商品类型&#xff0c;规格&#xff0c;详情等设置。 采购管理&#xff1a;采购单录入。 销售管理&#xff1a;销售单录入。 库存管理&#xff1a;库存查询、库存日志 采用前后端分离的模式&#xff0c;微服务版本前端 后端采用Spring Boot、Spring Cl…

python实现放烟花效果庆祝元旦

马上就要2025年元旦啦&#xff0c;提前祝大家新年快乐 完整代码下载地址&#xff1a;https://download.csdn.net/download/ture_mydream/89926458

vLLM推理部署Qwen2.5

vLLM vLLM 是一个用于大模型推理的高效框架。它旨在提供高性能、低延迟的推理服务&#xff0c;并支持多种硬件加速器&#xff0c;如 GPU 和 CPU。 vLLM 适用于大批量Prompt输入&#xff0c;并对推理速度要求高的场景&#xff0c;吞吐量比HuggingFace Transformers高10多倍。 …

手指关节分割系统:视觉算法突破

手指关节分割系统源码&#xff06;数据集分享 [yolov8-seg-C2f-RFAConv&#xff06;yolov8-seg-fasternet-bifpn等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI Glob…