FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236002.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【PaperReading】4. TAP

Category Content 论文题目 Tokenize Anything via Prompting 作者 Ting Pan, Lulu Tang, Xinlong Wang, Shiguang Shan (Beijing Academy of Artificial Intelligence) 发表年份 2023 摘要 提出了一个统一的可提示模型&#xff0c;能够同时对任何事物进行分割、识别和…

【野火i.MX6ULL开发板】利用microUSB线烧入Debian镜像

0、前言 烧入Debian镜像有两种方式&#xff1a;SD卡、USB SD卡&#xff1a;需要SD卡&#xff08;不是所有型号都可以&#xff0c;建议去了解了解&#xff09;、SD卡读卡器 USB&#xff1a;需要microUSB线 由于SD卡的网上资料很多了&#xff0c;又因为所需硬件&#xff08;SD卡…

【提示学习论文六】MaPLe: Multi-modal Prompt Learning论文原理

文章目录 MaPLe: Multi-modal Prompt Learning 多模式提示学习文章介绍动机MaPLe:Multi-modal Prompt Learning 模型结构1、Deep Language Prompting 深度语言提示2、Deep Vision Prompting 深度视觉提示3、Vision Language Prompt Coupling 视觉语言提示耦合提示耦合过程 实验…

Proteus仿真stm32f103r6输出PWM/正弦波

资料下载地址&#xff1a;Proteus仿真stm32f103r6输出PWM/正弦波 一、仿真图 Proteus仿真stm32f103r6输出PWM/正弦波 二、程序 #include "pbdata.h"u16 fre; void RCC_Configuration(void); void GPIO_Configuration(void); void TIM3_Configuration();void Dela…

SQL-分组查询

&#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦)o *☆哈喽~我是小小恶斯法克&#x1f379; ✨博客主页&#xff1a;小小恶斯法克的博客 &#x1f388;该系列文章专栏&#xff1a;重拾MySQL &#x1f379;文章作者技术和水平很有限&#xff0c;如果文中出现错误&am…

12、JVM高频面试题

1、JVM的主要组成部分有哪些 JVM主要分为下面几部分 类加载器&#xff1a;负责将字节码文件加载到内存中 运行时数据区&#xff1a;用于保存java程序运行过程中需要用到的数据和相关信息 执行引擎&#xff1a;字节码文件并不能直接交给底层操作系统去执行&#xff0c;因此需要…

基于JavaWeb+BS架构+SpringBoot+Vue基于hive旅游数据的分析与应用系统的设计和实现

基于JavaWebBS架构SpringBootVue基于hive旅游数据的分析与应用系统的设计和实现 文末获取源码Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 文末获取源码 Lun文目录 1 概 述 5 1.1 研究背景 5 1.2 研究意义 5 1.3 研究内容…

计算机毕业设计 基于SpringBoot的物资综合管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

Vue3项目引入canvaskit-wasm库(skia库的wasm版)

1 安装canvaskit-wasm npm install canvaskit-wasm 或者 yarn add canvaskit-wasm 2 将文件node_modules/canvaskit-wasm/bin/canvaskit.wasm复制到public目录 3 引入到组件中 <template><img :src"imgData"/> </template><script setup>…

MongoDB索引详解

概述 索引是一种用来快速查询数据的数据结构。BTree 就是一种常用的数据库索引数据结构&#xff0c;MongoDB 采用 BTree 做索引&#xff0c;索引创建 colletions 上。MongoDB 不使用索引的查询&#xff0c;先扫描所有的文档&#xff0c;再匹配符合条件的文档。使用索引的查询&…

【漏洞复现】天融信TOPSEC static_convert 远程命令执行

漏洞描述 天融信TOPSEC Static_Convert存在严重的远程命令执行漏洞。攻击者通过发送精心构造的恶意请求,利用了该漏洞,成功实现在目标系统上执行任意系统命令的攻击。成功利用漏洞的攻击者可在目标系统上执行恶意操作,可能导致数据泄露、系统瘫痪或远程控制。强烈建议立即更…

单片机中的PWM(脉宽调制)的工作原理以及它在电机控制中的应用。

目录 工作原理 在电机控制中的应用 脉宽调制&#xff08;PWM&#xff09;是一种在单片机中常用的控制技术&#xff0c;它通过调整信号的脉冲宽度来控制输出信号的平均电平。PWM常用于模拟输出一个可调电平的数字信号&#xff0c;用于控制电机速度、亮度、电压等。 工作原理 …

3D模型UV展开原理

今年早些时候&#xff0c;我为 MAKE 杂志写了一篇教程&#xff0c;介绍如何制作视频游戏角色的毛绒动物。 该技术采用给定的角色 3D 模型及其纹理&#xff0c;并以编程方式生成缝纫图案。 虽然我已经编写了一般摘要并将源代码上传到 GitHub&#xff0c;但我在这里编写了对使这一…

力扣日记1.11-【二叉树篇】450. 删除二叉搜索树中的节点

力扣日记&#xff1a;【二叉树篇】450. 删除二叉搜索树中的节点 日期&#xff1a;2024.1.11 参考&#xff1a;代码随想录、力扣 450. 删除二叉搜索树中的节点 题目描述 难度&#xff1a;中等 给定一个二叉搜索树的根节点 root 和一个值 key&#xff0c;删除二叉搜索树中的 key…

多测师肖sir___ui自动化测试po框架讲解版

po框架 一、ui自动化po框架介绍 &#xff08;1&#xff09;PO是Page Object的缩写 &#xff08;2&#xff09;业务流程与页面元素操作分离的模式&#xff0c;可以简单理解为每个页面下面都有一个配置class&#xff0c; 配置class就用来维护页面元素或操作方法 &#xff08;3&am…

XTuner 大模型单卡低成本微调实战

XTuner 大模型单卡低成本微调实战 Finetune简介增量预训练微调指令跟随微调LoRA XTuner介绍功能亮点 8GB显存玩转LLMFlash AttentionDeepSpeed ZeRO 上手操作平台激活环境微调 参考教程&#xff1a;XTuner Finetune简介 LLM的下游应用任务中&#xff0c;增量预训练和指令跟随…

【python】python新年烟花代码【附源码】

欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 新年的钟声即将敲响&#xff0c;为了庆祝这个喜庆的时刻&#xff0c;我们可以用 Python 编写一个炫彩夺目的烟花盛典。本文将详细介绍如何使用 Pygame 库创建一个令人惊叹的烟花效果。 一、效果图&#xff1a; 二…

互联网加竞赛 基于卷积神经网络的乳腺癌分类 深度学习 医学图像

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

Linux 部署 AI 换脸

我使用的系统是 Ubuntu 20.04 文章实操主要分为以下几个部分 1、python 环境安装 2、下载 FaceFusion 上传服务器 3、创建 python 虚拟环境 4、下载 FaceFusion 依赖&#xff08;这里的命令执行时间会很长&#xff0c;够你睡午觉了&#xff09; 5、运行 FaceFusion 6、开…

基于css实现动画效果

介绍 本文将会基于css&#xff0c;实现各种动画效果&#xff0c;接下来会从简单几个例子入手。 案例 三颗球 <!DOCTYPE html> <html lang"en"><head><meta charset"utf-8" /><title>React App</title><style>…