详解flink sql, calcite logical转flink logical

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {override def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[LogicalCalc]val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)FlinkLogicalCalc.create(newInput, calc.getProgram)}
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[FlinkLogicalCalc]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)}
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)}
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleval supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {// we support AVGcase SqlKind.AVG => true// but none of the other AVG agg functionscase k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => falsecase _ => true}val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)!hasAccurateDistinctCall && supported}

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleagg.getAggCallList.map(_.getAggregation.getKind).forall {case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => falsecase _ => true}}

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])dataStreamTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)}

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {val distribution = rel.asInstanceOf[LogicalDistribution]val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)}

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val expand = rel.asInstanceOf[LogicalExpand]val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)}

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])intermediateTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)}

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {val intersect = rel.asInstanceOf[LogicalIntersect]val newInputs = intersect.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalIntersect.create(newInputs, intersect.all)}

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val join = rel.asInstanceOf[LogicalJoin]val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)}

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {val sink = rel.asInstanceOf[LogicalLegacySink]val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)FlinkLogicalLegacySink.create(newInput,sink.hints,sink.sink,sink.sinkName,sink.catalogTable,sink.staticPartitions)}

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)isTableSourceScan(scan)}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)}

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {val logicalMatch = rel.asInstanceOf[LogicalMatch]val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)new FlinkLogicalMatch(rel.getCluster,traitSet,newInput,logicalMatch.getRowType,logicalMatch.getPattern,logicalMatch.isStrictStart,logicalMatch.isStrictEnd,logicalMatch.getPatternDefinitions,logicalMatch.getMeasures,logicalMatch.getAfter,logicalMatch.getSubsets,logicalMatch.isAllRows,logicalMatch.getPartitionKeys,logicalMatch.getOrderKeys,logicalMatch.getInterval)}

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val minus = rel.asInstanceOf[LogicalMinus]val newInputs = minus.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalMinus.create(newInputs, minus.all)}

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {val rank = rel.asInstanceOf[LogicalRank]val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)FlinkLogicalRank.create(newInput,rank.partitionKey,rank.orderKey,rank.rankType,rank.rankRange,rank.rankNumberType,rank.outputRankNumber)}

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {val snapshot = rel.asInstanceOf[LogicalSnapshot]val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)snapshot.getPeriod match {case _: RexFieldAccess =>FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)case _: RexLiteral =>newInput}}

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {val union: LogicalUnion = call.rel(0)union.all}override def convert(rel: RelNode): RelNode = {val union = rel.asInstanceOf[LogicalUnion]val newInputs = union.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalUnion.create(newInputs, union.all)}

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/368623.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年Nano编辑器最新使用教程

Nano在大多数Linux发行版中找到,易于使用,其最常用的命令显示在其屏幕底部。 作为编辑配置和其他文件是Linux中的一种普遍的任务,知道如何使用该程序是否可以非常有用。Nano编辑器以及如何使用Nano编辑器在服务器上编辑文件是我们将在本指南中…

第1章 信息系统综合知识

第1章 信息系统综合知识 本章主要介绍信息系统综合知识,介绍信息、信息系统的基本概念,概述两化融合和国家信息化战略,讲解电子政务、电子商务的典型应用,描述信息化整体总体规划以及IT战略的主要内容。 1.1 信息的定义和属性 …

JAVA高级进阶11多线程

第十一天、多线程 线程安全问题 线程安全问题 多线程给我们带来了很大性能上的提升,但是也可能引发线程安全问题 线程安全问题指的是当个多线程同时操作同一个共享资源的时候,可能会出现的操作结果不符预期问题 线程同步方案 认识线程同步 线程同步 线程同步就是让多个线…

8人团队历时半年打造开源版GPT-4o,零延迟演示引爆全网!人人可免费使用!

目录 01 Moshi 02 背后技术揭秘 GPT-4o可能要等到今年秋季才会公开。 然而,由法国8人团队开发的原生多模态Moshi,已经达到了接近GPT-4o的水平,现场演示几乎没有延迟,吸引了大量AI专家的关注。 令人惊讶的是,开源版的…

汇聚荣拼多多评价好不好?

汇聚荣拼多多评价好不好?在探讨电商平台的口碑时,用户评价是衡量其服务质量和商品质量的重要指标。拼多多作为国内领先的电商平台之一,其用户评价自然成为消费者选择购物平台时的参考依据。针对“汇聚荣拼多多评价好不好?”这一问题,可以从…

【数据结构】(C语言):队列

队列: 线性的集合。先进先出(FIFO,first in first out)。两个指针:头指针(指向第一个进入且第一个出去的元素),尾指针(指向最后一个进入且最后一个出去的元素&#xff0…

下载安装MySQL

1.软件的下载 打开官网下载mysql-installer-community-8.0.37.0.msi 2.软件的安装 mysql下载完成后,找到下载文件,双击安装 3.配置环境变量 4.自带客户端登录与退出

CocoaPodsCmake

https://juejin.cn/post/7257048145233838141?searchId20240531171431E5868B41DC7B7016CCBA https://guides.cocoapods.org CocoaPods CocoaPods的作用 帮助程序员通过命令管理第三方库及更新,以达到扩展项目的目的。 CocoaPods的使用 在已有的工程目录下新增…

【test】小爱同学通过esp32控制电脑开关

文章目录 一、环境准备二、开关机原理数据传输框架 三、环境搭建1.巴法云平台设置2.米家设置3.windows网络唤醒设置4.搭建esp32开发环境并部署(1)新建项目(2)导入esp32库(3) 添加库(4&#xff0…

Oracle Database 23ai新特性:DB_DEVELOPER_ROLE角色

角色介绍 从 Oracle Database 23ai 开始,新角色“DB_DEVELOPER_ROLE”允许管理员快速分配开发人员为 Oracle 数据库设计、构建和部署应用程序所需的所有必要权限。(包括构建数据模型所需的系统权限以及监视和调试应用程序所需的对象权限)。通…

MySQL之备份与恢复(四)

备份与恢复 存储引擎和一致性 3.复制 从备库中备份最大的好处是可以不干扰主库,避免在主库上增加额外的负载。这是一个建立备库的好理由,即使不需要用它做负载均衡或高可用。如果钱是个问题,也可以把备份用的备库用于其他用户,…

【C语言】刷题笔记 Day2

【笔记】 【1】局部变量不初始化,默认放的随机值。 1 int n0; 2 scanf("%d",&n); //13.141 【2】这里虽然输入的是一个浮点数,但是只取整数部分。 【3】3.156e7 表示的是3.156*10的7次方。 【4】多组输入,保存和不保存…

半实物仿真测试系统

设备组成 test系统主要由硬件部分与软件部分组成。硬件部分由PCI机箱、PCI控制器以及各种PCI接口板卡组成。软件部分由测试设计软件模块、测试执行服务软件模块、测试执行客户端软件模块、设备资源管理软件模块等主要软件模块以及曲线数据生成、CRC插件生成与诊断、测试数据记录…

【UE5.3】笔记7 控制Pawn移动

使用A、D键控制角色左右移动 打开我们的BP_Player蓝图类,选择事件图表,添加我们的控制事件 右键,搜索A keyboard,选择A,如下图,D也是 添加扭矩力 首先我们要把我们的player上的模拟物理选项打开,这样我们…

Arduino - TM1637 4 位 7 段显示器

Arduino - TM1637 4 位 7 段显示器 Arduino-TM1637 4 位 7 段显示器 A standard 4-digit 7-segment display is needed for clock, timer and counter projects, but it usually requires 12 connections. The TM1637 module makes it easier by only requiring 4 connectio…

开始尝试从0写一个项目--后端(一)

创建文件的目录结构 利用这个界面创建 序号 名称 说明 1 SEMS maven父工程,统一管理依赖版本,聚合其他子模块 2 sems-common 子模块,存放公共类,例如:工具类、常量类、异常类等 3 sems-pojo 子模块&#x…

【Qt】之【Bug】大量出现“未定义的标识符”问题

背景 构建时出现大量错误 原因 中文注释问题 解决 方法1. 报错代码附近的中文注释全部删掉。。。 方法2. 报错的文件添加 // Chinese word comment solution #pragma execution_character_set("utf-8")

【C语言】—— 文件操作(下)

【C语言】—— 文件操作(下) 前言:五、文件的顺序读写5.1、 顺序读写函数介绍5.2、 f p u t c fputc fputc 函数5.3、 f g e t c fgetc fgetc 函数5.4、 f p u t s fputs fputs 函数5.5、 f g e t s fgets fgets 函数5.6、 f p r i n t f…

神经网络在机器学习中的应用:手写数字识别

机器学习是人工智能的一个分支,它使计算机能够从数据中学习并做出决策或预测。神经网络作为机器学习的核心算法之一,因其强大的非线性拟合能力而广泛应用于各种领域,包括图像识别、自然语言处理和游戏等。本文将介绍如何使用神经网络对MNIST数…

2024亚太杯中文赛数学建模选题建议及各题思路来啦!

大家好呀,2024年第十四届APMCM亚太地区大学生数学建模竞赛(中文赛项)开始了,来说一下初步的选题建议吧: 首先定下主基调, 本次亚太杯推荐大家选择B题目。C题目难度较高,只建议用过kaiwu的队伍…