flinkSql 将流和表的互相转换

流——>表

方式一

方式二

方式一:写sql 
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
tableEnv.createTemporaryView("t_1",source,$("word"));方式二:使用dsl
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
Table table = tableEnv.fromDataStream(source,$("word"));

 表——>流

Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");// 方式一:toAppendStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);// 报错:toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])// 这个不支持分组和聚合操作,若出现聚合操作使用方式二将表转为流//方式二:toRetractStream
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);

 wordCount案例

方式一:使用sql

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/*** @基本功能:* @program:flinkProject* @author: 堇年* @create:2024-11-28 14:42:27**/
public class _06_flink_wordcounnt {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象tEnv.createTemporaryView("t_1",flatMap,$("word"));//3. 编写sql语句Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");//4. 将Table变为stream流//使用toAppendStream时会报错 因为有聚合操作//DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);// toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])// 在这里可以映射为ROW对象,也可以映射为自己定义的实体类DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}

方式二:使用dsl语句 

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;public class _06_flink_wordcounnt_dsl {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象Table table = tEnv.fromDataStream(flatMap,$("word"));//3. 编写sql语句Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));rsTable.printSchema();//4. 将Table变为stream流DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}

结果展示 

+I 表示有一条新数据进行了插入
+U 表示有一条已存在的数据有插入了一条,需要进行更新
-U 在+U前表示,先删除原本的,在update新的

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/486776.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI大模型驱动数据分析:利用自然语言实现数据查询与可视化(1)

在当今AI驱动的时代&#xff0c;数据分析已成为各行各业不可或缺的能力。然而&#xff0c;传统的数据分析流程通常需要掌握SQL、数据处理和可视化等多项专业技能&#xff0c;这对非技术背景的业务人员来说是一个不小的挑战。 想象一下&#xff0c;当数据中心的负责人打开手机时…

PyCharm+Selenium+Pytest配置小记

1、下载ChromeDriver&#xff1a; Chrome130以后的Driver下载&#xff1a; Chrome for Testing availabilityhttps://googlechromelabs.github.io/chrome-for-testing/ &#xff08;1&#xff09;查看自己Crome浏览器的版本&#xff1a;设置-->关于 Chrome&#xff1b; &…

【原生js案例】webApp实现鼠标移入移出相册放大缩小动画

图片相册这种动画效果也很常见&#xff0c;在我们的网站上。鼠标滑入放大图片&#xff0c;滑出就恢复原来的大小。现在我们使用运动定时器来实现这种滑动效果。 感兴趣的可以关注下我的系列课程【webApp之h5端实战】&#xff0c;里面有大量的css3动画效果制作原生知识分析&…

Qt 安装Qt Serial Port

最近要用Qt写个串口上位机软件&#xff0c;发现Qt的串口库用不了&#xff0c;上网找了一下资料&#xff0c;找到一种解决办法&#xff0c;具体操作如下&#xff1a; 参考文章&#xff1a;https 目录 一、找到QT安装路径&#xff0c;并运行Qt Maintenance Tool二、选择 添加或移…

语音识别flask接口开发

要开发一个flask语音识别接口&#xff0c;首先要解决语音文件在网络中的传输问题&#xff0c;然后选识别算法进行识别 文章目录 1、以二进制文件流方式上次语音2、网页端长连接流式上传语音文件3、语音识别接口 1、以二进制文件流方式上次语音 python服务端代码&#xff0c;以…

计算机毕业设计Python医疗问答系统 医疗可视化 BERT+LSTM+CRF深度学习识别模型 机器学习 深度学习 爬虫 知识图谱 人工智能 大数据毕业设计

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

shell条件测试

一.命令执行结果判定 && 在命令执行后如果没有任何报错时会执行符号后面的动作 || 在命令执行后如果命令有报错会执行符号后的动作 示例&#xff1a; [rootqingdeng shell3]# sh sl.sh /mnt/file is not exist no二.条件判断方法 在 shell 程序中&#xff0c;用户可…

Couchbase Lite for Android 开源项目 FAQ

Couchbase Lite for Android 开源项目 FAQ couchbase-lite-android couchbase/couchbase-lite-android: Couchbase Lite for Android 是一个轻量级的嵌入式NoSQL数据库引擎&#xff0c;可以在Android设备上离线存储和处理数据&#xff0c;并支持与Couchbase Server进行同步&…

DVWA 靶场 SQL 注入报错 Illegal mix of collations for operation ‘UNION‘ 的解决方案

在 dvwa 靶场进行联合 SQL 注入时&#xff0c;遇到报错 Illegal mix of collations for operation UNION报错如下图&#xff1a; 解决办法&#xff1a; 找到文件MySQL.php 大致位置在dvwaincludesDBMS 目录下 使用编辑器打开 检索$create_db 第一个就是 在{$_DVWA[ ‘db_d…

使用伪装IP地址和MAC地址进行Nmap扫描

使用伪装IP地址和MAC地址进行Nmap扫描 在某些网络设置中&#xff0c;攻击者可以使用伪装的IP地址甚至伪装的MAC地址进行系统扫描。这种扫描方式只有在可以保证捕获响应的情况下才有意义。如果从某个随机的网络尝试使用伪装的IP地址进行扫描&#xff0c;很可能无法接收到任何响…

PT8M2102 触控型 8Bit MCU

1 产品概述 ● PT8M2102 是一款基于 RISC 内核的8位 MTP 单片机&#xff0c;内部集成了电容式触摸感应模块、TIMER&#xff0c;PWM、LVR、LVD、WDT等外设&#xff0c;其主要用作触摸按键开关&#xff0c;广泛适用于触控调光、电子玩具、消费电子、家用电器等领域&#xff0c;具…

ARM A32多数据处理汇编指令理解分享

ARM A32多数据处理汇编指令理解分享 1 多数据存储指令1.1 push指令1.2 STMFD/STMDB指令1.3 STMED/STMDA指令1.4 STMFA/STMIB指令1.5 STMEA/STMIA指令 2 多数据加载指令2.1 pop指令2.2 LDMFD/LDMIA指令2.3 LDMFA/LDMDA指令2.4 LDMEA/LDMDB指令2.5 LDMED/LDMIB指令 在ARM A32多数…

Docker 安装 中文版 GitLab

Docker 安装系列 安装GitLab、解决服务器内存不足问题、使用域名/IP地址访问项目 1、拉取 [rootTseng ~]# docker pull twang2218/gitlab-ce-zh:latest latest: Pulling from twang2218/gitlab-ce-zh 8ee29e426c26: Pull complete 6e83b260b73b: Pull complete e26b65fd11…

二分查找(带图详解)

优选算法系列 文章目录 优选算法系列前言一、二分查找的思想二、算法使用小总结 三、代码实现四、二分查找拓展4.1、查找第一次出现的target小总结 4.2、target最后出现的位置小总结 五、代码总结 前言 在这篇博客中&#xff0c;我会给大家分享二分查找及其扩展。 这是链接-&…

arguments和纯函数的介绍

认识arguments arguments 是一个 对应于 传递给函数的参数 的类数组(array-like)对象. array-like意味着它不是一个数组类型,而是一个对象类型: □但是它却拥有数组的一些特性,比如说length,比如可以通过index索引来访问;□但是它却没有数组的一些方法,比如forEach、map等; …

煤矿 35kV 变电站 3 套巡检机器人 “上岗”,力破供电瓶颈

近日&#xff0c;杭州旗晟智能科技与甘肃某变电站配电室的三套智能巡检机器人线下测试顺利完成&#xff0c;并成功交付使用&#xff0c;这为电力运维工作注入了全新的活力与强大的技术支撑。 一、项目背景 甘肃某变电站总建筑面积1098平方米的变电站集变电、配电、监控等多功能…

隐式神经网络实现低光照图像增强

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

【人工智能基础06】人工神经网络(练习题):神经网络的计算、激活函数的选择与神经网络的退化

文章目录 1. 基于神经网络计算心理健康程度2. 添加激活函数的神经网络计算3. 使用神经网络预测小胖是否会变胖4. 激活函数选择的讨论5. 神经网络的设计6. 深度线性模型的表达能力线性模型7. 神经网络退化 主要讨论的内容 什么是人工神经网络&#xff0c;相关计算反向传播算法的…

记录Windows中Mysql安装

www.mysql.com 我用的是mysql-installer-community-8.0.25.0.msi 原先安装过,先听了了mysql服务 ctrlshiftesc 先停了服务 控制面板关于mysql的全卸载 不修改的话,默认 如果不修改 自己电脑C盘不想放太多 这里Config Type有三个选项 第一个是测试开发模式 占用内存…

【Java计算机毕业设计】Springboot+vue校园外卖配送服务管理系统【源代码+数据库+LW文档+开题报告+答辩稿+部署教程+代码讲解】

源代码数据库LW文档&#xff08;1万字以上&#xff09;开题报告答辩稿 部署教程代码讲解代码时间修改教程 一、开发工具、运行环境、开发技术 开发工具 1、操作系统&#xff1a;Window操作系统 2、开发工具&#xff1a;IntelliJ IDEA或者Eclipse 3、数据库存储&#xff1a…