Flink常见面试题

1、Flink 的四大特征(基石)

2、Flink 中都有哪些 Source,哪些 Sink,哪些算子(方法)

预定义Source

基于本地集合的source(Collection-based-source)

基于文件的source(File-based-source)

基于网络套接字(socketTextStream)

自定义Source

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

3、什么是侧道输出流,有什么用途

侧输出-SideOutput
Flink 通过watermark在短时间内允许了乱序到来的数据

通过延迟数据处理机制,可以处理长期迟到的数据。

但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。

对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?

不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据收集起来

保存成为一个DataStream,然后,交由开发人员自行处理。

那么这个机制就叫做侧输出机制(Side Output)

4、Flink 中两个流如何合并为一个流

Union

union可以合并多个同类型的流

将多个DataStream 合并成一个DataStream

【注意】:union合并的DataStream的类型必须是一致的

connect

connect可以连接2个不同类型的流(最后需要处理后再输出)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化【一国两制】,两个流相互独立, 作为对比Union后是真的变成一个流了。

和union类似,但是connect只能连接两个流,两个流之间的数据类型可以同,对两个流的数据可以分别应用不同的处理逻辑.

5、Flink 中两个流如何 join

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

1.2 滑动窗口Join 

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

6、Flink 中都有哪些 window,什么是滑动,滚动窗口

Window可以分成两类:

CountWindow:按照指定的数据条数生成一个Window,与时间无关。

滚动计数窗口,每隔N条数据,统计前N条数据

滑动计数窗口,每隔N条数据,统计前M条数据

TimeWindow:按照时间生成Window。

滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N

滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N

会话窗口,按照会话划定的窗口

7、flink 中都有哪些时间语义,对于 event_time 中数据迟到的处理(数据乱序)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间。

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间。

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间。

迟到处理:

水印:对于迟到数据不长;

allowedLateness: 迟到时间很长;

侧道输出:对于迟到时间特别长。 

8、flink 中的状态指的是什么?有哪些状态,你使用过哪些状态,哪个项目使用到了状态

有状态计算和无状态计算

  • 无状态计算:
  • 不需要考虑历史数据, 相同的输入,得到相同的输出!如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
  • 有状态计算:
  • 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
    • 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

注意: Flink默认已经支持了无状态和有状态计算!

例如WordCount代码:已经做好了状态维护, 输入hello,输出(hello,1),再输入hello,输出(hello,2)。

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。

两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

托管状态
   - KeyedState ( 在keyBy之后可以使用状态 )
      - ValueState  (存储一个值)
      - ListState   (存储多个值)
      - MapState    (存储key-value) 
   - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]
 - 原生状态 (不用)

9、flink 中 checkpoint 是什么,如何设置。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息。

一句话概括: Checkpoint就是State的快照。

可使用以下方法来设置:

package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-24 09:18:30**/
public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}

10、flink 中的重启策略 (流式计算中的重启策略)

重启策略的意义:流式数据是不可能停止的,假如有一条错误数据导致程序直接退出,后面的大量数据是会丢失的,对公司来讲,意义是重大的,损失是惨重的。

重启策略是一个单独的策略,如果你配置了 checkpoint 含有重启策略的,如果你没有 checkpoint 也可以自行配置重启策略,总之重启策略和 checkpoint 没有必然联系。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。

11、什么是维表 join,如何实现,你在哪个项目中使用过维表 join

所谓的维表Join: 进入Flink的数据,需要关联另外一些存储设备的数据,才能计算出来结果,那么存储在外部设备上的表称之为维表,可能存储在mysql也可能存储在hbase 等。

实现:

通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在kafka流map()方法中与维表数据进行关联。

RichMapFunction中open方法里加载维表数据到内存的方式特点如下:

  • 优点:实现简单
  • 缺点:因为数据存于内存,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。另外,维表是变化慢,不是一直不变的,只是变化比较缓慢而已。

以前的方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

  • 优点:维度数据量不受内存限制,可以存储很大的数据量。
  • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

使用cache来减轻访问压力

可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用Guava Cache。维表一般的特点是变化比较慢。在智慧城市项12目使用过。用它来存储一些预热的数据在内存中方便取出。

12、flinksql 如何读取 kafka 或者 mysql 的数据。

可通过以下代码直接实现:


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-28 11:00:51**/
public class _02KafkaConnectorDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 如果是建表语句:executeSql  这个返回值是TableResult// 如果是查询语句:sqlQuery    这个返回的是Table (有用)// 新建一个表,用于存储 kafka消息TableResult tableResult = tEnv.executeSql("CREATE TABLE table1 (\n" +"  `user_id` int,\n" +"  `page_id` int,\n" +"  `status` STRING\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic1',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +"  'properties.group.id' = 'testGroup',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'format' = 'json'\n" +")");// 新建一个表,用于存储kafka中的topic2中的数据tEnv.executeSql("CREATE TABLE table2 (\n" +"  `user_id` int,\n" +"  `page_id` int,\n" +"  `status` STRING\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'topic2',\n" +"  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +"  'format' = 'json'\n" +")");tEnv.executeSql("insert into table2 select * from table1 where status ='success'");// 以上代码已经写完了,下面是两个步骤分开的写法//TODO 3.transformation/查询// Table result = tEnv.sqlQuery("select user_id,page_id,status from table1 where status='success'");//输出到Kafka    DDL// tEnv.executeSql("insert into table2 select * from " + result);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行// env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/482407.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】扫雷游戏(一)

我们先设计一个简单的9*9棋盘并有10个雷的扫雷游戏。 1&#xff0c;可以用数组存放&#xff0c;如果有雷就用1表示&#xff0c;没雷就用0表示。 2&#xff0c;排查(2,5)这个坐标时&#xff0c;我们访问周围的⼀圈8个位置黄色统计周围雷的个数是1。排查(8,6)这个坐标时&#xf…

【博主推荐】C#中winfrom开发常用技术点收集

文章目录 前言1.打开文件夹并选中文件2.窗体之间传参3.异步调用&#xff1a;让数据处理不影响页面操作4.创建一个多文档界面(MDI) 应用程序5.在WinForms中使用数据绑定6.在WinForms中后台使用控件的事件处理7.在WinForms中窗体跳转的几种方式8.后台处理方法中&#xff0c;调用窗…

Matlab 绘制雷达图像完全案例和官方教程(亲测)

首先上官方教程链接 polarplothttps://ww2.mathworks.cn/help/matlab/ref/polarplot.html 上实例 % 定义角度向量和径向向量 theta linspace(0, 2*pi, 5); r1 [1, 2, 1.5, 2.5, 1]; r2 [2, 1, 2.5, 1.5, 2];% 绘制两个雷达图 polarplot(theta, r1, r-, LineWidth, 2); hold …

乌班图单机(不访问外网)部署docker和服务的方法

面向对象:Ubuntu不能访问外网的机子,部署mysql、redis、jdk8、minio 过程: 1、安装docker(照着图去这里找对应的下载下来https://download.docker.com/linux/static/stable/),将7个docker官网下载的文件下载下来后,传上去服务器随便一个文件夹或者常用的opt或者/usr/lo…

响应式编程一、Reactor核心

目录 一、前置知识1、Lambda表达式2、函数式接口 Function3、StreamAPI4、Reactive-Stream1&#xff09;几个实际的问题2&#xff09;Reactive-Stream是什么&#xff1f;3&#xff09;核心接口4&#xff09;处理器 Processor5&#xff09;总结 二、Reactor核心1、Reactor1&…

Docker for Everyone Plus——No Enough Privilege

直接告诉我们flag在/flag中&#xff0c;访问第一小题&#xff1a; sudo -l查看允许提权执行的命令&#xff1a; 发现有image load命令 题目指明了有rz命令&#xff0c;可以用ZMODEM接收文件&#xff0c;看到一些write up说可以用XShell、MobaXterm、Tabby Terminal等软件连接上…

深度学习基础2

1.损失函数 1.1 线性回归损失函数 1.1.1 MAE损失 MAE&#xff08;Mean Absolute Error&#xff0c;平均绝对误差&#xff09;通常也被称为 L1-Loss&#xff0c;通过对预测值和真实值之间的绝对差取平均值来衡量他们之间的差异。。 公式&#xff1a; 其中&#xff1a; n 是样…

【Android】组件化嘻嘻嘻gradle耶耶耶

文章目录 Gradle基础总结&#xff1a;gradle-wrapper项目根目录下的 build.gradlesetting.gradle模块中的 build.gradlelocal.properties 和 gradle.properties 组件化&#xff1a;项目下新建一个Gradle文件定义一个ext扩展区域config.gradle全局基础配置&#xff08;使用在项目…

基础Web安全|SQL注入

基础Web安全 URI Uniform Resource Identifier&#xff0c;统一资源标识符&#xff0c;用来唯一的标识一个资源。 URL Uniform Resource Locator&#xff0c;统一资源定位器&#xff0c;一种具体的URI&#xff0c;可以标识一个资源&#xff0c;并且指明了如何定位这个资源…

【插入排序】:直接插入排序、二分插入排序、shell排序

【插入排序】&#xff1a;直接插入排序、二分插入排序、shell排序 1. 直接插入排序1.1 详细过程1.2 代码实现 2. 二分插入排序2.1 详细过程2.2 代码实现 3. shell排序3.1 详细过程3.2 代码实现 1. 直接插入排序 1.1 详细过程 1.2 代码实现 public static void swap(int[]arr,…

一万台服务器用saltstack还是ansible?

一万台服务器用saltstack还是ansible? 选择使用 SaltStack 还是 Ansible 来管理一万台服务器&#xff0c;取决于几个关键因素&#xff0c;如性能、扩展性、易用性、配置管理需求和团队的熟悉度。以下是两者的对比分析&#xff0c;帮助你做出决策&#xff1a; SaltStack&…

通讯专题4.1——CAN通信之计算机网络与现场总线

从通讯专题4开始&#xff0c;来学习CAN总线的内容。 为了更好的学习CAN&#xff0c;先从计算机网络与现场总线开始了解。 1 计算机网络体系的结构 在我们生活当中&#xff0c;有许多的网络&#xff0c;如交通网&#xff08;铁路、公路等&#xff09;、通信网&#xff08;电信、…

使用OSPF配置不同进程的中小型网络

要求&#xff1a; 给每个设备的接口配置好相应的地址 对进程1的各区域使用认证&#xff0c;认证为明文发送&#xff0c;明文保存 对骨干区域使用接口认证&#xff0c;非骨干区域使用区域认证 其他ospf进程均使用区域0 FW1上配置接口信任域和非信任域和服务器&#xff0c…

软考高项经验分享:我的备考之路与实战心得

软考&#xff0c;尤其是信息系统项目管理师&#xff08;高项&#xff09;考试&#xff0c;对于众多追求职业提升与专业认可的人士来说&#xff0c;是一场充满挑战与机遇的征程。我在当年参加软考高项的经历&#xff0c;可谓是一波三折&#xff0c;其中既有成功的喜悦&#xff0…

Transformers在计算机视觉领域中的应用【第1篇:ViT——Transformer杀入CV界之开山之作】

目录 1 模型结构2 模型的前向过程3 思考4 结论 论文&#xff1a; AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE 代码&#xff1a;https://github.com/google-research/vision_transformer Huggingface&#xff1a;https://github.com/huggingf…

Unity3D模型场景等测量长度和角度功能demo开发

最近项目用到多段连续测量物体长度和角度功能&#xff0c;自己研究了下。 1.其中向量角度计算&#xff1a; 需要传入三个坐标来进行计算。三个坐标确定两条向量线段的方向&#xff0c;从而来计算夹角。 public Vector3 SetAngle(Vector3 p1, Vector3 p2,Vector3 p3) { …

蓝桥杯第 23 场 小白入门赛

一、前言 好久没打蓝桥杯官网上的比赛了&#xff0c;回来感受一下&#xff0c;这难度区分度还是挺大的 二、题目总览 三、具体题目 3.1 1. 三体时间【算法赛】 思路 额...签到题 我的代码 // Problem: 1. 三体时间【算法赛】 // Contest: Lanqiao - 第 23 场 小白入门赛 …

从0开始学PHP面向对象内容之常用设计模式(享元)

二、结构型设计模式 7、享元模式&#xff08;Flyweight Pattern&#xff09; 这里是引用享元模式&#xff08;Flyweight Pattern&#xff09; 是一种结构型设计模式&#xff0c;旨在通过共享对象来减少内存使用&#xff0c;尤其适用于大量相似对象的场景。通过共享和重用对象的…

YOLO 标注工具 AutoLabel 支持 win mac linux

常见的标注工具&#xff0c;功能基础操作繁琐&#xff0c;无复制粘贴&#xff0c;标签无法排序修改&#xff0c;UI不美观&#xff0c;bug修正不及时&#xff0c;没有集成识别、训练、模型导出… 怎么办呢&#xff1f;AutoLabel它来了 Quick Start 一图胜千言 图像标注 支持YOL…

《Python基础》之Python中可以转换成json数据类型的数据

目录 一、JSON简介 JSON有两种基本结构 1、对象&#xff08;Object&#xff09; 2、数组&#xff08;Array&#xff09; 二、将数据装换成json数据类型方法 三、在Python中&#xff0c;以下数据类型可以直接转换为JSON数据类型 1、字典&#xff08;Dictionary&#xff09…