大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • FlinkDataStreamAPI 自定义输入源
  • 非并行源介绍与代码
  • 并行源介绍与代码

在这里插入图片描述

Rich并行源

基本介绍

在 Apache Flink 中,RichSourceFunction 是一种增强的源函数(Source Function),它允许开发者在定义源操作时,能够访问 Flink 的生命周期方法、状态管理、配置访问等更多功能。RichSourceFunction 是并行源的一个扩展,它继承自 RichFunction 接口,而 RichFunction 提供了更丰富的功能,比如访问运行时上下文、管理状态、以及在作业开始和结束时执行初始化或清理操作。

主要特点

  • 生命周期方法:RichSourceFunction 提供了 open() 和 close() 方法,分别在作业开始时和结束时调用。这允许你在数据读取前进行初始化操作(如打开连接、加载配置),以及在作业结束时进行清理工作(如关闭连接、释放资源)。
  • 访问运行时上下文:通过 getRuntimeContext() 方法,RichSourceFunction 可以访问 Flink 的运行时上下文,获取并行度信息、任务名称、指标管理器,以及与状态相关的操作。
  • 状态管理:RichSourceFunction 可以结合 Flink 的状态管理机制,保存和恢复状态。这对于需要在流处理中维护中间状态的源函数非常有用,尤其是在故障恢复时,状态可以帮助恢复到故障前的状态。
  • 并行执行:与普通的 SourceFunction 类似,RichSourceFunction 也可以通过设置并行度来并行执行,这使得它可以处理大规模的数据
    源。

状态管理

RichFunction 与 Flink 的状态管理系统高度集成,允许你在分布式环境中维护和管理操作符的中间状态。Flink 支持两种主要类型的状态:ValueState 和 ListState,以及更复杂的 MapState 和 ReducingState。

  • ValueState: 适用于需要保存单个值的场景,如计数器、标志位等。
  • ListState: 适用于需要保存多个值的场景,如窗口计算中的中间结果。
  • MapState: 适用于需要维护键值对的场景,特别是在进行复杂的数据关联或聚合时。
  • ReducingState: 适用于需要持续聚合数据的场景,比如计数、求和等。

示例代码

以下是一个使用 RichParallelSourceFunction 的简单示例,展示了如何在 Flink 中实现一个并行的、具有生命周期管理的源函数:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class MyRichParallelSource extends RichParallelSourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void open(Configuration parameters) throws Exception {// 在任务开始时执行初始化操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is starting.");}@Overridepublic void run(SourceContext<String> ctx) throws Exception {// 模拟数据流的产生while (isRunning) {synchronized (ctx.getCheckpointLock()) {ctx.collect("Data from task " + getRuntimeContext().getIndexOfThisSubtask());}Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void close() throws Exception {// 在任务结束时执行清理操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is closing.");}
}

代码解析

  • open() 方法:在任务开始时调用,适用于进行连接初始化、参数设置等操作。在这个方法中,你可以访问 Flink 的配置和运行时上下文。
  • run() 方法:实现数据源的核心逻辑,这个方法会在源函数启动后被调用。可以使用 ctx.collect() 方法将生成的数据发送到下游处理。
  • cancel() 方法:用于取消任务。当作业被取消或停止时,Flink 会调用这个方法,可以在这里做一些清理工作或者安全地停止数据生成。
  • close() 方法:在任务结束时调用,用于释放资源和进行清理操作。

注意事项

  • 状态一致性:在并行源中,如果需要维护状态,一定要注意状态的一致性和恢复机制,确保在作业恢复时可以正确地恢复数据源的状态。
  • 并行度设置:RichParallelSourceFunction 作为并行源,可以通过 setParallelism 方法设置并行度,确保根据任务的需求合理分配并行实例的数量。

RichParallelSource

package icu.wzk;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class RichParallelSourceRich extends RichParallelSourceFunction<String> {private long count = 1L;private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {count ++;ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

RichParallelSourceTest

package icu.wzk;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;public class RichParallelSourceRichTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(new RichParallelSourceRich());data.print();env.execute("RichParallelSourceRichTest");}}

运行结果

3> 10
5> 10
8> 10
6> 10
2> 10
4> 10
7> 10
1> 10
6> 11
5> 11
8> 11
2> 11
3> 11
4> 11
7> 11
1> 11
2> 12
3> 12
...

控制台输出结果如下所示:
在这里插入图片描述

为什么 Rich 类使用广泛

  • 生命周期管理:Rich 类提供了 open() 和 close() 方法,允许开发者在任务开始和结束时执行初始化和清理操作。这对于需要设置资源(如数据库连接、文件读写、外部服务连接)的操作非常有用。
  • 运行时上下文访问:通过 getRuntimeContext(),Rich 类可以访问任务的并行度信息、任务名称、子任务索引、状态管理等。对于需要根据任务上下文调整行为或需要跨并行实例共享状态的场景,这些信息是至关重要的。
  • 状态管理:RichFunction 可以方便地与 Flink 的状态管理结合使用。在状态丰富的应用场景(如需要维护中间计算结果、计数器、缓存等)的流处理中,Rich 类显得非常有用。
  • 性能监控:Rich 类允许开发者在 open() 方法中注册 Flink 的度量指标(Metrics),帮助监控和优化作业的性能。

什么时候不用 Rich 类

  • 简单操作:如果你只是需要进行简单的转换或过滤操作,没有复杂的初始化、状态管理或清理需求,那么 Rich 类的额外功能可能并不必要。
  • 高性能要求的场景:在一些对性能要求极高的场景中,尽量减少复杂的操作和额外的上下文访问,直接使用轻量级的 MapFunction、FilterFunction 等可能会有更好的性能表现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/415107.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 高级IO

IO等&#xff08;要进行io是要有条件的&#xff0c;要有数据或者有空间&#xff09;拷贝。高效体现在等待的时间所占比重越低越高效。 阻塞IO&#xff1a;数据没有就绪&#xff0c;read不返回。在内核将数据准备好之前, 系统调用会一直等待。所有的套接字, 默认都是阻塞方式。…

nginx容器映射配置文件后,启动一直报错提示:failed (13: Permission denied)的排查

问题现象&#xff1a; 使用harbor 的install.sh 创建docker-compose之后&#xff0c;出现nginx容器一直重启。 查看日志发现是&#xff1a;配置文件无权限。报错信息如下&#xff1a; Sep 2 16:43:13 172.28.0.1 nginx[1344]: 2024/09/02 08:43:13 [emerg] 1#0: open() “/e…

百度地图绘制电子围栏(包括移动端绘制操作)以及检测坐标是否在电子围栏内

由于本人在PC端仅使用了多边形绘制&#xff0c;但矩形跟多边形用法基本一样&#xff0c;圆形并未使用&#xff0c;如不符合读者需求也可以参考一下。 绘制后得到的数据可能不同&#xff0c;但绘制方法仅仅是传递的参数不同。 关于给坐标数组在地图绘制图形的效果在移动端部分包…

深度学习系列74:语音中的mel谱

1 mel谱介绍 一个人说一句话&#xff0c;其 waveform 可以很不一样&#xff0c;但是 spectrogram 基本上会相似&#xff0c;甚至有人可以通过 spectrogram 来判断说话的内容。语谱图的横坐标是时间&#xff0c;纵坐标是频率&#xff0c;坐标点值为语音数据能量。由于是采用二维…

# 利刃出鞘_Tomcat 核心原理解析(十一)-- WebSocket -- 1

利刃出鞘_Tomcat 核心原理解析&#xff08;十一&#xff09;-- Tomcat 附加功能 WebSocket – 1 一、Tomcat专题 - WebSocket - 介绍 1、Tomcat 附加功能&#xff1a;websocket 介绍 1&#xff09;websocket &#xff1a;是 HTML5 新增的协议&#xff0c;它的目的是在浏览器…

动态规划法-资源分配问题

动态规划法 - 资源分配问题 问题描述 把4个份额的资源分配给3个工程&#xff0c;给定利润表如下表所示&#xff0c;写出资源的最优分配方案的求解过程。 4份资源分配给3个工程的利润表 步骤一&#xff1a;求各个阶段不同分配份额时的最大利润及分配份额 目标 我们的目标是…

53 mysql pid 文件的创建

前言 接上一篇文章 mysql 启动过程中常见的相关报错信息 在 mysql 中文我们在 “service mysql start”, “service mysql stop” 经常会碰到 mysql.pid 相关的错误信息 比如 “The server quit without updating PID file” 我们这里来看一下 mysql 中 mysql.pid 文件的…

微积分复习笔记 Calculus Volume 1 - 1.3Trigonometric Functions

1.3 Trigonometric Functions - Calculus Volume 1 | OpenStax

自己开发完整项目一、登录功能-05(动态权限控制)

一、上节回顾 在上一节中&#xff0c;我们介绍了如何通过数据库查询用户的权限&#xff0c;并对方法级别的接口使用注解的方式进行权限控制&#xff0c;之后通过用户携带的tocken进行解析权限&#xff0c;判断是否可以访问。 具体步骤&#xff1a; 1.在查询用户信息的时候将用户…

map和set的封装

目录 一、红黑树的改造 1.1节点的定义 二、红黑树的迭代器 2.1用节点封装迭代器 2.2迭代器的实现 2.3map和set的迭代器 三、insert的返回值 3.1insert返回值的用处 3.2operator[ ]的实现 四、key不能修改的问题 封装map和set一般分为六步&#xff1a; 封装map和set …

MFC生成dll的区别

主要分三种&#xff1a; A. 动态链接库(dll) B.具有导出项的(dll)动态链接库 C.MFC动态链接库 对比项目&#xff1a;可以根据需要选择哪种dll方便 添加自定义导出功能Demo 1. 添加导出实现接口&#xff1a; A. 导出需要具有&#xff1a;__declspec(dllexport) B. 按照C语言…

Javascript LeeCode选题(汉诺塔求解)

LeeCode选题 汉诺塔递归求解move移动函数hanoi函数main方法测试代码&#xff1a;代码实现 汉诺塔递归求解 汉诺塔介绍&#xff1a; 汉诺塔的的图形&#xff08;从上到下1&#xff0c;2&#xff0c;3个&#xff09;实现&#xff1a; 这里我们可以看到因为必须要将第n个移动到…

Spring中基于redis stream 的消息队列实现方法

本文主要介绍了消息队列的概念性质和应用场景&#xff0c;介绍了kafka、rabbitMq常用消息队列中间件的应用模型及消息队列的实现方式&#xff0c;并实战了在Spring中基于redis stream 的消息队列实现方法。 一、消息队列 消息队列是一种进程间通信或者同一个进程中不同线程间的…

uni-app 获取当前位置的经纬度以及地址信息

文章目录 uni.getLocation(objc)获取经纬度和地址调试结果问题 uni-app 获取当前位置的经纬度以及地址信息 uni.getLocation(objc) uni-app官方文档定位API: uni.getLocation(OBJECT) uni.getLocation({type: wgs84,success: function (res) {console.log(当前位置的经度&…

【系统架构设计】嵌入式系统设计(1)

【系统架构设计】嵌入式系统设计&#xff08;1&#xff09; 嵌入式系统概论嵌入式系统的组成硬件嵌入式处理器总线存储器I/O 设备与接口 软件 嵌入式开发平台与调试环境交叉平台开发环境交叉编译环境调试 嵌入式系统概论 嵌入性、专用性、计算机系统是嵌入式系统的三个基本的核…

【话题讨论】VS Code:倍增编程动力,实现效率飞跃

目录 引言 一、详情介绍 功能特点 使用场景 提高工作效率 二、效率对比 2.1 高度可定制性与丰富的插件生态 2.2 智能的代码补全与导航 2.3 内置的调试器与版本控制集成 2.4 轻量级与跨平台 2.5 选择合适工具的重要性 2.6 实际案例或数据展示 三、未来趋势 3.1 编…

能见度监测站—实时监测道路能见度情况

型号&#xff1a;TH-NJD10】能见度监测站是一种专门用于自动观测和存储气象观测数据的设备&#xff0c;它通过高科技手段实时监测大气能见度的变化&#xff0c;为多个领域提供重要的数据支持。主要基于光在大气中的衰减规律。传感器系统中的发射器发出光线&#xff0c;照射到空…

shell编程--正则表达式

正则表达式 正则表达式都被置于两个正斜杠之间&#xff1b;如/l[oO]ve/ 示例 匹配数字的脚本&#xff0c;用户输入创建账号的数量 语法&#xff1a; [[ ^[0-9]$ ]] 表示必须输入数字 #!/bin/bashwhile : do read -p "输入数字&#xff1a;" numif [[ $num ~ ^[…

产品需求过程管理重要性

产品需求过程管理重要性 背景 以下都是真实事项经历回顾&#xff0c;在产品开发过程中&#xff0c;产品经理与研发团队之间的沟通至关重要。然而&#xff0c;沟通不畅或信息缺失常常导致需求无法准确传达&#xff0c;最终影响产品的成功。以下是一些常见的问题&#xff1a; 1.需…

Jmeter执行多机联合负载

1、注意事项&#xff0c;负载机必须要安装jre&#xff0c;控制机则必须安装jdk。要配置同网段ip&#xff0c;双向关闭防火墙。 每个负载机要平均承担线程数。 具体执行事项查看上面截图所示&#xff0c;控制机和负载机配置。 2、先给负载机设置ip地址&#xff0c;保持与控制…