大数据学习之Flink算子、了解DataStream API(基础篇一)

DataStream API (基础篇)


注: 本文只涉及DataStream

  • 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSet成为了主流的数据处理方式。

目录

DataStream API (基础篇)

前摘:

一、执行环境

1. 创建执行环境

2. 执行模式

3. 触发程序执行

二、源算子(source)

三、转换算子(Transformation)

四、输出算子(sink)


前摘:

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 部分构成,如图所示:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

其中,获取环境和触发执行,都可以认为是针对执行环境的操作。所以本章我们就从执行 环境、数据源(source)、转换操作(Transformation)、输出(Sink)四大部分,对常用的 DataStream API 做基本介绍。

一、执行环境

1. 创建执行环境

  • 编写Flink程序的第一步就是创建执行环境。
  • 我 们 要 获 取 的 执 行 环 境 , 是 StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础
  • 在代码中创建执行环境的 方式,就是调用这个类的静态方法,具体有以下三种。
  1. getExecutionEnvironment
    最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果;
    //此处的 env 是 StreamExecutionEnvironment 对象
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. createLocalEnvironment
    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。
    //此处的 localEnvironment 是 StreamExecutionEnvironment 对象
    val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    
  3. createRemoteEnvironment
    这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。

    //此处的 remoteEnv 是 StreamExecutionEnvironment 对象
    val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
    )
    

2. 执行模式

而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。

  • 流执行模式(STREAMING) 这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 况下,程序使用的就是 STREAMING 执行模式。
  • 批执行模式(BATCH) 专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。
  • 自动模式(AUTOMATIC) 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式

由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。 主要有两种方式:

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

(2)通过代码配置

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)

3. 触发程序执行

我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等 待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute()

二、源算子(source)

Source源算子(基础篇二)

三、转换算子(Transformation)

Transformation转换算子(基础篇三)

四、输出算子(sink)

持续更新中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/244411.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZXing开源库生成二维码

引言 二维码(QR Code)作为一种快速、高容量、高密度的矩阵条码,已经在各行各业得到广泛应用。ZXing(Zebra Crossing)是一款由Google开源的Java二维码生成和解析库,提供了丰富的功能和易于使用的API。本篇博…

C# Cad2016二次开发选择csv导入信息(七)

//选择csv导入信息 [CommandMethod("setdata")] //本程序在AutoCAD的快捷命令是"DLLLOAD" public void setdata() {Microsoft.Win32.OpenFileDialog dlg new Microsoft.Win32.OpenFileDialog();dlg.DefaultExt ".csv";// Display OpenFileDial…

什么样的宣传才能对消费者起效?

品牌离不开宣传,宣传又直接面向消费者,然后面对铺天盖地的宣传,除了从业人员,相信大部分用户都会有抵触心理,今天媒介盒子就来和大家聊聊,什么样的宣传能够提高消费者的接受度,让宣传不白宣传。…

RabbitMQ中交换机的应用 ,原理 ,案例的实现

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是平顶山大师,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《RabbitMQ中交换机的应用及原理,案…

TCP服务器最多支持多少客户端连接

目录 一、理论数值 二、实际部署 参考 一、理论数值 首先知道一个基础概念,对于一个 TCP 连接可以使用四元组(src_ip, src_port, dst_ip, dst_port)进行唯一标识。因为服务端 IP 和 Port 是固定的(如下图中的bind阶段&#xff0…

Pytest中conftest.py的用法

Pytest中conftest.py的用法 ​ 在官方文档中,描述conftest.py是一个本地插件的文件,简单的说就是在这个文件中编写的方法,可以在其他地方直接进行调用。 注意事项 只能在根目录编写conftest.py 插件加载顺序在搜集用例之前 基础用法 这里…

NebulaGraph is nothing without you | 社区 2023 年度人物合集

在去年的年度人物 回顾中,我们看到了形形色色的人们,他们当中有帮 NebulaGraph 捉 bug 的小能手,也有通过用回复来解答他人疑惑的启蒙者…在今年(2023 年),我们这个整点不一样的,将镜头推进&…

新版idea创建spring boot项目

目录 前言 汉化教程 项目模板初始化 1.点击新建项目 2.配置初始化信息 3.初始依赖选择 配置Maven 1.打开maven设置 2.重写maven配置文件 3.选择你创建的配置文件 4.重启项目 spring boot配置并测试 1.修改配置文件后缀 2.启动项目 3.编写测试控制类 4.重启项目…

【Go面试向】defer与time.sleep初探

【Go面试向】defer与time.sleep初探 大家好 我是寸铁👊 总结了一篇defer传参与time.sleep初探的文章✨ 喜欢的小伙伴可以点点关注 💝 请大家看下面这段代码,看运行结果会出现什么,为什么? 问题 demo package mainim…

性能优化-HVX架构简介

来自 「发表于知乎专栏《移动端算法优化》」 本文主要介绍Hexagon DSP的HVX技术,旨在通过简单的语言讲清HVX技术。 🎬个人简介:一个全栈工程师的升级之路! 📋个人专栏:高性能(HPC)开…

数据管理平台Splunk Enterprise本地部署并结合内网穿透实现远程访问

文章目录 前言1. 搭建Splunk Enterprise2. windows 安装 cpolar3. 创建Splunk Enterprise公网访问地址4. 远程访问Splunk Enterprise服务5. 固定远程地址 前言 本文主要介绍如何简单几步,结合cpolar内网穿透工具实现随时随地在任意浏览器,远程访问在本地…

【书生·浦语大模型实战】“PDF阅读小助手”学习笔记

1 模型部署 在InternStudio平台中选择A100 (1/4)的配置,镜像选择Cuda11.7-conda,可以选择已有的开发机langchain; 1.1 创建工作空间 mkdir /root/pdf_project1.2 Clone项目 git clone https://gitee.com/tcexeexe/pdf-reading-assistant.…

HCIA——23DNS层次域名空间、域名服务器、域名解析的原理的选择、解答

学习目标: 计算机网络 1.掌握计算机网络的基本概念、基本原理和基本方法。 2.掌握计算机网络的体系结构和典型网络协议,了解典型网络设备的组成和特点,理解典型网络设备的工作原理。 3.能够运用计算机网络的基本概念、基本原理和基本方法进行…

项目解决方案: 视频融合(实时监控视频和三维建模进行融合)设计方案

目 录 一、需求描述 1、视频接入和控制要求 2、视频播放需求 3、提供其他应用的调用 二、方案设计 (一)系统设计图 (二)产品实现方案 三、产品和功能描述 (一)总体描述 &#xf…

QuestDB时序数据库快速入门

简介 QuestDB是一个开源的高性能时序数据库,专门用于处理时间序列相关的数据存储与查询; QuestDB使用列式存储模型。数据存储在表中,每列存储在其自己的文件和其自己的本机格式中。新数据被附加到每列的底部,以便能够按照与摄取…

uniapp中打包Andiord app,在真机调试时地图以及定位功能可以正常使用,打包成app后失效问题(高德地图)

踩坑uniapp中打包Andiord app,在真机调试时地图以及定位功能可以正常使用,打包成app后失效问题_uniapp真机调试高德地图正常 打包apk高德地图就不加载-CSDN博客 问题: 目前两个项目,一个项目是从另一个项目里面分割出来的一整套…

10个常考的前端手写题,你全都会吗?(下)

前言 📫 大家好,我是南木元元,热爱技术和分享,欢迎大家交流,一起学习进步! 🍅 个人主页:南木元元 今天接着上篇再来分享一下10个常见的JavaScript手写功能。 目录 1.实现继承 ES5继…

【快刊录用】15天录用!含中科院1区TOP-4区,投必中!

2024年1月13日-20241月19日 进展喜讯 经核实,由我处Unionpub学术推荐的论文中,新增2篇论文录用、2篇上线见刊、5篇数据库检索: 录用通知 FA20977 FA20479 — 见刊通知 FA20245 FA20885 — 检索通知 FA20924 FA20799 FA20790 FA2…

【IEEE会议征稿】2024年第九届智能计算与信号处理国际学术会议(ICSP 2024)

2024年第九届智能计算与信号处理国际学术会议(ICSP 2024) 2024年第八届智能计算与信号处理国际学术会议(ICSP 2024)将在西安举行, 会期是2024年4月19-21日, 为期三天, 会议由西安科技大学主办。 欢迎参会&…

4G物联网LED智慧路灯杆显示屏产品介绍

4GLED显示屏是一种具有4G网络连接功能的LED显示屏。它可以通过4G网络连接到互联网,实现远程管理和控制,方便进行内容更新和管理。同时,4GLED显示屏具有高亮度、高清晰度和高对比度的特点,可以提供清晰明亮的图像和视频展示效果。它…