Flink实战之FlinkSQL键设计对于数据保序的必要性

        乱序数据处理对于实时ETL至关重要,处理不好将会导致数据不一致场景发生。对于数据乱序场景,一般工程师已知上游数据乱序会对本身消费数据产生影响,但不一定晓得的是,一个SQL本身也可能造成数据乱序,严格意义上的数据乱序是无法避免的。本文讨论的是在SQL开发过程中,由于考虑不当导致数据乱序的场景。

        先来了解下示例数据,我们以交易场景为例,这里有三张表,分别是提交订单表提交订单明细表商品维表,如下:

-- 提单表:一个订单一行 
create table ods_submit_order(order_id        bigint comment '订单ID',create_time     string comment '创建时间',primary key(order_id) not enforced 
);
​
-- 提单明细表:一个订单一个商品一行 
create table ods_submit_order_product(order_id        bigint comment '订单ID',product_id      bigint comment '商品ID',submit_amt      double comment '订单金额',create_time     string comment '创建时间',primary key(order_id,product_id) not enforced 
);
​
-- 商品维表:一个商品一行
create table dim_product(product_id    bigint comment '商品ID',product_name  string comment '商品名称',create_time     string comment '创建时间',primary key (product_id) not enforced 
);

本文需要讨论的造成乱序的SQL如下:

insert into fact_ord_submit_order_dd 
select t1.order_id        as order_id,t2.product_id      as product_id,t2.submit_amt      as submit_amt,t3.product_name    as product_name
from ods_submit_order t1 
left join ods_submit_order_product t2 on t1.order_id = t2.order_id 
left join dim_product t3 on t2.product_id = t3.product_id 
;

        测试SQL对应的DAG图如下:

        这里假设source数据是保序的,source并发为1,对应一个subtask,join并发2,对应两个subtask处理join逻辑,根据DAG图分析:

  • Join-one:首先t1 left join t2通过order_id进行左外连接,这时Flink内部会根据order_id的hash值将相同order_id的数据shuffer到相同的subtask上执行。

  • Join-two:其次t2 left join t3通过product_id进行左外连接,这时Flink内部会根据product_id的hash值将相同product_id的数据shuffer到相同的subtask上执行。

        可以发现,这个SQL进行了两次shuffer过程,而且两次shuffer的key都不一样。

    Join-one可能对应的数据流如下:order_id=1数据都会shuffer到同一个subtask上

p1: +I(1, null, null)
p1: -D(1, null, null)
p1: +I(1, 1, 10.0)

    Join-two可能对应的数据流如下:product_id=1和为null的数据被shuffer到了不同的subtask上进行处理

p1: +I(1, null, null, null)
p1: -D(1, null, null, null)
p2: +I(1, 1, 10.0, null)
p2: -D(1, 1, 10.0, null)
p2: +I(1, 1, 10.0, java)

        从Join-two的输出结果可以发现,当前输出由两个subtask任务处理,数据最终结果为+I(1, 1, 10.0, java),所以这时候数据发往下游表fact_ord_submit_order_dd,由于不同并发task处理能力不一样,发送数据顺序可能不一样,将会导致结果也不一样。分两种情况来看(这里left join会产生变更流),如下:

  • 场景一:结果表fact_ord_submit_order_dd为order_id

    • 场景1:比如p1先发送下游、p2后发送下游;这时候如果下游设置order_id为主键,最终结果为+I(1, 1, 10.0, java),下游数据正确。即下游消费顺序:

      p1: +I(1, null, null, null)
      p1: -D(1, null, null, null)
      p2: +I(1, 1, 10.0, null)
      p2: -D(1, 1, 10.0, null)
      p2: +I(1, 1, 10.0, java)
    • 场景2:如果p2先发送下游,然后p1发送下游;由于接收顺序与场景1不一致,最终接收结果为-D(1, null, null, null),数据出现异常,归根到底是由于left join导致数据乱序原因。即下游消费顺序:

      p2: +I(1, 1, 10.0, null)
      p2: -D(1, 1, 10.0, null)
      p2: +I(1, 1, 10.0, java)
      p1: +I(1, null, null, null)
      p1: -D(1, null, null, null)
  • 场景二:结果表fact_ord_submit_order_dd为order_id,product_id

    • 场景1:比如p1先发送下游、p2后发送下游;这时候如果下游设置order_id和product_id为主键,最终结果为+I(1, 1, 10.0, java),下游数据正确。即下游消费顺序:

      p1: +I(1, null, null, null)
      p1: -D(1, null, null, null)
      p2: +I(1, 1, 10.0, null)
      p2: -D(1, 1, 10.0, null)
      p2: +I(1, 1, 10.0, java)
    • 场景2:如果p2先发送下游,然后p1发送下游;由于接收顺序与场景1不一致,但下游设置order_id和product_id为主键,最终结果为+I(1, 1, 10.0, java)任然不受-D(1, null, null, null)的影响,下游数据正确。即下游消费顺序:

      p2: +I(1, 1, 10.0, null)
      p2: -D(1, 1, 10.0, null)
      p2: +I(1, 1, 10.0, java)
      p1: +I(1, null, null, null)
      p1: -D(1, null, null, null)

        这个例子从两次left join场景出发,我们可以发现,下游sink表主键设置会影响最终的数据准确性。当然,业务上结果表的主键也应该为order_id和product_id作为联合主键,这里order_id假设为主键对于场景ETL来说并不合适,这里仅仅是为了说明问题。

        下面让我们尝试总结一下这个Regular Join场景的执行逻辑:在流式处理数据的过程中,当本侧到来一条新的数据时,我们无法预测对侧是否在之后还会到来能够和该数据关联上的数据,且考虑到时效性,我们也无法一直等待右侧所有数据到齐后再关联下发,因此 Flink 的处理方式是先将当前数据和对侧已经到来过的所有数据(如果设置了 TTL,则是对应 TTL 时间段的数据)进行关联计算,并将关联结果下发,如果是 Outer Join,则还要考虑关联不上需要下发一条对侧为 null 的数据。除此之外,我们还要讲该数据记录在状态中,以方便后续对侧数据来做镜像的关联处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/282017.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

优思学院|APQP产品质量先期策划全面指南

产品质量先期策划(APQP)是什么? 产品质量先期策划(APQP)是针对复杂产品和供应链在推出新产品时存在的众多失败可能性,目的在于确保新产品或新流程能够获得客户满意的一种结构化过程。 在复杂的产品和供应…

硬盘哨兵Hard Disk Sentinel Pro V6.20.0.0 便携版

Hard Disk Sentinel 是一款功能强大的硬盘监控和分析软件,专为 Windows 用户设计。它可以实时监测硬盘驱动器(HDD)、固态硬盘(SSD)、混合硬盘(SSHD)、NVMe SSD、RAID 数组和外部 RAID 盒子的健康…

马斯克开源Grok-1

Grok-1是由马斯克AI创企xAI发布的第一代大语言模型,它以其巨大的参数量——高达3140亿,引起了全球范围内的广泛关注。这一参数量远超其他知名模型,如OpenAI的GPT-3.5,后者仅有1750亿参数。在2024年3月17日,马斯克宣布将…

一图看懂伙伴系统 Buddy System

伙伴系统是一种在操作系统内核中使用的管理物理内存的算法。它主要应用于现代计算机系统中,特别是在Linux等类Unix系统中。 基本思想:将物理内存分割成不同大小的块,每个块包含一定数量的连续页面。这些块的大小通常是2的幂次方,…

【GPT-SOVITS-04】SOVITS 模块-鉴别模型解析

说明:该系列文章从本人知乎账号迁入,主要原因是知乎图片附件过于模糊。 知乎专栏地址: 语音生成专栏 系列文章地址: 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…

使用CUDA 为Tegra构建OpenCV

返回:OpenCV系列文章目录(持续更新中......) 上一篇:MultiArch与Ubuntu/Debian 的交叉编译 下一篇:在iOS中安装 警告: 本教程可能包含过时的信息。 使用CUDA for Tegra 的OpenCV 本文档是构建支持 CUD…

Expert Prompting-引导LLM成为杰出专家

ExpertPrompting: Instructing Large Language Models to be Distinguished Experts 如果适当设计提示,对齐的大型语言模型(LLM)的回答质量可以显著提高。在本文中,我们提出了ExpertPrompting,以激发LLM作为杰出专家回…

vivado 增量实施

增量实施 增量实现是指增量编译的实现阶段设计流程: •通过重用参考设计中的先前布局和布线,保持QoR的可预测性。 •加快地点和路线的编制时间或尝试最后一英里的计时关闭。 下图提供了增量实现设计流程图。该图还说明了增量合成流程。有关增量的更多…

SLAM 求解IPC算法

基础知识:方差,协方差,协方差矩阵 方差:描述了一组随机变量的离散程度 方差 每个样本值 与 全部样本的平均值 相差的平方和 再求平均数,记作: 例如:计算数字1-5的方差,如下 去中心化…

关系型数据库mysql(3)索引

目录 一.索引的概念 二.索引的作用 三.创建索引的原则依据 四.索引的分类 五.索引的创建 5.1 普通索引 5.1.1 直接创建索引 5.1.2 修改表方式创建 5.1.3 创建表的时候指定索引 5.2 唯一索引 5.2.1 直接创建唯一索引 5.2.2 修改表方式创建 5.2.3 创建表的时候指…

ThreaTrace复现记录

1. 环境配置 服务器环境 需要10.2的cuda版本 conda环境 包的版本: python 3.6.13 pytorch 1.9.1 torch-cluster 1.5.9 torch-scatter 2.0.9 torch-sparse 0.6.12 torch-spline-conv 1.2.1 torch-geometric 1.4.3 环境bug 这里环境搭建好以后,就可以正…

osgEarth学习笔记2-第一个Osg QT程序

原文链接 上个帖子介绍了osgEarth开发环境的安装。本帖介绍我的第一个Osg QT程序。 下载 https://github.com/openscenegraph/osgQt 解压,建立build目录。 使用Cmake-GUI Configure 根据需要选择win32或者x64,这里我使用win32. 可以看到include和lib路…

jQuery 基础

文章目录 1. jQuery 概述1.1 JavaScript 库1.2 jQuery 概念1.3 jQuery 优点 2. jQuery 基本使用2.1 下载2.2 使用步骤2.3 jQuery 的入口函数2.4 jQuery 的顶级对象 $2.5 DOM 对象和 jQuery 对象DOM 对象和 jQuery 对象相互转换方法 1. jQuery 概述 1.1 JavaScript 库 1.2 jQue…

Cesium:按行列绘制3DTiles的等分线

作者:CSDN @ _乐多_ 本文将介绍如何使用 Cesium 引擎根据模型的中心坐标,半轴信息,绘制 3DTiles 对象的外包盒等分线。 外包盒是一个定向包围盒(Oriented Bounding Box),它由一个中心点(center)和一个包含半轴(halfAxes)组成。半轴由一个3x3的矩阵表示,这个矩阵是…

【毕设级项目】基于ESP8266的家庭灯光与火情智能监测系统——文末源码及PPT

目录 系统介绍 硬件配置 硬件连接图 系统分析与总体设计 系统硬件设计 ESP8266 WIFI开发板 人体红外传感器模块 光敏电阻传感器模块 火焰传感器模块 可燃气体传感器模块 温湿度传感器模块 OLED显示屏模块 系统软件设计 温湿度检测模块 报警模块 OLED显示模块 …

DashVector - 阿里云向量检索服务

DashVector 文章目录 DashVector一、关于 DashVector二、使用 DashVector 前提准备1、创建Cluster:2、获得API-KEY3、安装最新版SDK 三、快速使用 DashVector1. 创建Client2. 创建Collection3、插入Doc4、相似性检索5、删除Doc6. 查看Collection统计信息7. 删除Coll…

hcia datacom课程学习(3):http与https、FTP

1.超文本传输协议:http与https (1)用来访问www万维网。 wwwhttp+html+URLweb (2)它们提供了一种发布和接受html界面的方法:当在网页输入URL后,从服务器获取html文件来…

供应链投毒预警 | 恶意Py组件tohoku-tus-iot-automation开展窃密木马投毒攻击

概述 上周(2024年3月6号),悬镜供应链安全情报中心在Pypi官方仓库(https://pypi.org/)中捕获1起新的Py包投毒事件,Python组件tohoku-tus-iot-automation 从3月6号开始连续发布6个不同版本恶意包&#xff0c…

【nfs报错】rpc mount export: RPC: Unable to receive; errno = No route to host

NFS错误 问题现象解决方法 写在前面 这两天搭建几台服务器,需要使用nfs服务,于是六台选其一做服务端,其余做客户端,搭建过程写在centos7离线搭建NFS共享文件,但是访问共享时出现报错:rpc mount export: RPC…

嵌入式-4种经典继电器驱动电路-单片机IO端口/三极管/达林顿管/嵌套连接

文章目录 一:继电器原理二:单片机驱动电路三:经典继电器驱动电路方案3.1 继电器驱动电路方案一:I/O端口灌电流方式的直接连接3.1.1 方案一的继电器特性要求3.1.2 方案一可能会损坏I/O口 3.2 继电器驱动电路方案二:三极…