【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream.map(s -> s.length()).filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-wordcount-example</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><flink.version>1.13.2</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter())  // 将文本行切分为单词.keyBy(0)  // 按单词分组.sum(1);  // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)while True:data = input("Enter text: ")client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/177909.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git命令清单

一、设置和配置 1.初始化一个新的仓库&#xff1a; git init2.克隆&#xff08;Clone&#xff09;一个远程仓库到本地&#xff1a; git clone <repository_url>3.配置用户信息&#xff1a; git config --global user.name "Your Name" git config --global…

疑难杂症-暂时不能解析域名“mirrors.tuna.tsinghua.edu.cn”

可能是太久没用Ubuntu了&#xff0c;总是有一些莫名其妙的问题 我的方法简单粗暴&#xff1a;不需要重启&#xff0c;打开终端&#xff0c;输入sudo apt-get update&#xff0c;解析成功 还有一些别的方法&#xff0c;不过我也没试过 修改/etc/resolv.conf还是修改/etc/resol…

客户端与服务端实时通讯(轮询、websocket、SSE)

客户端与服务端实时通讯 背景 在某些项目中&#xff0c;某些数据需要展示最新的&#xff0c;实时的&#xff0c;这时候就需要和服务端进行长时间通讯 方案 对于数据实时获取&#xff0c;我们一般会有4种方案&#xff1a; 1.短轮询&#xff1a;使用浏览器的定时器发起http请…

两个字符串的删除操作

题目描述 给定两个单词 word1 和 word2 &#xff0c;返回使得 word1 和 word2 相同所需的最小步数。每步可以删除任意一个字符串中的一个字符。 示例 思路 其实这道题的思路和最长公共子序列的思路一致&#xff0c;本题让我们求word1 和 word2 相同所需的最小步数&#xff0…

[概述] 获取点云数据的仪器

这里所说的获取点云的仪器指的是可以获取场景中物体距离信息的相关设备&#xff0c;下面分别从测距原理以及适用场景来进行介绍。 一、三角测距法 三角测距原理 就是利用三角形的几何关系来测量物体的距离。想象一下&#xff0c;你站在一个地方&#xff0c;你的朋友站在另一…

【笔记】excel怎么把汉字转换成拼音

1、准备好excel文件&#xff0c;复制需要转拼音列。 2、打开一个空白Word文档&#xff0c;并粘贴刚才复制的内容&#xff1b; 3、全选Word文档中刚粘贴的内容&#xff0c;点击「开始」选项卡「字体」命令组下的「拼音指南」&#xff0c;调出拼音指南对话框&#xff1b; 4、全…

数据库深入浅出,数据库介绍,SQL介绍,DDL、DML、DQL、TCL介绍

一、基础知识&#xff1a; 1.数据库基础知识 数据(Data)&#xff1a;文本信息(字母、数字、符号等)、音频、视频、图片等&#xff1b; 数据库(DataBase)&#xff1a;存储数据的仓库&#xff0c;本质文件&#xff0c;以文件的形式将数据保存到电脑磁盘中 数据库管理系统(DBMS)&…

MySQL Error 1215: Cannot add foreign key constraint

首先确保中介表中被设置外键的字段不能被设置为主键 第二步确保外键字段的属性与要连接的表的字段属性相同 第三步&#xff0c;设置表的选项 修改引擎为 InnoDB 三个表的引擎都要修改 最后就是运行代码 SET OLD_FOREIGN_KEY_CHECKSFOREIGN_KEY_CHECKS; SET FOREIGN_KEY_…

在微信小程序怎么领取优惠券

随着科技的发展&#xff0c;微信小程序已经成为我们日常生活中不可或缺的一部分。它为我们提供了各种各样的服务&#xff0c;使我们的生活变得更加便捷。而在这些服务中&#xff0c;领取优惠券成为了大家特别喜欢的功能之一。本文将详细介绍如何在微信小程序中领取优惠券&#…

电压跟随器输入脚悬空引起的振荡

昨天在调试一个电路板的时候&#xff0c;发现进单片机AD脚的信号上面有个50Hz的波形&#xff0c;峰峰值还挺大&#xff0c;有几百毫伏。这种情况只有在输入端悬空的时候才出现&#xff1b;在输入端接了信号或者传感器的时候&#xff0c;就又正常了。 经过排查&#xff0c;发现…

图数据库Neo4j——SpringBoot使用Neo4j 简单增删改查 复杂查询初步

前言 图形数据库是专门用于存储图形数据的数据库&#xff0c;它使用图形模型来存储数据&#xff0c;并且支持复杂的图形查询。常见的图形数据库有Neo4j、OrientDB等。 Neo4j是用Java实现的开源NoSQL图数据库&#xff0c;本篇博客介绍如何在SpringBoot中使用Neo4j图数据库&…

外汇天眼:进行外汇交易,杠杆是不是越大越好?

有在做外汇保证金交易的投资人&#xff0c;相信对杠杆一定不陌生&#xff0c;不知道你是否曾经想过&#xff0c;外汇杠杆到底要怎么用比较好&#xff1f;一家经纪商提供的杠杆越大&#xff0c;对交易者来说就一定好吗&#xff1f;让我们一起思考以下几个问题。 滥用外汇交易杠…

稳恒电路直观理解0

图v0 图v1 图v2 图v3 图v4 自由正电荷s&#xff0c;定向移动过程中&#xff0c;在任何一位置处受力都是平衡的&#xff0c;即s所受总合力为0&#xff0c; 即s处于匀速运动&#xff1a;直导体中匀速直线运动、拐弯处匀速圆周运动 起初t0时刻, s的势能是最高的E0&#xff0c;之…

HarmonyOS数据管理与应用数据持久化(一)

一. 数据管理概述 功能介绍 数据管理为开发者提供数据存储、数据管理能力&#xff0c;比如联系人应用数据可以保存到数据库中&#xff0c;提供数据库的安全、可靠等管理机制。 数据存储&#xff1a;提供通用数据持久化能力&#xff0c;根据数据特点&#xff0c;分为用户首选项、…

文本生成高质量3D模型,支持二次编辑!Stable Difusion新产品来啦

11月2日&#xff0c;著名开源平台Stability AI&#xff08;Stable Difusion母公司&#xff09;在官网宣布推出了Stable 3D&#xff0c;支持用户通过文本、图片或插图&#xff0c;直接就能生成高质量3D模型。 生成模型的格式是.obj&#xff0c;可以直接在Blender、Maya、C4D、Z…

2127. 参加会议的最多员工数 : 啥是内向/外向基环树(拓扑排序)

题目描述 这是 LeetCode 上的 「2127. 参加会议的最多员工数」 &#xff0c;难度为 「困难」。 Tag : 「拓扑排序」、「内向基环树」、「图」 一个公司准备组织一场会议&#xff0c;邀请名单上有 n 位员工。 公司准备了一张圆形的桌子&#xff0c;可以坐下任意数目的员工。 员工…

长距离工业RFID读写器的特点

长距离工业RFID读写器是一种特殊的RFID设备&#xff0c;能够在较远的距离内读取和写入RFID标签上的信息。这种读写器通常用于工业自动化、物流跟踪、车辆管理等领域&#xff0c;以实现高效、准确的跟踪和管理。 长距离工业RFID读写器采用先进的射频技术和信号处理技术&#xff…

needle库

python#导入需要的库import needle#定义代理主机和端口proxy_host"jshk.com.cn"proxy_port7894#使用needle库的网页爬虫功能&#xff0c;设置代理服务器参数&#xff0c;爬取https://read.jd.com/页面的HTML内容html_contentneedle.get("https://read.jd.com/&q…

SaaS 出海,如何搭建国际化服务体系?(三)

防噎指南&#xff1a;这可能是你看到的干货含量最高的 SaaS 出海经验分享&#xff0c;请准备好水杯&#xff0c;放肆食用&#xff08;XD。 当越来越多中国 SaaS 企业选择开启「国际化」副本&#xff0c;出海便俨然成为国内 SaaS 的新角斗场。 LigaAI 观察到&#xff0c;出海浪…

【SOC基础】单片机学习案例汇总 Part1:电机驱动、点亮LED

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…