0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lower=tab_source.map(colFunc(col('word')))   tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lower=tab_source.map(rowFunc) tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
    @udf(result_type=DataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lower=tab_source.map(rowFunc).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/175823.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1400*C. Team(模拟构造)

Problem - 401C - Codeforces 解析&#xff1a; 因为0不能相邻&#xff0c;所以0之间最少 n-1 个位置&#xff0c;最多 n1 个位置&#xff0c;如果 m<n-1显然不符题意。 并且1最多连续两个&#xff0c;所以 m>2*n2 同样不符题意。 其余情况构造即可 #include<bits/st…

【嵌入式】【GIT】如何迁移老的GIF到新的仓库时使用LFS功能并保持LOG不变

一、正常迁移流程 假设有仓库 ssh://old/buildroot-201902 需要迁移到新的仓库 ssh://old/buildroot-201902时,我们可以使用以下命令来完成: # 下载老的仓库 git clone ssh://old/buildroot-201902 # 向新的仓库上传所有的tags git push ssh://new/buildroot-201902 --tag…

【网络安全】Seeker内网穿透追踪定位

Seeker追踪定位对方精确位置 前言一、kali安装二、seeker定位1、ngrok平台注册2、获取一次性邮箱地址3、ngrok平台登录4、ngrok下载5、ngrok令牌授权6、seeker下载7、运行seeker定位8、运行隧道开启监听9、伪装链接10、用户点击&#xff08;获取定位成功&#xff09;11、利用经…

Rust-虽然9天过去了,结果是没有结果(Docker容器的端口映射问题)

​ 这篇文章收录于Rust 实战专栏。这个专栏中的相关代码来自于我开发的笔记系统。它启动于是2023年的9月14日。相关技术栈目前包括&#xff1a;Rust&#xff0c;Javascript。关注我&#xff0c;我会通过这个项目的开发给大家带来相关实战技术的分享。 前言 上上周了吧&#xf…

机器学习(六)构建机器学习模型

1.9构建机器学习模型 我们使用机器学习预测模型的工作流程讲解机器学习系统整套处理过程。 整个过程包括了数据预处理、模型学习、模型验证及模型预测。其中数据预处理包含了对数据的基本处理&#xff0c;包括特征抽取及缩放、特征选择、特征降维和特征抽样&#xff1b;我们将…

Linux的简介和环境搭建

简介 Linux是一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个基于POSIX和Unix的多用户、多任务、支持多线程和多CPU的操作系统。它能运行主要的Unix工具软件、应用程序和网络协议。它支持32位和64位硬件。Linux继承了Unix以网络为核心的设计思想&#xff0c;是一个…

Python构造代理IP池提高访问量

目录 前言 一、代理IP是什么 二、代理IP池是什么 三、如何构建代理 IP 池 1. 从网上获取代理 IP 地址 2. 对 IP 地址进行筛选 3. 使用筛选出来的 IP 地址进行数据的爬取 四、总结 前言 爬虫程序是批量获取互联网上的信息的重要工具&#xff0c;在访问目标网站时需要频…

#stm32整理(一)flash读写

以这篇未开始我将进行stm32学习整理为期一个月左右完成stm32知识学习整理内容顺序没有一定之规写到哪想到哪想到哪写到哪&#xff0c;主要是扫除自己知识上的盲区完成一些基本外设操作。 以stm32f07为例子进行flash读写操作 stm32flash简介 参考资料正点原子和野火开发手册 …

如何选择传感器输出模式——电流输出还是电压输出?

一 背景及挑战 传感器在汽车测试系统中发挥着采集和传输信息的作用&#xff0c;可称之为汽车的“神经元”。 按照功能可将传感器分为压力传感器、流量传感器、温湿度传感器和电流传感器等。传感器的主要指标是精度、测量范围和响应时间等。在满足指标的情况下&#xff0c;通常…

笔记软件Notability mac中文版软件功能

Notability mac是一款帮助用户备注文件的得力工具&#xff0c;Notability Mac版可用于注释文稿、草拟想法、录制演讲、记录备注等。它将键入、手写、录音和照片结合在一起&#xff0c;便于您根据需要创建相应的备注。 Mac Notability mac中文版软件功能 将手写&#xff0c;照片…

springboot和flask整合nacos,使用openfeign实现服务调用,使用gateway实现网关的搭建(附带jwt续约的实现)

环境准备&#xff1a; 插件版本jdk21springboot 3.0.11 springcloud 2022.0.4 springcloudalibaba 2022.0.0.0 nacos2.2.3&#xff08;稳定版&#xff09;python3.8 nacos部署&#xff08;docker&#xff09; 先创建目录&#xff0c;分别创建config&#xff0c;logs&#xf…

APISpace 全国快递物流地图轨迹查询API接口案例代码

1.全国快递物流地图轨迹查询接口详解 1.1 接口请求 请求方式&#xff1a;POST请求地址&#xff1a;https://eolink.o.apispace.com/wldtgj1/paidtobuy_api/trace_map请求头&#xff1a; 标签必填说明X-APISpace-Token是鉴权私钥&#xff0c;登陆 APISpace 后在管理后台的[访…

证照之星XE专业版下载专业证件照制作工具

值得肯定的是智能背景替换功能&#xff0c;轻松解决背景处理这一世界难题。不得不提及的是新增打印字体设置&#xff0c;包含字体选择、字号大小、字体颜色等。不同领域的应用证明了万能制作&#xff0c;系统支持自定义证照规格&#xff0c;并预设了17种常用的证件照规格。人所…

紧急:发现NGINX Ingress Controller for Kubernetes中的新安全漏洞

导语 大家好&#xff0c;今天我要向大家紧急报告一则消息&#xff1a;我们在NGINX Ingress Controller for Kubernetes中发现了三个新的安全漏洞&#xff01;这些漏洞可能被黑客利用&#xff0c;从集群中窃取机密凭据。在本文中&#xff0c;我们将详细介绍这些漏洞的细节&#…

ROCKCHIP ~ Camera 闪光灯

一、闪光灯基本原理 工作模式 Camera flash led分flash和torch两种模式。 flash&#xff1a; 拍照时上光灯瞬间亮一下&#xff0c;电流比较大&#xff0c;目前是1000mA&#xff0c;最大电流不能超过led最大承受能力 torch&#xff1a; 只用于录video或者拿led当手电筒的情况&…

python多环境并存

1. 现况简介 1.1 本人windows所存Python版本 Python 2.7 Python 3.6 Python 3.7 1.2 Python 各版本路径如下 Python 2.7Python 3.6Python 3.7C:\Server\Python27C:\Server\Python36C:\Server\Python37 1.3 系统环境变量配置如下 2. 解决方案 2.1 进入目录 cd C:\Server…

qt5工程打包成可执行exe程序

一、编译生成.exe 1.1、在release模式下编译生成.exe 1.2、建一个空白文件夹package&#xff0c;再将在release模式下生成的.exe文件复制到新建的文件夹中package。 1.3、打开QT5的命令行 1.4、用命令行进入新建文件夹package&#xff0c;使用windeployqt对生成的exe文件进行动…

84.在排序数组中查找元素的第一个和最后一个位置(力扣)

目录 问题描述 代码解决以及思想 知识点 问题描述 代码解决以及思想 class Solution { public:vector<int> searchRange(vector<int>& nums, int target) {int left 0; // 定义左边界int right nums.size() - 1; // 定义右…

2023年10月13日,美国材料与试验协会(ASTM)发布了新版玩具安全标准ASTM F963-23

新标准发布 2023年10月13日&#xff0c;美国材料与试验协会&#xff08;ASTM&#xff09;发布了新版玩具安全标准ASTM F963-23。 主要更新内容 与ASTM F963-17相比&#xff0c;此次更新包括&#xff1a;单独描述了基材重金属元素的豁免情况&#xff0c;更新了邻苯二甲酸酯的管…

【ROS入门】雷达、摄像头及kinect信息仿真以及显示

文章结构 雷达信息仿真以及显示Gazebo仿真雷达配置雷达传感器信息xacro文件集成启动仿真环境 Rviz显示雷达数据 摄像头信息仿真以及显示Gazebo仿真摄像头新建xacro文件&#xff0c;配置摄像头传感器信息xacro文件集成启动仿真环境 Rviz显示摄像头数据 kinect信息仿真以及显示Ga…