Flink学习记录

可以快速搭建一个Flink编写程序

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.17.1 \-DgroupId=com.zxx.langhuan \-DartifactId=langhuan-flink \-Dversion=1.0.0-SNAPSHOT \-Dpackage=com.zxx.langhuan.flink \-DinteractiveMode=false

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs。

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

基本的 stream source

// DataStream API
// fromElements
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));// fromCollection
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);// socketTextStream
DataStream<String> lines = env.socketTextStream("localhost", 9999)// readTextFile
DataStream<String> lines = env.readTextFile("file:///path");// customSource
DataStreamSource<OUT> out = env.addSource(SourceFunction<OUT> function);// Table API
// 创建TableEnvironment(表环境)。 创建表环境时,你可以设置作业属性,定义应用的批流模式,以及创建数据源。 我们先创建一个标准的表环境,并选择流式执行器。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
// 批处理模式
// EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);tEnv.executeSql("CREATE TABLE transactions (\n" +"    account_id  BIGINT,\n" +"    amount      BIGINT,\n" +"    transaction_time TIMESTAMP(3),\n" +"    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +") WITH (\n" +"    'connector' = 'kafka',\n" +"    'topic'     = 'transactions',\n" +"    'properties.bootstrap.servers' = 'kafka:9092',\n" +"    'format'    = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE spend_report (\n" +"    account_id BIGINT,\n" +"    log_ts     TIMESTAMP(3),\n" +"    amount     BIGINT\n," +"    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +") WITH (\n" +"    'connector'  = 'jdbc',\n" +"    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +"    'table-name' = 'spend_report',\n" +"    'driver'     = 'com.mysql.jdbc.Driver',\n" +"    'username'   = 'sql-demo',\n" +"    'password'   = 'demo-sql'\n" +")");Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

支持的connectors

DataStream ConnectorsTable API Connectors
DataGen
Kafka
Cassandra
DynamoDB
ElasticSearch
Firehose
Kinesis
MongoDB
OpenSearch
文件系统
RabbitMQ
Google Cloud PubSub
Hybrid Source
Pulsar
JDBC
Upsert Kafka
HBase
Print
BlackHole
Hive

场景说明

一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
JobManager 负责处理 Job 提交、 Job 监控以及资源管理。
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 
 

有界流:批处理

无界流:流处理

无状态的转换

map()

flatmap()

keyBy()

以下情况,一个类不能作为 key

  1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  2. 它是任意类的数组。

reduce() 和其他聚合算子  

数据流转换

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

Rich Functions

  • open(Configuration c):仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
  • close()
  • getRuntimeContext():为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
  • setRuntimeContext()
  • getIterationRuntimeContext()

ValueState:要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。(相同的key共享)

  • value()
  • update()
  • clear()

Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time): Flink 读取事件时记录的时间

  • 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间

Watermarks: 定义何时停止等待较早的事件

//        AssignerWithPunctuatedWatermarks 在每个事件上都可以提供水位线,因此水位线可以更加灵活和精确地根据事件的特性进行生成。
//        AssignerWithPeriodicWatermarks 在一定的时间间隔内提供水位线,因此水位线的生成不会频繁,适用于事件的时间戳变化频率较高,而水位线的变化频率较低的情况。
//        在选择使用哪个接口时,可以根据具体的业务需求和事件流的特点来决定。如果事件的时间戳和水位线的变化较为频繁,或者需要更精确的控制,可以选择 AssignerWithPunctuatedWatermarks。如果事件的时间戳变化较为平稳,或者水位线的变化不需要那么频繁,可以选择 AssignerWithPeriodicWatermarks。// WatermarkStrategy 接口包含以下方法:
//        withTimestampAssigner:指定如何从事件中提取事件时间戳(timestamp)。
//        withIdleness:配置是否在流处于空闲状态时发出水位线,默认为不发出水位线。
//        withIdlenessTimeout:指定流处于空闲状态多久后发出水位线。
//        withTimestampAssignerAndIdlenessTimeout:同时指定事件时间戳提取逻辑和流空闲状态的配置。// 通常,你可以通过静态工厂方法 WatermarkStrategy.forXXX() 来创建特定的水位线策略,其中 XXX 可以是以下几种类型之一:
//        forBoundedOutOfOrderness:适用于乱序但有界的事件流,根据最大允许的乱序程度指定固定的延迟时间。
//        forMonotonousTimestamps:适用于单调递增的事件流,例如源自某些消息队列的事件流。
//        forGenerator:通过自定义的水位线生成器函数来创建水位线策略。
//        forPeriodicBoundedOutOfOrderness:适用于周期性有界乱序事件流,指定固定的延迟时间和乱序间隔。
//        forEventTime:适用于事件时间已经正确嵌入在事件中的情况,不需要提取时间戳。DataStream<Event> stream = ...;WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.timestamp);DataStream<Event> withTimestampsAndWatermarks =stream.assignTimestampsAndWatermarks(strategy);

Windows

Flink 有一些内置的窗口分配器,如下所示:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每10秒钟计算前1分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n)Time.seconds(n)Time.minutes(n)Time.hours(n), 和 Time.days(n)

窗口应用函数 

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。
DataStream<SensorReading> input = ...;input.keyBy(x -> x.key).window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new MyWastefulMax());public static class MyWastefulMax extends ProcessWindowFunction<SensorReading,                  // 输入类型Tuple3<String, Long, Integer>,  // 输出类型String,                         // 键类型TimeWindow> {                   // 窗口类型@Overridepublic void process(String key,Context context,Iterable<SensorReading> events,Collector<Tuple3<String, Long, Integer>> out) {int max = 0;for (SensorReading event : events) {max = Math.max(event.value, max);}out.collect(Tuple3.of(key, context.window().getEnd(), max));}
}// Context 接口 展示大致如下:
public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();
}// windowState 和 globalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
// 增量聚合示例
DataStream<SensorReading> input = ...;input.keyBy(x -> x.key).window(TumblingEventTimeWindows.of(Time.minutes(1))).reduce(new MyReducingMax(), new MyWindowFunction());private static class MyReducingMax implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r1 : r2;}
}private static class MyWindowFunction extends ProcessWindowFunction<SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {@Overridepublic void process(String key,Context context,Iterable<SensorReading> maxReading,Collector<Tuple3<String, Long, SensorReading>> out) {SensorReading max = maxReading.iterator().next();out.collect(Tuple3.of(key, context.window().getEnd(), max));}
}

晚到的事件

  • 旁路输出

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};SingleOutputStreamOperator<Event> result = stream.keyBy(...).window(...).sideOutputLateData(lateTag).process(...);DataStream<Event> lateStream = result.getSideOutput(lateTag);
  • 指定允许的延迟
stream.keyBy(...).window(...).allowedLateness(Time.seconds(10)).process(...);

如果延迟的事件需要特殊处理,可以从Context中获取水位线时间来做比较

public void processElement(TaxiFare fare,Context ctx,Collector<Tuple3<Long, Long, Float>> out) throws Exception {long eventTime = fare.getEventTime();TimerService timerService = ctx.timerService();if (eventTime <= timerService.currentWatermark()) {// 事件延迟;其对应的窗口已经触发。} else {// 其他处理}
}

深入了解窗口操作

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。

window 后面可以接 window

比如说:

stream.keyBy(t -> t.key).window(<window assigner>).reduce(<reduce function>).windowAll(<same window assigner>).reduce(<same reduce function>);

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟事件可能导致延迟合并

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 聚合。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/87369.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ffmepg滤镜

视频按顺时针方向旋转90度 ffplay -vf transpose1 -i juren-30s.mp4 ffplay -f lavfi -i testsrc -vf transpose1 -f lavfi -i testsrc这个滤镜是ffmpeg给用户的一个测试使用的视频 视频水平翻转(左右翻转) -vf hflip 实现慢速播放&#xff0c;声音速度是原始速度的50% ffpla…

智慧家庭如何落地?三翼鸟把答案写在用户家里

近年来&#xff0c;学术界流行一句话&#xff0c;“把论文写在中国大地上”。 一项新技术从实验室到千万家&#xff0c;落地难、转化低&#xff0c;是技术创新经常碰到的问题。所以&#xff0c;如何让新技术扎根大地、扎根真实需求&#xff0c;普惠人间&#xff0c;是中国产学研…

构建Docker容器监控系统 (1)(Cadvisor +InfluxDB+Grafana)

目录 Cadvisor InfluxDBGrafana 1. Cadvisor 2.InfluxDB 3.Grafana 开始部署&#xff1a; 下载组件镜像 创建自定义网络 创建influxdb容器 创建数据库和数据库用户 创建Cadvisor 容器 准备测试镜像 创建granafa容器 访问granfana 添加数据源 Add data source 新建 …

开发过程中遇到的问题以及解决方法

巩固基础&#xff0c;砥砺前行 。 只有不断重复&#xff0c;才能做到超越自己。 能坚持把简单的事情做到极致&#xff0c;也是不容易的。 开发过程中遇到的问题以及解决方法 简单易用的git命令 git命令&#xff1a; 查看有几个分支&#xff1a;git branch -a 切换分支&#…

设计模式(4)装饰模式

一、介绍&#xff1a; 1、应用场景&#xff1a;把所需的功能按正确的顺序串联起来进行控制。动态地给一个对象添加一些额外的职责&#xff0c;就增加功能来说&#xff0c;装饰模式比生成子类更加灵活。 当需要给一个现有类添加附加职责&#xff0c;而又不能采用生成子类的方法…

Linux查看GPU显卡/CPU内存/硬盘信息

显卡信息命令/CPU内存/硬盘 1.显卡2、CPU内存3、硬盘 1.显卡 nvidia-smi nvidia-smi&#xff08;显示一次当前GPU占用情况&#xff09; nvidia-smi -l&#xff08;每秒刷新一次并显示&#xff09; watch -n 5 nvidia-smi &#xff08;其中&#xff0c;5表示每隔6秒刷新一次终端…

2498. 青蛙过河 II;2568. 最小无法得到的或值;1954. 收集足够苹果的最小花园周长

2498. 青蛙过河 II 核心思想&#xff1a;这题有点开脑洞&#xff0c;就是如果想让代价最小只能是隔一个石头跳&#xff0c;因为其他方法的路径都会形成比这种方法大的结果&#xff0c;然后我们只需要统计出间隔石头的最大值即可。 2568. 最小无法得到的或值 核心思想&#xf…

在Ubuntu中使用Docker启动MySQL8的天坑

写在前面 简介&#xff1a; lower_case_table_names 是mysql设置大小写是否敏感的一个参数。 1.参数说明&#xff1a; lower_case_table_names0 表名存储为给定的大小和比较是区分大小写的 lower_case_table_names 1 表名存储在磁盘是小写的&#xff0c;但是比较的时候是不区…

白帽黑帽与linux安全操作

目录 白帽黑帽 Linux安全 白帽黑帽 白帽&#xff08;White Hat&#xff09;和黑帽&#xff08;Black Hat&#xff09;通常用于描述计算机安全领域中的两种不同角色。白帽黑客通常被认为是合法的安全专家&#xff0c;他们通过合法途径寻找和修复安全漏洞&#xff0c;帮助企业和…

Unity之ShaderGraph 节点介绍 Procedural节点

程序化 噪声Gradient Noise&#xff08;渐变或柏林噪声&#xff09;Simple Noise&#xff08;简单噪声&#xff09;Voronoi&#xff08;Voronoi 噪声&#xff09; 形状Ellipse&#xff08;椭圆形&#xff09;Polygon&#xff08;正多边形&#xff09;Rectangle&#xff08;矩形…

JavaScript 运行机制详解:再谈Event Loop

一、为什么JavaScript是单线程&#xff1f; JavaScript语言的一大特点就是单线程&#xff0c;也就是说&#xff0c;同一个时间只能做一件事。那么&#xff0c;为什么JavaScript不能有多个线程呢&#xff1f;这样能提高效率啊。 JavaScript的单线程&#xff0c;与它的用途有关。…

【JVM】类装载的执行过程

文章目录 类装载的执行过程1.加载2.验证3.准备4.解析5.初始化6.使用7.卸载 类装载的执行过程 类装载总共分为7个过程&#xff0c;分别是 加载&#xff0c;验证&#xff0c;准备、解析、初始化、使用、卸载 1.加载 将类的字节码文件加载到内存(元空间&#xff09;中。这一步会…

AWS中Lambda集成SNS

1.创建Lambda 在Lambda中&#xff0c;创建名为AWSSNSDemo的函数 use strict console.log(loading function); var aws require(aws-sdk); var docClient new aws.DynamoDB.DocumentClient(); aws.config.regionap-southeast-1;exports.handler function(event,context,cal…

详细记录Pycharm配置已安装好的Conda虚拟环境

当安装好conda环境之后&#xff0c;想要在Pycharm中使用&#xff0c;那么就要在Pycharm中导入&#xff0c;我这里使用的pycharm-professional-2023.2这个版本&#xff0c;下面是详细步骤&#xff1a; 1.打开File->Settings&#xff1a; 2.找到Project——>Python Inter…

Openlayers实战:列表与图层双向信息提示

在Openlayers的实际项目中,经常会在左侧列出信息列表,右边的地图上显示的是对应的图层内容,两边是一一对应的,为了看出来选择的是哪一个,就需要两边互相提示,本示例就很好的展示了这种效果,具体的方法请参考源代码。 效果图 源代码 /* * @Author: 大剑师兰特(xiaozhu…

阶梯费用计算(配置化_最小demo)

本文旨在提供一种配置化思路计算阶梯费用&#xff0c;更高级的做法则是通过数据库配置&#xff0c;注册中心等&#xff1b;在表达式上可以采用自定义或者spel表达式等其他方式进行处理&#xff1b;(代码仅展示最小demo,部分不完善orBug地方自行补充) 思路&#xff1a;N个区间对…

【leetcode】前缀和

内容摘抄自&#xff1a; 小而美的算法技巧&#xff1a;前缀和数组 | labuladong 的算法小抄 一维数组的前缀和 看这个 preSum 数组&#xff0c;若想求索引区间 [1, 4] 内的所有元素之和&#xff0c; 就可以通过 preSum[5] - preSum[1] 得出。 class NumArray {private:// 前缀…

内网隧道—HTTP\DNS\ICMP

本文仅限于安全研究和学习&#xff0c;用户承担因使用此工具而导致的所有法律和相关责任&#xff01; 作者不承担任何法律和相关责任&#xff01; HTTP隧道 Neo-reGeorg Neo-reGeorg 是一个旨在积极重构 reGeorg 的项目&#xff0c;目的是&#xff1a; 提高可用性&#xff0…

【双指针_盛最多水的容器_C++】

题目解析 盛最多水的容器 算法原理 向内枚举&#xff1a; weight一定会减小 height不是不变就是减小要求的是盛水最多的容器&#xff0c;那么这些枚举情况就不需要。 拿比较小的数去向内枚举&#xff0c;v一直在减小&#xff0c;所以说直接排除 编写代码 class Soluti…

智汇云舟入选IDC《中国智慧城市数字孪生技术评估,2023》报告

8月7日&#xff0c;国际数据公司&#xff08;IDC&#xff09;发布了《中国智慧城市数字孪生技术评估&#xff0c;2023》报告。智汇云舟凭借在数字孪生领域的创新技术与产品&#xff0c;入选《2023中国数字孪生城市技术提供商图谱》。 报告通过公开征集的形式进行申报&am…