Flink SQL自定义标量函数(Scalar Function)

使用场景: 标量函数即 UDF,⽤于进⼀条数据出⼀条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.ScalarFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参、出参都是直接体现在 eval 函数的签名中

开发案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;/*** 输入数据: * nc -lk 88888* a,1** 输出结果:* res1=>:3> +I[97]* res2=>:3> +I[97]* res3=>:3> +I[97]*/
public class ScalarFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String, String>> tpStream = source.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0], input.split(",")[1]);}});Table table = tEnv.fromDataStream(tpStream, "id,name");tEnv.createTemporaryView("SourceTable",table);// 在 Table API ⾥不经注册直接调⽤函数Table res1 = tEnv.from("SourceTable").select(call(HashFunction.class, $("id")));// 注册函数tEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);// 在 Table API ⾥调⽤注册好的函数Table res2 = tEnv.from("SourceTable").select(call("HashFunction", $("id")));// 在 SQL ⾥调⽤注册好的函数Table res3 = tEnv.sqlQuery("SELECT HashFunction(id) FROM SourceTable");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");env.execute();}public static class HashFunction extends ScalarFunction {// 接受任意类型输⼊,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}}
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/189870.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

冒泡排序

贵阳这个地方的天气变化好大呀&#xff0c;前两天晒大太阳&#xff0c;今天就冷的脚抖&#xff0c;简直不要太冷&#xff0c;但是不管怎么样&#xff0c;还是要学习的哟&#xff01; 冬天来了&#xff0c;春天确实还有一点远&#xff01; 好了&#xff0c;话不多说&#xff0c;…

springboot rocketmq 延时消息、延迟消息

rocketmq也有延迟消息&#xff0c;经典的应用场景&#xff1a;订单30分钟未支付&#xff0c;则取消的场景 其他博客提到从rocketmq5.0开始&#xff0c;支持自定义延迟时间&#xff0c;4.x只支持预定义延迟时间&#xff0c;安装rocketmq可参考RocketMq简介及安装、docker安装ro…

【C++】模板初阶

目录 一&#xff0c;泛型编程 二&#xff0c;函数模板 1&#xff0c;函数模板概念 2&#xff0c;函数模板格式 3&#xff0c;函数模板的原理 4&#xff0c;函数模板的实例化 5&#xff0c;模板参数的匹配原则 三&#xff0c;类模板 1&#xff0c;类模板的定义格式 2&…

⑤ 【MySQL】DCL语句 —— 用户管理、权限控制

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ MySQL用户与权限 ⑤ 【MySQL】DCL语句 —— 用…

pytorch中对nn.BatchNorm2d()函数的理解

pytorch中对BatchNorm2d函数的理解 简介计算3. Pytorch的nn.BatchNorm2d()函数4 代码示例 简介 机器学习中&#xff0c;进行模型训练之前&#xff0c;需对数据做归一化处理&#xff0c;使其分布一致。在深度神经网络训练过程中&#xff0c;通常一次训练是一个batch&#xff0c…

U-Mail邮件系统安全登录解决方案

企业邮箱是企业对内对外商务往来的主要通信工具&#xff0c;并且企业邮箱里面还包含了大量企业内部隐私信息、商业机密等&#xff0c;很容易成为黑客的攻击目标。其中邮件盗号是企业邮箱遭受攻击的主要形式&#xff0c;一旦企业邮箱密码被黑客盗取&#xff0c;黑客不仅可以利用…

操作系统 | 虚拟机及linux的安装

​ &#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《操作系统实验室》&#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 目录结构 1.操作系统实验之虚拟机及linux的安装 1.1 实验目的 1.2 实验内容 1.3 实验步骤 …

BIO、NIO、AIO之间有什么区别

文章目录 BIO优缺点示例代码 NIO优缺点示例代码 AIO优缺点示例代码 总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 BIO、NIO和AIO是Java编程语言中用于处理输入输出&#xff08;IO…

MYSQL字符串函数详解和实战(字符串函数大全,内含示例)

MySQL提供了许多字符串函数&#xff0c;用于处理和操作字符串数据。以下是一些常用的MYSQL字符串函数。 建议收藏以备后续用到查阅参考。 目录 一、CONCAT 拼接字符串 二、CONCAT_WS 拼接字符串 三、SUBSTR 取子字符串 四、SUBSTRING 取子字符串 五、SUBSTRING_INDEX 取子…

《红蓝攻防对抗实战》十二.内网穿透之利用ICMP协议进行隧道穿透

内网穿透之利用ICMP协议进行隧道穿透 一.前言二.前文推荐三.利用ICMP协议进行隧道穿透1.ICMPsh获取反弹shell2.PingTunnel 搭建隧道 四.本篇总结 一.前言 本文介绍了利用ICMP协议进行隧道穿透的方法。ICMP协议不需要开放端口&#xff0c;可以将TCP/UDP数据封装到ICMP的Ping数据…

Mysql数据库 14.SQL语言 视图

一、视图的概念 视图&#xff1a;就是由数据库中一张或多张表根据特定的条件查询出的数据狗造成的虚拟表 二、视图的作用 安全性&#xff0c;简单性 三、视图的语法 语法 create view 视图表 as select_statement; 代码实现 #创建视图 将查询结果创建称为视图&#x…

Flutter开发中的一些Tips(四)

最近接手了一个flutter项目&#xff0c;整体感觉代码质量不高&#xff0c;感觉有些是初学者容易犯的问题。几年前写的前三篇&#xff0c;我是站在我自己开发遇到问题的角度&#xff0c;这篇是站在别人遇到问题的角度&#xff0c;算是一种补充。下面我整理一下遇到的小问题&…

中国专利转让数据集(1985-2021年)

专利转让数据追踪和记录专利从一个实体转移到另一个实体的过程。这些数据不仅包括参与转让的申请人和受让人的身份信息&#xff0c;如名字和地址&#xff0c;还涵盖了转让的具体法律细节&#xff0c;包括转让执行日、转让次数、法律状态变更&#xff0c;以及转让登记的相关信息…

在Spring Boot中使用MyBatis访问数据库

MyBatis&#xff0c;这个对各位使用Java开发的开发者来说还是蛮重要的&#xff0c;我相信诸位在企业开发项目的时候&#xff0c;大多数采用的是Mybatis。使用MyBatis帮助我们解决各种问题&#xff0c;实际上这篇文章&#xff0c;基本上默认为可以跳过的一篇&#xff0c;但是为了…

Linux服务器从零开始训练 RT-DETR 改进项目 (Ultralytics) 教程,改进RTDETR算法(包括使用训练、验证、推理教程)

手把手从零开始训练 RT-DETR 改进项目 (Ultralytics版本) 教程,改进RTDETR算法 本文以Linux服务器为例:从零开始使用Linux训练 RT-DETR 算法项目 《芒果剑指 RT-DETR 目标检测算法 改进》 适用于芒果专栏改进RT-DETR算法 文章目录 百度 RT-DETR 算法介绍改进网络代码汇总第…

arcgis基础篇--实验

一、绘制带空洞的面要素 方法一&#xff1a;先绘制出一个面区域&#xff0c;然后在面上再绘制一个面区域代表面洞&#xff0c;两者位于同一个图层内&#xff0c;选中代表面洞的区域&#xff0c;选择【编辑器】-【裁剪】工具&#xff0c;将面裁剪出一个洞&#xff0c;随后删除代…

openinstall携手途虎养车,赋能汽车服务数字化

近日&#xff0c;openinstall与中国领先的一站式汽车服务平台途虎养车再次续约&#xff0c;双方将开启第三年合作。过去两年&#xff0c;途虎在建设线上线下一体化数字平台的过程中&#xff0c;深度结合openinstall传参归因与渠道统计技术&#xff0c;打造出了一套高效的渠道来…

GZ038 物联网应用开发赛题第4套

2023年全国职业院校技能大赛 高职组 物联网应用开发 任 务 书 &#xff08;第4套卷&#xff09; 工位号&#xff1a;______________ 第一部分 竞赛须知 一、竞赛要求 1、正确使用工具&#xff0c;操作安全规范&#xff1b; 2、竞赛过程中如有异议&#xff0c;可向现场考评…

Hadoop学习总结(使用Java API操作HDFS)

使用Java API操作HDFS&#xff0c;是在安装和配置Maven、IDEA中配置Maven成功情况下进行的&#xff0c;如果Maven安装和配置不完全将不能进行Java API操作HDFS。 由于Hadoop是使用Java语言编写的&#xff0c;因此可以使用Java API操作Hadoop文件系统。使用HDFS提供的Java API构…

C语言进阶

数组 在基础篇说过&#xff0c;数组实际上是构造类型之一&#xff0c;是连续存放的。 一维数组 定义 定义格式&#xff1a;[存储类型] 数据类型 数组名标识符[下标]; 下面分模块来介绍一下数组的定义部分的内容。 1、初始化和元素引用&#xff1a; 可以看到数组是连续存储…