大数据-玩转数据-Flink SQL编程

一、概念

在这里插入图片描述

1.1 Apache Flink 两种关系型 API

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。

Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

1.2 动态表(Dynamic Tables)

动态表是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
动态表是随时间变化的,可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

对动态表的一般处理过程: 流->动态表->连续查询处理->动态表->流

二、导入Flink Table API依赖

pom.xml 中添加

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.21</version>
</dependency>

三、表与DataStream的混合使用简单案例

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;//必须添加此类才能在表达式中运用$符号
import static org.apache.flink.table.api.Expressions.$;public class Table_Api_BasicUse {public static void main(String[] args) throws Exception {// 流运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//并行参数env.setParallelism(1);// 数据源DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 创建表的执行环境StreamTableEnvironment TableEnv = StreamTableEnvironment.create(env);// 创建表,将流转换成动态表. 表的字段名从pojo的属性名自动抽取Table table = TableEnv.fromDataStream(waterSensorStream);// 对动态表进行查询Table resultTable = table.where($("id").isEqual("sensor_1")).select($("id"),$("vc"));//把动态表转化为流DataStream<Row> dataStream = TableEnv.toAppendStream(resultTable,Row.class);dataStream.print();env.execute();}
}

四、表到流的转换

动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:
Append-only 流
仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。
Retract 流
retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
Upsert 流
upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。

五、通过Connector声明读入数据

前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据

5.1 File source

// 创建表
//表的元数据信息
Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new FileSystem().path("input/sensor.txt")).withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n")).withSchema(schema).createTemporaryTable("sensor");
// 做成表对象, 然后对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable.groupBy($("id")).select($("id"), $("id").count().as("cnt"));
//  把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

5.2 Kafka Source

// 创建表
// 表的元数据信息
Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());
// 连接文件, 并创建一个临时表, 其实就是一个动态表
tableEnv.connect(new Kafka().version("universal").topic("sensor").startFromLatest().property("group.id", "bigdata").property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")).withFormat(new Json()).withSchema(schema).createTemporaryTable("sensor");
//对动态表进行查询
Table sensorTable = tableEnv.from("sensor");
Table resultTable = sensorTable.groupBy($("id")).select($("id"), $("id").count().as("cnt"));
//把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
resultStream.print();

六、通过Connector声明写出数据

6.1 File Sink

package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/11 21:43*/
public class Flink02_TableApi_ToFileSystem {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream);Table resultTable = sensorTable.where($("id").isEqual("sensor_1") ).select($("id"), $("ts"), $("vc"));// 创建输出表Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());tableEnv.connect(new FileSystem().path("output/sensor_id.txt")).withFormat(new Csv().fieldDelimiter('|')).withSchema(schema).createTemporaryTable("sensor");// 把数据写入到输出表中resultTable.executeInsert("sensor");}
}

6.2 Kafka Sink

package com.atguigu.flink.java.chapter_11;import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/11 21:43*/
public class Flink03_TableApi_ToKafka {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table sensorTable = tableEnv.fromDataStream(waterSensorStream);Table resultTable = sensorTable.where($("id").isEqual("sensor_1") ).select($("id"), $("ts"), $("vc"));// 创建输出表Schema schema = new Schema().field("id", DataTypes.STRING()).field("ts", DataTypes.BIGINT()).field("vc", DataTypes.INT());tableEnv.connect(new Kafka().version("universal").topic("sink_sensor").sinkPartitionerRoundRobin().property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092")).withFormat(new Json()).withSchema(schema).createTemporaryTable("sensor");// 把数据写入到输出表中resultTable.executeInsert("sensor");}
}

七、基本使用

7.1 查询未注册的表

package com.lyh.flink12;import org.apache.flink.types.Row;
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Connect_File_source {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> dataStreamSource =env.fromElements(new WaterSensor("sensor_1", 1000L, 20),new WaterSensor("sensor_1", 2000L, 30),new WaterSensor("sensor_1", 3000L, 40),new WaterSensor("sensor_1", 4000L, 50),new WaterSensor("sensor_1", 5000L, 60));// 创建动态表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用SQL查询未注册的表// 从流中得到一个表Table inputTable = tableEnv.fromDataStream(dataStreamSource);Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id = 'sensor_1'");tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();}
}

7.2 查询已注册的表

package com.lyh.flink12;
import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Flink05_SQL_BaseUse_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 使用sql查询一个已注册的表// 1. 从流得到一个表Table inputTable = tableEnv.fromDataStream(waterSensorStream);// 2. 把注册为一个临时视图tableEnv.createTemporaryView("sensor", inputTable);// 3. 在临时视图查询数据, 并得到一个新表Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'");// 4. 显示resultTable的数据tableEnv.toAppendStream(resultTable, Row.class).print();env.execute();}
}

7.3 Kafka到Kafka

使用sql从Kafka读数据, 并写入到Kafka中

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Sql_kafka_kafka {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("+ "'connector' = 'kafka',"+ "'topic' = 'topic_source_sensor',"+ "'properties.bootstrap.servers' = 'hadoop100:9029',"+ "'properties.group.id' = 'atguigu',"+ "'scan.startup.mode' = 'latest-offset',"+ "'format' = 'json'"+ ")");// 2. 注册SinkTable: sink_sensortableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("+ "'connector' = 'kafka',"+ "'topic' = 'topic_sink_sensor',"+ "'properties.bootstrap.servers' = 'hadoop100:9029',"+ "'format' = 'json'"+ ")");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/137574.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

23种设计模式汇总详解

设计原则 中文名称英文名称含义解释单一职责原则Single Responsibility Principle(SRP)任何一个软件模块都应该只对某一类行为者负责一个类只干一件事&#xff0c;实现类要单一开闭原则Open-Close Principle(OCP)软件实体&#xff08;类、模块、函数等&#xff09;应该是可以扩…

七绝 . 秋寒

题记 拜读署名“淡定人生D”近日发表在“ 今日头条 ”上的古体诗《七绝 . 凉》&#xff0c;本老朽在由衷赞叹该女子才貌双全之时&#xff0c;也对自己寄居养老的成都崇州街子古镇今日下午的秋寒突至天气&#xff0c;情怀涌动&#xff0c;思绪万千&#xff0c;亦作《七绝 . 秋寒…

图像处理之《基于语义对象轮廓自动生成的生成隐写术》论文精读

一、相关知识 首先我们需要了解传统隐写和生成式隐写的基本过程和区别。传统隐写需要选定一幅封面图像&#xff0c;然后使用某种隐写算法比如LSB、PVD、DCT等对像素进行修改将秘密嵌入到封面图像中得到含密图像&#xff0c;通过信道传输后再利用算法的逆过程提出秘密信息。而生…

IPV6真的神

ipv6 地址短缺的现实&#xff0c;万物互联的未来<全局可达性> 1、路由表更小。地址分配遵循聚类原则&#xff0c;路由表用Entry的路由表示一片子网。 2、更强的组播以及流控制。为媒体服务质量QoS。控制提供了良好的网络平台。 3、DHCPv6,自动配置地址。使得网&#xff0…

算法基础:图

图论 图论〔Graph Theory〕是数学的一个分支。它以图为研究对象。图论中的图是由若干给定的点及连接两点的线所构成的图形&#xff0c;这种图形通常用来描述某些事物之间的某种特定关系&#xff0c;用点代表事物&#xff0c;用连接两点的线表示相应两个事物间具有这种关系。 …

openGauss天津用户组招募正式启动,欢迎报名

openGauss天津用户组招募正式启动&#xff0c;欢迎报名&#xff01; openGauss用户组(openGauss User Group&#xff0c;简称oGUG)是一个让openGauss用户就技术特性、最佳实践、运营进展等方向交流的开放性本地社区。oGUG致力于构建一个开放、多元、包容的 openGauss城市用户交…

Java高级-注解

注解 1.介绍2.元注解3.注解的解析4.注解的应用场景 1.介绍 注解 Annotation 就是Java代码里的特殊标记&#xff0c;作用是让其他程序根据注解信息来决定什么是执行该程序注解&#xff1a;注解可以在类上、构造器上、方法上、成员变量上、参数上等位置 自定义注解 /*** 自定…

拼多多商品详情API接口实时数据,获取到指定商品的详细信息,例如价格、标题、图片、描述、所属类目等信息

1.获取拼多多开发者账号 在使用拼多多 API 之前&#xff0c;需要先注册账号。注册成功后可以获取到相应的key 和Secret 用于调用 API。 2.了解拼多多商品详情 API 拼多多商品详情 API 提供了多种接口可以使用&#xff0c;其中最常用的是 pdd.ddk.goods.detail 接口。此接口可…

CLIP 基础模型:从自然语言监督中学习可转移的视觉模型

一、说明 在本文中&#xff0c;我们将介绍CLIP背后的论文&#xff08;Contrastive Language-I mage Pre-Training&#xff09;。我们将提取关键概念并分解它们以使其易于理解。此外&#xff0c;还对图像和数据图表进行了注释以澄清疑问。 图片来源&#xff1a; 论文&#xff1a…

epoll的并发服务器(TCP服务器与客户端通信)

服务器&#xff1a; #include<myhead.h> #define IP "192.168.250.100" #define PORT 8888 /* typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64; } epoll_data_t;struct epoll_event {uint32_t events; …

x86架构基础汇编知识

​ 通用寄存器 EAX 32位 函数返回值 AX 低16位 AH 高八位 AL 低八位 EBX 32位 ECX 32位 循环次数&#xff0c;this指针 EDX 32位 EBP 32位 栈底寄存器 ESP 32位 栈顶寄存器 ESI 源索引寄存器 EDI 目标索引寄存器 EIP 无法直接通过汇编操作 例子 mov al&#xff0c;0xff …

Windows虚拟机访问网页证书错误问题

问题&#xff1a; 显示证书错误&#xff0c;图片加载不出来&#xff0c;看着很别扭&#xff0c;如下&#xff1a; 方法: 1.先导出可用的证书&#xff1a; 可以将自己正常环境的证书导出来&#xff08;google浏览器为例&#xff09; 浏览器右上角三个竖点——设置——隐私设…

Python计算机二级知识点整理模拟考试

1. 循环队列是队列的一种顺序存储结构&#xff0c;用队尾指针 rear 指向队列中的队尾元素&#xff0c;用排头指针 front 指向排头元素的前一个位置。因此&#xff0c;从排头指针 front 指向的后一个位置直到队尾指针 rear 指向的位置之间所有的元素均为队列中的元素。 2&…

Python灰帽编程——网页信息爬取

文章目录 网页信息爬取1. 相关模块1.1 requests 模块1.1.1 模块中的请求方法1.1.2 请求方法中的参数1.1.3 响应对象中属性 1.2 RE 模块1.2.1 匹配单个字符1.2.2 匹配一组字符1.2.3 其他元字符1.2.4 核心函数 2. 网页信息爬取2.1 获取网页HTML 源代码2.2 提取图片地址2.3 下载图…

项目开发流程

最近在工作中总是觉得项目开发周期不足,和领导以及同时沟通后,发现自己在评估开发时间的时候,评估不准确,缺少了某些环节的时间评估。例如,没有把需求反串讲,方案讨论和制定以及自测的时间评估在内。所以大致理了一下整个项目的开发周期包含的工作量,这些工作量都需要在给出人力…

多线程的学习第二篇

多线程 线程是为了解决并发编程引入的机制. 线程相比于进程来说,更轻量 ~~ 更轻量的体现: 创建线程比创建进程,开销更小销毁线程比销毁进程,开销更小调度线程比调度进程,开销更小 进程是包含线程的. 同一个进程里的若干线程之间,共享着内存资源和文件描述符表 每个线程被独…

ORB-SLAM2_RGBD_DENSE_MAP编译、问题解决、离线加载TUM数据和在线加载D435i相机数据生成稠密地图

文章目录 0 引言1 安装依赖1.1 其他库安装1.2 pcl库安装 2 编译ORB-SLAM2_RGBD_DENSE_MAP2.1 build.sh2.2 build_ros.sh 3 运行ORB-SLAM2_RGBD_DENSE_MAP3.1 build.sh编译版本3.2 build_ros.sh编译版本 0 引言 ORB-SLAM2_RGBD_DENSE_MAP是基于ORB-SLAM2框架的一种RGB-D稠密地图…

uniapp Echart X轴Y轴文字被遮挡怎么办,或未能铺满整个容器

有时候布局太小&#xff0c;使用echarts&#xff0c;x轴y轴文字容易被遮挡&#xff0c;怎么解决这个问题呢&#xff0c;或者是未能铺满整个容器。 方法1&#xff1a; 直接设置 containLabel 字段 options: { grid: { containLabel: true, },} 方法2: 间接设置&#xff0c;但是…

Kotlin协程CoroutineScope异步async取消cancel等待await的任务

Kotlin协程CoroutineScope异步async取消cancel等待await的任务 import kotlinx.coroutines.*fun main(args: Array<String>) {runBlocking {val mScope CoroutineScope(Dispatchers.IO).async {println("->")delay(999999)println("<-")"…

Jenkins结合Gitlab,实现镜像构建及推送

docker-compose jenkins的docker-compose目录为为/home/jenkins&#xff0c;这个后面写脚本的时候需要对应上 version: 3 services:docker_jenkins:restart: alwaysimage: jenkins/jenkins:ltscontainer_name: docker_jenkinsprivileged: true ports:- 8080:8080- 50000:5000…