Flink_DataStreamAPI_执行环境

DataStreamAPI_执行环境

  • 1创建执行环境
    • 1.1getExecutionEnvironment
    • 1.2createLocalEnvironment
    • 1.3createRemoteEnvironment
  • 2执行模式(Execution Mode)
  • 3触发程序执行

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

在这里插入图片描述

1创建执行环境

我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

1.1getExecutionEnvironment

最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.2createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

1.3createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host",                   // JobManager主机名1234,                     // JobManager进程端口号"path/to/jarFile.jar"  // 提交给JobManager的JAR包);

2执行模式(Execution Mode)

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式(Batch)
专门用于批处理的执行模式。

自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
批执行模式的使用。主要有两种方式:
(1)通过命令行配置
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

(2)通过代码配置
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

3触发程序执行

需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473301.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙中如何实现图片拉伸效果

2024年10月22日,华为发布会上,推出鸿蒙5.0。现在加入恰逢时机,你,我皆是鸿蒙时代合伙人。无论为了学习技术,还是为了谋福利,在鸿蒙的浩瀚海洋中分到一杯羹。现在学习鸿蒙正当时。 一文了解鸿蒙中图片拉伸的…

Unity 2022 Nav Mesh 自动寻路入门

untiy 2022 window-PackageManager-AINavigation 安装 Install 2.创建一个空物体命名Nav,在其自身挂载 NavMeshSurface 然后点击bake 烘焙地形即可 3.创建palyer和怪物 怪物AI代码 using System.Collections; using System.Collections.Generic; using UnityEngi…

基于gradio+networkx库对图结构进行可视化展示

前言 在gradio框架下对蛋白质-蛋白质相互作用网络(PPI网络)进行可视化,并将其在网页前端进行展示。 方法 其实很简单 可以直接使用networkx画图后保存图片,然后使用Gradio框架的image组件进行展示即可。 但实际上gradio还配置…

MSTP知识点

多生成树协议 在 MSTP(Multiple Spanning Tree Protocol)中,根桥(root)、指定端口(designated port)、备用端口(alternate port)等角色都是确保网络中没有循环并且流量能…

为正在运行的 Docker 容器重启策略,以提高服务的可用性

为正在运行的 Docker 容器重启策略,以提高服务的可用性。 为正在运行的 Docker 容器添加 --restartalways –restartalways 是 Docker 中一个常用的参数,用来设置容器的重启策略。它的作用是确保容器在一定条件下能够自动重启,以提高服务的可用性。 方…

后台管理系统(开箱即用)

很久没有更新博客了,给大家带上一波福利吧,大佬勿扰 现在市面上流行的后台管理模板很多,若依,芋道等,可是这些框架对我们来说可能会有点重,所以我自己从0到1写了一个后台管理模板,你们使用时候可扩展性也会更高 项目主要功能: 成员管理,部门管理&#…

Cursor安装Windows / Ubuntu

一、安装 1、下载软件 2、安装依赖 #安装fuse sudo apt-get install fuse3、将cursor添加到应用程序列表 sudo mv cursor-0.42.5x86_64.AppImage /opt/cursor.appimage #使用自己版本号替换 sudo chmod x /opt/cursor.appimage #给予可执行权限 sudo nano /usr/share/applic…

谷粒商城のRedisESRabbit MQ集群

文章目录 前言一、搭建Redis集群三、搭建ES集群三、搭建Rabbit MQ集群 前言 本篇是谷粒商城集群部署篇,搭建Redis、ES、Rabbit MQ集群实践的个人笔记,也是谷粒商城笔记的最后一篇。集群相关的理论性内容,会放在面试篇的笔记中。 一、搭建Redi…

孙赢利_11月17日_超分周报

一. 康佳PC端实现:1080 → 4K 实时超分 1. 将图像预处理操作从 CPU → GPU 运行 2. 后处理部分操作 从 CPU → GPU 运行 inference_realesrgan_Animal_Video.py import argparse import cv2 import glob import os from basicsr.archs.rrdbnet_arch import RRDBNe…

录的视频怎么消除杂音?从录制到后期的杂音消除攻略

在录制视频时,杂音往往是一个令人头疼的问题。无论是环境噪音、设备噪音还是电磁干扰,杂音的存在都会极大地影响视频的听觉体验。录的视频怎么消除杂音?通过一些前期准备和后期处理技巧,我们可以有效地消除这些杂音,提…

论文《基于现实迷宫地形的电脑鼠设计》深度分析——智能车驱动算法

论文概述 《基于现实迷宫地形的电脑鼠设计》是由吴润强、庹忠曜、刘文杰、项璟晨、孙科学等人于2023年发表的一篇优秀期刊论文。其针对现阶段电脑鼠计算量庞大且不适用于现实迷宫地形的问题,特基于超声波测距与传统迷宫算法原理,设计出一款可在现实迷宫地…

算法日记 26-27day 贪心算法

接下来的题目有些地方比较相似。需要注意多个条件。 题目:分发糖果 135. 分发糖果 - 力扣(LeetCode) n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求,给这些孩子分发糖果: 每…

vue3点击按钮el-dialog对话框不显示问题

vue3弹框不显示问题,控制台也没报错 把 append-to-body:visible.sync"previewDialogOpen" 改为 append-to-bodyv-model"previewDialogOpen" 就好了。

wordpress使用相关

这里写目录标题 遇到的相关问题WordPress安装插件过程中遇到需要ftp出现确实XMLReader 插件的提示cURL Support Missing(curl 缺失) 遇到的相关问题 WordPress安装插件过程中遇到需要ftp 一般在这个位置 出现确实XMLReader 插件的提示 解决&#xff1a…

21.3D surface

3D surface """ File : 05-decoding-Major Name : 3d_surface.py Author : lyq Date : 2024/11/16 23:10 Envi : PyCharm Description: files details """ import numpy as np import matplotlib.pyplot as plt# 设置全局默认字体…

ARM(安谋) China处理器

0 Preface/Foreword 0.1 参考博客 Cortex-M23/M33与STAR-MC1星辰处理器 ARM China,2018年4月established,独立运行。 1 处理器类型 1.1 周易AIPU 1.2 STAR-MC1(星辰处理器) STAT-MC1,主要为满足AIOT应用性能、功…

windows C#-异步编程概述(二)

不要阻塞,而要等待 上述代码演示了一种不好的做法:构建同步代码来执行异步操作。正如所写,此代码会阻止执行它的线程执行任何其他工作。在任何任务正在进行时,它都不会被中断。这就像你把面包放进去后盯着烤面包机一样。你会忽略…

【Android原生问题分析】夸克、抖音划动无响应问题【Android14】

1 问题描述 偶现问题,用户打开夸克、抖音后,在界面上划动无响应,但是没有ANR。回到Launcher后再次打开夸克/抖音,发现App的界面发生了变化,但是仍然是划不动的。 2 log初分析 复现问题附近的log为: 用户…

【STM32】MPU6050简介

文章目录 MPU6050简介MPU6050关键块带有16位ADC和信号调理的三轴MEMS陀螺仪具有16位ADC和信号调理的三轴MEMS加速度计I2C串行通信接口 MPU6050对应的数据手册:MPU6050 陀螺仪加速度计 链接: https://pan.baidu.com/s/13nwEhGvsfxx0euR2hMHsyw?pwdv2i6 提取码: v2i6…

【软件测试】设计测试用例的万能公式

文章目录 概念设计测试用例的万能公式常规思考逆向思维发散性思维万能公式水杯测试弱网测试如何进行弱网测试 安装卸载测试 概念 什么是测试用例? 测试⽤例(Test Case)是为了实施测试⽽向被测试的系统提供的⼀组集合,这组集合包…