Flink启动任务

Flink

以本地运行作为解读,版本1.16.0


文章目录

  • Flink
  • 前言
  • StreamExecutionEnvironment
  • LocalExecutor
  • MiniCluster
    • 启动MiniCluster
  • TaskManager
  • TaskExecutor
    • 提交Task(submitTask)
  • StreamGraph
  • 二、使用步骤
    • 1.引入库
    • 2.读入数据
  • 总结


前言

提示:这里可以添加本文要记录的大概内容:

例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。


提示:以下是本篇文章正文内容,下面案例可供参考

StreamExecutionEnvironment

流执行环境:本地使用:LocalStreamEnvironment,远程使用:RemoteStreamEnvironment。

1.1 通过StreamExecutionEnvironment调用execute(String jobName)方法。
1.1.1 获取所有算子,只能包含一个sink(输出)类型的算子。
1.1.2 调用方法getStreamGraph()将算子转换为流图(StreamGraph)。
1.1.3 调用execute(StreamGraph streamGraph)。

public JobExecutionResult execute(String jobName) throws Exception {final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);}
}

1.2 通过StreamExecutionEnvironment调用execute(String jobName)方法。
1.2.1 通过executeAsync(StreamGraph streamGraph),方法异步执行流图。
1.2.2 根据返回的JobClient,用户控制作业的执行。

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {final JobClient jobClient = executeAsync(streamGraph);try {final JobExecutionResult jobExecutionResult;if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {jobExecutionResult = jobClient.getJobExecutionResult().get();} else {jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());}jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));return jobExecutionResult;} catch (Throwable t) {Throwable strippedException = ExceptionUtils.stripExecutionException(t);jobListeners.forEach(jobListener -> {jobListener.onJobExecuted(null, strippedException);});ExceptionUtils.rethrowException(strippedException);return null;}
}

1.3 通过StreamExecutionEnvironment调用executeAsync(StreamGraph streamGraph)方法。
1.3.1 通过getPipelineExecutor()方法获取PipelineExecutor为LocalExecutor。
1.3.2 LocalExecutor根据提供的工作流图,并执行。获取JobClient,允许与正在执行的作业进行交互。例如:取消作业或获取保存点。
1.3.3 JobListener监听JobClient。

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {checkNotNull(streamGraph, "StreamGraph cannot be null.");final PipelineExecutor executor = getPipelineExecutor();CompletableFuture<JobClient> jobClientFuture =executor.execute(streamGraph, configuration, userClassloader);try {JobClient jobClient = jobClientFuture.get();jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));collectIterators.clear();return jobClient;} catch (ExecutionException executionException) {final Throwable strippedException =ExceptionUtils.stripExecutionException(executionException);jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()),strippedException);}
}

LocalExecutor


实现了PipelineExecutor,负责执行StreamGraph,即用户提交的作业。

  1. 将StreamGraph通过getJobGraph(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)生成为JobGraph。
  2. 通过submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader)方法创建一个MiniCluster并提交一个任务(Job)。
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)throws Exception {checkNotNull(pipeline);checkNotNull(configuration);Configuration effectiveConfig = new Configuration();effectiveConfig.addAll(this.configuration);effectiveConfig.addAll(configuration);// we only support attached execution with the local executor.checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

MiniCluster


本地执行Job任务

启动MiniCluster


1.1 创建workingDirectory,WorkingDirectory:该类管理进程或实例的工作目录,当已经被实例化后,该类确保指定的工作目录已存在。
1.2 创建metricRegistry,MetricRegistry:该类跟踪所有已注册的指标,是MetricGroups和MetricReporters之间的连接。
1.3 创建commonRpcService,RpcService:rpc服务用于启动并连接到RpcEndpoint。连接到rpc服务器将返回一个RpcGateway,可用于调用远程过程。
1.4 taskManagerRpcServiceFactory = commonRpcService
1.5 创建metricQueryServiceRpcService,RpcService
1.6 将metricQueryServiceRpcService设置到metricRegistry的用于初始化MetricQueryService。
1.7 创建processMetricGroup,ProcessMetricGroup,并设置metricRegistry。
1.8 创建ioExecutor,ExecutorService,线程池用于任务执行。
1.9 创建haServicesFactory,HighAvailabilityServicesFactory,创建高可用服务工厂。
1.20 创建haServicesFactory,通过高可用工厂创建高可用服务,并将当前注入到ioExecutor。HighAvailabilityServices:高可用服务可以访问所有支持高可用的组件(服务),这些服务提供了高可用的存储和服务注册,以及分布式计算和领导者选举。

ResourceManager  领导者选举并获取领导者信息
JobManager 领导者选举并获取领导者信息
Persistence 用户检查点元数据
Registering 最新完成的检查点
Persistence BLOB(二进制大对象)存储
Registry  标记作业状态
Naming RPC站点

1.21 创建blobServer并启动,BlobServer:BLOB服务器负责监听传入的请求,并生成线程来处理这些请求。此外,它还负责创建目录结构来存储BLOB或临时缓存它们。
1.22 创建heartbeatServices,HeartbeatServices:提供心跳所需的所有服务。这包括创建心跳接收器和心跳发送器。
1.23 创建delegationTokenManager,传入了commonRpcService.getScheduledExecutor()和ioExecutor。
DelegationTokenManager:Flink集群代理所有的Token管理器。代理Token启动后,此管理器将确保长时间运行的应用程序在访问安全服务时可以不中断地运行。它必须联系所有配置的安全服务,以获取要分发给应用程序其余部分的委托令牌。
1.24 创建blobCacheService传入workingDirectory.getBlobStorageDirectory(),haServices.createBlobStore(),InetSocketAddress的地址。
BlobCacheService:存储为永久或临时BLOB,并提供对BLOB服务的访问
1.25 startTaskManagers()启动TaskManager服务
1.26 resourceManagerLeaderRetriever,从高可用服务中获取资源管理领导者(选举和获取领导者),在后续启动
1.27 dispatcherLeaderRetriever,从高可用服务中获取调度领导者(选举和获取领导者),在后续启动
1.28 clusterRestEndpointLeaderRetrievalService,从高可用服务中获取集群空闲站点领导者服务(选举和获取领导者),在后续启动

public void start() throws Exception {synchronized (lock) {checkState(!running, "MiniCluster is already running");LOG.info("Starting Flink Mini Cluster");LOG.debug("Using configuration {}", miniClusterConfiguration);final Configuration configuration = miniClusterConfiguration.getConfiguration();final boolean useSingleRpcService =miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;try {//管理进程或实例的工作目录workingDirectory =WorkingDirectory.create(ClusterEntrypointUtils.generateWorkingDirectoryFile(configuration,Optional.of(PROCESS_WORKING_DIR_BASE),"minicluster_" + ResourceID.generate()));initializeIOFormatClasses(configuration);rpcSystem = rpcSystemSupplier.get();LOG.info("Starting Metrics Registry");//跟踪所有已注册的指标,是MetricGroups和MetricReporters之间的连接metricRegistry =createMetricRegistry(configuration,rpcSystem.deref().getMaximumMessageSizeInBytes(configuration));// bring up all the RPC servicesLOG.info("Starting RPC Service(s)");final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;final RpcService metricQueryServiceRpcService;if (useSingleRpcService) {// we always need the 'commonRpcService' for auxiliary calls//rpc服务用于启动并连接到RpcEndpoint。连接到rpc服务器将返回一个RpcGateway,可用于调用远程过程commonRpcService = createLocalRpcService(configuration, rpcSystem.deref());final CommonRpcServiceFactory commonRpcServiceFactory =new CommonRpcServiceFactory(commonRpcService);taskManagerRpcServiceFactory = commonRpcServiceFactory;dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;metricQueryServiceRpcService =MetricUtils.startLocalMetricsRpcService(configuration, rpcSystem.deref());} else {// start a new service per component, possibly with custom bind addressesfinal String jobManagerExternalAddress =miniClusterConfiguration.getJobManagerExternalAddress();final String taskManagerExternalAddress =miniClusterConfiguration.getTaskManagerExternalAddress();final String jobManagerExternalPortRange =miniClusterConfiguration.getJobManagerExternalPortRange();final String taskManagerExternalPortRange =miniClusterConfiguration.getTaskManagerExternalPortRange();final String jobManagerBindAddress =miniClusterConfiguration.getJobManagerBindAddress();final String taskManagerBindAddress =miniClusterConfiguration.getTaskManagerBindAddress();dispatcherResourceManagerComponentRpcServiceFactory =new DedicatedRpcServiceFactory(configuration,jobManagerExternalAddress,jobManagerExternalPortRange,jobManagerBindAddress,rpcSystem.deref());taskManagerRpcServiceFactory =new DedicatedRpcServiceFactory(configuration,taskManagerExternalAddress,taskManagerExternalPortRange,taskManagerBindAddress,rpcSystem.deref());// we always need the 'commonRpcService' for auxiliary calls// bind to the JobManager address with port 0commonRpcService =createRemoteRpcService(configuration, jobManagerBindAddress, 0, rpcSystem.deref());metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration,commonRpcService.getAddress(),null,rpcSystem.deref());}metricRegistry.startQueryService(metricQueryServiceRpcService, null);processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,RpcUtils.getHostname(commonRpcService),ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));//创建线程池,执行对应的任务ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("mini-cluster-io"));//高可用服务工厂haServicesFactory = createHighAvailabilityServicesFactory(configuration);//高可用服务haServices = createHighAvailabilityServices(configuration, ioExecutor);//BLOB(二进制大对象)存储服务创建并启用blobServer =BlobUtils.createBlobServer(configuration,Reference.borrowed(workingDirectory.getBlobStorageDirectory()),haServices.createBlobStore());blobServer.start();//监控所有服务心跳检测heartbeatServices = HeartbeatServices.fromConfiguration(configuration);//代理Tokne管理器delegationTokenManager =KerberosDelegationTokenManagerFactory.create(getClass().getClassLoader(),configuration,commonRpcService.getScheduledExecutor(),ioExecutor);//BLOB缓存服务:提供永久和临时的存储,并提供对BLOB服务的访问blobCacheService =BlobUtils.createBlobCacheService(configuration,Reference.borrowed(workingDirectory.getBlobStorageDirectory()),haServices.createBlobStore(),new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()));startTaskManagers();MetricQueryServiceRetriever metricQueryServiceRetriever =new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());setupDispatcherResourceManagerComponents(configuration,dispatcherResourceManagerComponentRpcServiceFactory,metricQueryServiceRetriever);//从高可用服务中获取资源管理领导者(选举和获取领导者)resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();//从高可用服务中获取调度领导者(选举和获取领导者)dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();//从高可用服务中获取集群空闲站点领导者服务(选举和获取领导者)clusterRestEndpointLeaderRetrievalService =haServices.getClusterRestEndpointLeaderRetriever();//创建调度网关回收器dispatcherGatewayRetriever =new RpcGatewayRetriever<>(commonRpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));//创建资源管理网关回收器resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(commonRpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));webMonitorLeaderRetriever = new LeaderRetriever();//资源管理领导者启用resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);//调度领导者启用          dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);//集群空闲站点领导者启用      clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);} catch (Exception e) {// cleanup everythingtry {close();} catch (Exception ee) {e.addSuppressed(ee);}throw e;}// create a new termination futureterminationFuture = new CompletableFuture<>();// now officially mark this as runningrunning = true;LOG.info("Flink Mini Cluster started successfully");}
}

TaskManager


MiniCluster中的start(),调用下列方法

  1. 根据配置的TaskManager的个数启动,默认是1。
  2. 创建TaskExecutor注入RpcService(任务管理Rcp服务),haServices(高可用服务), heartbeatServices(心跳检测服务),metricRegistry(metric注册),blobCacheService(BLOB缓存服务),workingDirectory(工作目录),fatalErrorHandler(失败异常)。
  3. 启动TaskExecutor,并添加到taskManagers数组中。
 private void startTaskManagers() throws Exception {final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();for (int i = 0; i < numTaskManagers; i++) {startTaskManager();}
}public void startTaskManager() throws Exception {synchronized (lock) {final Configuration configuration = miniClusterConfiguration.getConfiguration();final TaskExecutor taskExecutor =TaskManagerRunner.startTaskManager(configuration,new ResourceID(UUID.randomUUID().toString()),taskManagerRpcServiceFactory.createRpcService(),haServices,heartbeatServices,metricRegistry,blobCacheService,useLocalCommunication(),ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,workingDirectory.createSubWorkingDirectory("tm_" + taskManagers.size()),taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));taskExecutor.start();taskManagers.add(taskExecutor);}
}

TaskExecutor


任务执行器,负责多个Task任务执行。

提交Task(submitTask)

@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {try {final JobID jobId = tdd.getJobId();final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();final JobTable.Connection jobManagerConnection =jobTable.getConnection(jobId).orElseThrow(() -> {final String message ="Could not submit task because there is no JobManager "+ "associated for the job "+ jobId+ '.';log.debug(message);return new TaskSubmissionException(message);});if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {final String message ="Rejecting the task submission because the job manager leader id "+ jobMasterId+ " does not match the expected job manager leader id "+ jobManagerConnection.getJobMasterId()+ '.';log.debug(message);throw new TaskSubmissionException(message);}if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {final String message ="No task slot allocated for job ID "+ jobId+ " and allocation ID "+ tdd.getAllocationId()+ '.';log.debug(message);throw new TaskSubmissionException(message);}// re-integrate offloaded data:try {tdd.loadBigData(taskExecutorBlobService.getPermanentBlobService());} catch (IOException | ClassNotFoundException e) {throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);}// deserialize the pre-serialized informationfinal JobInformation jobInformation;final TaskInformation taskInformation;try {jobInformation =tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());taskInformation =tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());} catch (IOException | ClassNotFoundException e) {throw new TaskSubmissionException("Could not deserialize the job or task information.", e);}if (!jobId.equals(jobInformation.getJobId())) {throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor ("+ tdd.getJobId()+ " vs. "+ jobInformation.getJobId()+ ")");}TaskManagerJobMetricGroup jobGroup =taskManagerMetricGroup.addJob(jobInformation.getJobId(), jobInformation.getJobName());// note that a pre-existing job group can NOT be closed concurrently - this is done by// the same TM thread in removeJobMetricsGroupTaskMetricGroup taskMetricGroup =jobGroup.addTask(tdd.getExecutionAttemptId(), taskInformation.getTaskName());InputSplitProvider inputSplitProvider =new RpcInputSplitProvider(jobManagerConnection.getJobManagerGateway(),taskInformation.getJobVertexId(),tdd.getExecutionAttemptId(),taskManagerConfiguration.getRpcTimeout());final TaskOperatorEventGateway taskOperatorEventGateway =new RpcTaskOperatorEventGateway(jobManagerConnection.getJobManagerGateway(),executionAttemptID,(t) -> runAsync(() -> failTask(executionAttemptID, t)));TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();GlobalAggregateManager aggregateManager =jobManagerConnection.getGlobalAggregateManager();LibraryCacheManager.ClassLoaderHandle classLoaderHandle =jobManagerConnection.getClassLoaderHandle();PartitionProducerStateChecker partitionStateChecker =jobManagerConnection.getPartitionStateChecker();final TaskLocalStateStore localStateStore =localStateStoresManager.localStateStoreForSubtask(jobId,tdd.getAllocationId(),taskInformation.getJobVertexId(),tdd.getSubtaskIndex(),taskManagerConfiguration.getConfiguration(),jobInformation.getJobConfiguration());// TODO: Pass config value from user program and do overriding here.final StateChangelogStorage<?> changelogStorage;try {changelogStorage =changelogStoragesManager.stateChangelogStorageForJob(jobId,taskManagerConfiguration.getConfiguration(),jobGroup,localStateStore.getLocalRecoveryConfig());} catch (IOException e) {throw new TaskSubmissionException(e);}final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();final TaskStateManager taskStateManager =new TaskStateManagerImpl(jobId,tdd.getExecutionAttemptId(),localStateStore,changelogStorage,changelogStoragesManager,taskRestore,checkpointResponder);MemoryManager memoryManager;try {memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());} catch (SlotNotFoundException e) {throw new TaskSubmissionException("Could not submit task.", e);}Task task =new Task(jobInformation,taskInformation,tdd.getExecutionAttemptId(),tdd.getAllocationId(),tdd.getProducedPartitions(),tdd.getInputGates(),memoryManager,taskExecutorServices.getIOManager(),taskExecutorServices.getShuffleEnvironment(),taskExecutorServices.getKvStateService(),taskExecutorServices.getBroadcastVariableManager(),taskExecutorServices.getTaskEventDispatcher(),externalResourceInfoProvider,taskStateManager,taskManagerActions,inputSplitProvider,checkpointResponder,taskOperatorEventGateway,aggregateManager,classLoaderHandle,fileCache,taskManagerConfiguration,taskMetricGroup,partitionStateChecker,getRpcService().getScheduledExecutor());taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);log.info("Received task {} ({}), deploy into slot with allocation id {}.",task.getTaskInfo().getTaskNameWithSubtasks(),tdd.getExecutionAttemptId(),tdd.getAllocationId());boolean taskAdded;try {taskAdded = taskSlotTable.addTask(task);} catch (SlotNotFoundException | SlotNotActiveException e) {throw new TaskSubmissionException("Could not submit task.", e);}if (taskAdded) {task.startTaskThread();setupResultPartitionBookkeeping(tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());return CompletableFuture.completedFuture(Acknowledge.get());} else {final String message ="TaskManager already contains a task for id " + task.getExecutionId() + '.';log.debug(message);throw new TaskSubmissionException(message);}} catch (TaskSubmissionException e) {return FutureUtils.completedExceptionally(e);}
}

StreamGraph

生成执行图

二、使用步骤

1.引入库

代码如下(示例):

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
import  ssl
ssl._create_default_https_context = ssl._create_unverified_context

2.读入数据

代码如下(示例):

data = pd.read_csv('https://labfile.oss.aliyuncs.com/courses/1283/adult.data.csv')
print(data.head())

该处使用的url网络请求的数据。


总结

提示:这里对文章进行总结:

例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/38987.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Agent】Dify Docker 安装问题 INTERNAL SERVER ERROR

总结&#xff1a;建议大家选择稳定版本的分支&#xff0c;直接拉取 master 分支&#xff0c;可能出现一下后面更新代码导致缺失一些环境内容。 启动报错 一直停留在 INSTALL 界面 我是通过 Docker 进行安装的&#xff0c;由于项目开发者不严谨导致&#xff0c;遇到一个奇怪的…

unity开发效率提升笔记

本文将记录提升Unity开发效率的若干细节&#xff0c;持续更新 一.VSCode文件标签多行显示 1.File->Preference->Settings (快捷键Ctrl 逗号) 2.搜索workbench.editor.wrapTabs 3.勾选上这个单选开关 若依然不是多行 4.搜索workbench.editor.tabSizing,选择fi…

python每日十题(6)

列表操作函数有&#xff08;假设列表名为ls&#xff09;&#xff1a; len(ls)&#xff1a;返回列表ls的元素个数&#xff08;长度&#xff09;。min(ls)&#xff1a;返回列表ls的最小元素。max(ls)&#xff1a;返回列表ls的最大元素。list(x)&#xff1a;将x转变为列表类型。使…

【Java】TCP网络编程:从可靠传输到Socket实战

活动发起人小虚竹 想对你说&#xff1a; 这是一个以写作博客为目的的创作活动&#xff0c;旨在鼓励大学生博主们挖掘自己的创作潜能&#xff0c;展现自己的写作才华。如果你是一位热爱写作的、想要展现自己创作才华的小伙伴&#xff0c;那么&#xff0c;快来参加吧&#xff01…

使用HAI来打通DeepSeek的任督二脉

一、什么是HAI HAI是一款专注于AI与科学计算领域的云服务产品&#xff0c;旨在为开发者、企业及科研人员提供高效、易用的算力支持与全栈解决方案。主要使用场景为&#xff1a; AI作画&#xff0c;AI对话/写作、AI开发/测试。 二、开通HAI 选择CPU算力 16核32GB&#xff0c;这…

mysql——第二课

学生表 CREATE TABLE student (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,sex varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,age int(11) DEFAULT NULL,c_id int(10) DEFAULT NULL,PRIMARY KEY (id),KEY c_id (c_id),CONSTR…

单播、广播、组播和任播

文章目录 一、单播二、广播三、组播四、任播代码示例&#xff1a; 五、各种播的比较 一、单播 单播&#xff08;Unicast&#xff09;是一种网络通信方式&#xff0c;它指的是在网络中从一个源节点到一个单一目标节点对的传输模式。单播传输时&#xff0c;数据包从发送端直接发…

1-1 MATLAB深度极限学习机

本博客来源于CSDN机器鱼&#xff0c;未同意任何人转载。 更多内容&#xff0c;欢迎点击本专栏目录&#xff0c;查看更多内容。 参考[1]魏洁.深度极限学习机的研究与应用[D].太原理工大学[2023-10-14].DOI:CNKI:CDMD:2.1016.714596. 目录 0.引言 1.ELM-AE实现 2.DE…

头歌 数据采集概述答案

问题1&#xff1a;以下哪个不是Scrapy体系架构的组成部分&#xff1f; 正确答案&#xff1a;B. 支持者(Support) 解释&#xff1a;Scrapy的主要组成部分包括&#xff1a; 爬虫(Spiders)&#xff1a;定义如何爬取网站和提取数据 引擎(Engine)&#xff1a;负责控制数据流在系统中…

【uniapp】记录tabBar不显示踩坑记录

由于很久没有使用uniapp了&#xff0c;官方文档看着又杂乱&#xff0c;底部tab导航栏一直没显示&#xff0c;苦思许久&#xff0c;没有发现原因&#xff0c;最后网上搜到帖子&#xff0c;list里的第一个数据&#xff0c;pages 的第一个 path 必须与 tabBar 的第一个 pagePath 相…

JVM 知识点梳理

JDK 、JRE、JVM JDK&#xff08; Java Development Kit &#xff09; Java开发工具包 JRE 开发命令工具&#xff08;运行java.exe、编译javac.exe、javaw.exe&#xff09; JRE&#xff08; Java Runtime Environment &#xff09;Java运行环境 JVM Java核心类库&#xff08;l…

蓝桥杯 之 第27场月赛总结

文章目录 习题1.抓猪拿国一2.蓝桥字符3.蓝桥大使4.拳头对决5.未来竞赛6.备份比赛数据 习题 比赛地址 1.抓猪拿国一 十分简单的签到题 print(sum(list(range(17))))2.蓝桥字符 常见的字符匹配的问题&#xff0c;是一个二维dp的问题&#xff0c;转化为对应的动态规划求解 力扣…

Ambari、Bigtop源码编译最新支持情况汇总

以下是目前的版本情况 支持了绝大部分的组件编译及安装 版本组件名称组件版本env 版本v1.0.5Ozone1.4.11.0.5Impala4.4.11.0.5Nightingale7.7.21.0.5Categraf0.4.11.0.5VictoriaMetrics1.109.11.0.5Cloudbeaver24.3.31.0.5Celeborn0.5.31.0.5v1.0.4Doris2.1.71.0.4v1.0.3Phoen…

仅靠prompt,Agent难以自救

Alexander的观点很明确&#xff1a;未来 AI 智能体的发展方向还得是模型本身&#xff0c;而不是工作流&#xff08;Work Flow&#xff09;。还拿目前很火的 Manus 作为案例&#xff1a;他认为像 Manus 这样基于「预先编排好的提示词与工具路径」构成的工作流智能体&#xff0c;…

【Docker系列一】Docker 简介

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Sqoop 常用命令

Sqoop 是用于在 Hadoop 和关系型数据库&#xff08;如 MySQL、Oracle 等&#xff09;之间高效传输数据的工具。以下是常用的 Sqoop 命令及示例&#xff1a; CREATE TABLE employees (id INT AUTO_INCREMENT PRIMARY KEY, -- 自增主键&#xff0c;用于唯一标识每一行name VAR…

连续型随机变量及其分布

连续型随机变量 数学公式可以看作一门精确描述事物的语言&#xff0c;比语言尤其是汉语的模糊性精确多了&#xff01;离散型数据的处理可以通过枚举和相加进行处理。而连续型数据则没有办法这样处理。我们必须要通过函数和取值区间还有微积分计算。 &#xff3b;定义1&#x…

PostgreSQL_数据使用与日数据分享

目录 前置&#xff1a; 1 使用 1.1 获取前复权因子 1.2 查询股票的纵向数据 1.3 查询股票的横向数据 2 日数据分享&#xff08;截止至&#xff1a;2025-03-21&#xff09; 总结 前置&#xff1a; 本博文是一个系列。在本人“数据库专栏”-》“PostgreSQL_”开头的博文。…

Rocky9.5基于sealos快速部署k8s集群

首先需要下载 Sealos 命令行工具&#xff0c;sealos 是一个简单的 Golang 二进制文件&#xff0c;可以安装在大多数 Linux 操作系统中。 以下是一些基本的安装要求&#xff1a; 每个集群节点应该有不同的主机名。主机名不要带下划线。 所有节点的时间需要同步。 需要在 K8s …

qt实现一个简单http服务器和客户端

一、功能简介 服务器&#xff1a; 登录功能、下载文件功能 客户端&#xff1a; 登录功能、下载文件功能、上传成绩功能 二、服务器代码 //HttpServer.h #ifndef HTTPSERVER_H #define HTTPSERVER_H#include <QMainWindow> #include <QTcpSocket> #include <QTc…