Flink-cdc Schema Evolution 详解

Flink-cdc Schema Evolution 详解

github原文

glimpse

flink-cdc-3 glimpse

源码基于

~/project/flink_src/flink-cdc master !4 ❯ git remote -v
origin  https://github.com/apache/flink-cdc.git (fetch)
origin  https://github.com/apache/flink-cdc.git (push)
~/project/flink_src/flink-cdc master !4 ❯ git rev-parse HEAD
a5b666a3254b87b44b9a3843a4d001793e86552c
<revision>3.3-SNAPSHOT</revision>

flink-cdc 3.0 重要特性

  • 通过yaml文件定义pipeline
  • 能够感知schema变更

pipeline demo

我们使用一个特殊的sink类型“values”来观察各种事件的产生

values是专门为调试编写的一个sink,会将产生的事件打印在stdout

需要引入包flink-cdc-pipeline-connector-values-3.3-SNAPSHOT.jar,可以从flink-cdc工程中编译flink-cdc-pipeline-connector-values得到
在这里插入图片描述

pipeline yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: ${ip}port: ${port}username: ${username}password: ${password}tables: ${database}.${table}server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2

注意:parallelism > 1 时候一定要在flink中开启checkpoint

  1. 这是flink-cdc的已知bug,尚未解决 bug链接
  2. 如果想要使用flink-cdc源码调试,需要开启 --use-mini-cluster true
    请添加图片描述
  3. 并且修改FlinkPipelineComposer的ofMiniCluster方法,手动设置enableCheckpointing
    请添加图片描述

提交flink-cdc任务

./bin/flink-cdc.sh mysql-to-values.yaml
Pipeline has been submitted to cluster.
Job ID: a03966de35dc3141c890250daeac9699
Job Description: Sync Mysql Database to Values

在mysql中执行变更操作,观察flink taskmanager日志

mysql> insert into t1 values(13, 'm');
Query OK, 1 row affected (0.01 sec)mysql> alter table t1 add column c0 varchar(255);
Query OK, 0 rows affected (0.03 sec)
Records: 0  Duplicates: 0  Warnings: 0

flink日志

在这里插入图片描述

日志解析

注意看左侧的 “>”

由于yaml中设置的并发度是2,所有可以看到日志中有两个任务在打印

注意CreateTableEvent和AddColumnEvent这样的关于schema改变的事件会出现在两个并发中,而一个DataChangeEvent事件只会出现在单独一个并发中
在这里插入图片描述

flink-cdc 官方文档中描述: schema相关event与DataChangeEvent之间有如下的顺序保证

a CreateTableEvent must be emitted before any DataChangeEvent if a table is new to the framework, 
and SchemaChangeEvent must be emitted before any DataChangeEvent if the schema of a table is changed. 
This requirement makes sure that the framework has been aware of the schema before processing any data changes.

见understand-flink-cdc-api
在这里插入图片描述

schema evolution 实现原理

整体视角

在这里插入图片描述

SchemaRegistry运行在JobManager中,继承Coordinator与SchemaOperator交互,负责协调不同流水线中收到schema变更event后的同步

从yaml到pipeline的转化

  • 入口flink-cdc.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" org.apache.flink.cdc.cli.CliFrontend "$@"
  • 入口类 CliFrontendCliFrontend.java
main 调用createExecutor 调用new CliExecutor 其中 pipelineDefPath 是yaml文件的路径
  • CliExecutor.java
1. 通过 YamlPipelineDefinitionParser 将 pipelineDefPath parse为pipelineDef
2. PipelineComposer 通过pipelineDef的定义调用flink的api构建流水线
  • FlinkPipelineComposer.java
// Build Source OperatorDataSourceTranslator sourceTranslator = new DataSourceTranslator();DataStream<Event> stream =sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);...// Schema operatorSchemaOperatorTranslator schemaOperatorTranslator =new SchemaOperatorTranslator(schemaChangeBehavior,pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));OperatorIDGenerator schemaOperatorIDGenerator =new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());...// Build DataSink in advance as schema operator requires MetadataApplierDataSinkTranslator sinkTranslator = new DataSinkTranslator();DataSink dataSink =sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);stream =schemaOperatorTranslator.translate(stream,parallelism,dataSink.getMetadataApplier().setAcceptedSchemaEvolutionTypes(pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),pipelineDef.getRoute());

这里可以看到从yaml的描述到stream的转化

stream 关联-> 当前 env 关联-> FlinkPipelineExecution
最终通过FlinkPipelineExecution.execute()调用用到env.executeAsync()

这里处理用户描述的source和sink节点,flink-cdc还自动插入了一个SchemaOperator节点
在这里插入图片描述

schema event的流动

SchemaOperator与sink绑定,这里绑定关系到之后的几个操作

  1. 定义一个sink的时候要提供MetadataApplier,运行在JobManager(上方),通过Rpc与SchemaOperator交互
        schemaOperatorTranslator.translate(...dataSink.getMetadataApplier()...);
  1. 所有的event都要经过SchemaOperator,SchemaOperator对于SchemaChangeEvent特殊处理 SchemaOperator.java
public void processElement(StreamRecord<Event> streamRecord)throws InterruptedException, TimeoutException, ExecutionException {Event event = streamRecord.getValue();if (event instanceof SchemaChangeEvent) {processSchemaChangeEvents((SchemaChangeEvent) event);} else if (event instanceof DataChangeEvent) {...}

最终调用到handleSchemaChangeEvent

private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)throws InterruptedException, TimeoutException {...// The request will block if another schema change event is being handledSchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);if (response.isAccepted()) {LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);output.collect(new StreamRecord<>(new FlushEvent(tableId)));...// The request will block until flushing finished in each sink writerSchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();}

回想一下刚才在mysql中alter table add column的场景,每一个并发度都有一个AddColumnEvent,都会去调用
requestSchemaChange,向Coordinator发送SchemaChangeRequest

private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent)throws InterruptedException, TimeoutException {...while (true) {SchemaChangeResponse response =sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));...}}

SchemaRegistry.java响应请求

 public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {...if (request instanceof SchemaChangeRequest) {SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;requestHandler.handleSchemaChangeRequest(schemaChangeRequest, responseFuture);} else if (request instanceof SchemaChangeResultRequest) {requestHandler.getSchemaChangeResult(responseFuture);}...}

这时两个请求只有一个会被处理,另外一个会被认为是duplicate

处理的步骤如下

  • 发起schema变更请求requestSchemaChange

  • 如果被Coordinator Accept,执行output.collect(new StreamRecord<>(new FlushEvent(tableId)));

    • flushEvent在PrePartitionOperator.java被广播给下游所有的sink

      ```
      public void processElement(StreamRecord<Event> element) throws Exception {...if (event instanceof FlushEvent) {// Broadcast FlushEventbroadcastEvent(event);}...
      }
      ```
      
    • flushEvent在sink中会触发当前sink flush所有缓存的事件,之后通知Coordinator完成DataSinkFunctionOperator.java

      ```
      private void handleFlushEvent(FlushEvent event) throws Exception {userFunction.finish();schemaEvolutionClient.notifyFlushSuccess(getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
      }
      ```
      
  • hang在requestSchemaChangeResult,等待MetadataApplier变更下游数据库schema(比如Doris),天然hang住了上游消息

  • 如果不是第一个requestSchemaChange(相同请求已经在被处理),会hang在requestSchemaChange,也天然hang住上游消息,在Coordinator(SchemaRegistry/MetaAppier)处理好之后会走duplicate分支,只打印日志"{}> Schema change event {} has been handled in another subTask already."

  • 下游sink在处理完flush之后会触发notifyFlushSuccess,SchemaRegistry.java SchemaRegistry会调用handleEventFromOperator响应,最终调用到SchemaRegistryRequestHandler.java中的applySchemaChange, 调用对应sink的metadataApplier
    metadataApplier.applySchemaChange(changeEvent);

  • 上面步骤完成之后第一个hang住的requestSchemaChange会返回

MetadataApplier中干了什么

拿Doris举例, 直接去修改后端的列了,这时修改是安全的,因为上游的mysql修改schema之后产生的消息都被hang住,修改schema之前的消息都已经被各个sink flush消费完

DorisMetadataApplier.java

 public void applySchemaChange(SchemaChangeEvent event) {SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(event,addColumnEvent -> {applyAddColumnEvent(addColumnEvent);return null;},alterColumnTypeEvent -> {applyAlterColumnTypeEvent(alterColumnTypeEvent);return null;},createTableEvent -> {applyCreateTableEvent(createTableEvent);return null;},dropColumnEvent -> {applyDropColumnEvent(dropColumnEvent);return null;},...

glimpse 中没有说清楚的点

  1. schema变更消息会在每个并发度的源头都会产生吗?回答:是的,只有这样SchemaOperator才有机会正确的hang住所有的并发度,并等待SchemaRegistry(MetadataApplier)的响应

总结

flink-cdc 3.0 通过加入了SchemaOperator和MetadataApplier,监控链路上所有消息,当发生schema变更时,同步上下游

  1. hang住上游
  2. flush下游
  3. 修改下游schema
  4. 恢复运行

这样实现了自动schema变更

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/458021.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

介绍一款Java开发的企业接口管理系统和开放平台

介绍一款Java开发的企业接口管理系统和开放平台&#xff0c;YesApi接口管理平台Java版本。该系统基于Java开发&#xff0c;可以帮助企业进行统一接口管理、API接口开放&#xff0c;以及用于PaaS系统和SaaS产品平台的快速二次开发和搭建。 一、系统概述 YesApi接口大师&#x…

ClickHouse 5节点集群安装

ClickHouse 5节点集群安装 在此架构中&#xff0c;配置了五台服务器。其中两个用于托管数据副本。其他三台服务器用于协调数据的复制。在此示例中&#xff0c;我们将创建一个数据库和表&#xff0c;将使用 ReplicatedMergeTree 表引擎在两个数据节点之间复制该数据库和表。 官…

RHCE作业二

1.要求&#xff1a; 配置nginx服务通过ip访问多网站 2. 1关闭防火墙 2创建ip 3配置 4创建文件 5测试

logback 如何将日志输出到文件

如何作 将日志输出到文件需要使用 RollingFileAppender&#xff0c;该 Appender 必须定义 rollingPolicy &#xff0c;另外 rollingPollicy 下必须定义 fileNamePattern 和 encoder <appender name"fileAppender" class"ch.qos.logback.core.rolling.Rollin…

二、Spring的执行流程

文章目录 1. spring的初始化过程1.1 ClassPathXmlApplicationContext的构造方法1.2 refresh方法&#xff08;核心流程&#xff09;1.2.1 prepareRefresh() 方法1.2.2 obtainFreshBeanFactory() 方法1.2.3 prepareBeanFactory() 方法1.2.4 invokeBeanFactoryPostProcessors() 方…

shodan2---清风

注&#xff1a;本文章源于泷羽SEC&#xff0c;如有侵权请联系我&#xff0c;违规必删 学习请认准泷羽SEC学习视频:https://space.bilibili.com/350329294 实验一&#xff1a;search 存在CVE-2019-0708的网络设备 CVE - 2019 - 0708**漏洞&#xff1a;** 该漏洞存在于远程桌面…

解读数字化转型的敏捷架构:从理论到实践的深度分析

在当今数字经济的推动下&#xff0c;企业要在瞬息万变的市场中保持竞争力&#xff0c;数字化转型已经不再是一种选择&#xff0c;而是不可避免的战略需求。然而&#xff0c;企业如何从理论到实践进行有效的转型&#xff0c;尤其是在复杂的技术环境中&#xff0c;如何通过正确的…

来源爬虫程序调研报告

来源爬虫程序调研报告 一、什么是爬虫 爬虫&#xff1a;就是抓取网页数据的程序。从网站某一个页面&#xff08;通常是首页&#xff09;开始&#xff0c;读取网页的内容&#xff0c;找到在网页中的其它链接地址&#xff0c;然后通过这些链接地址寻找下一个网页&#xff0c;这…

中小型门诊管理系统源码,云诊所管理系统源码,前端技术栈:Vue 2 , Vite , Vue Router 3

中小型门诊管理系统源码&#xff0c;云诊所管理系统源码&#xff0c; 前端技术栈&#xff1a;Vue 2 Vite Vue Router 3 Vuex 3 Element Plus Axios TypeScript Quill Election 后端技术栈&#xff1a;Spring Boot MyBatis MyBatis-Plus Spring Security Swagger2 …

使用Python计算相对强弱指数(RSI)进阶

使用Python计算相对强弱指数&#xff08;RSI&#xff09;进阶 废话不多说&#xff0c;直接上主题&#xff1a;> 代码实现 以下是实现RSI计算的完整代码&#xff1a; # 创建一个DataFramedata {DATE: date_list, # 日期CLOSE: close_px_list, # 收盘价格 }df pd.DataF…

基于丑萌气质狗--C#的sqlserver学习

#region 常用取值 查询List<string> isName new List<string> { "第一", "第二", "第三", "第四" }; List<string> result isName.Where(m > m "第三").ToList();MyDBContext myDBnew MyDBContext(…

【数据分享】中国汽车市场年鉴(2013-2023)

数据介绍 在这十年里&#xff0c;中国自主品牌汽车迅速崛起。吉利、长城、比亚迪等品牌不断推出具有竞争力的车型&#xff0c;在国内市场乃至全球市场都占据了一席之地。同时&#xff0c;新能源汽车的发展更是如日中天。随着环保意识的提高和政策的大力支持&#xff0c;电动汽车…

CSS伪元素以及伪类和CSS特性

伪元素&#xff1a;可以理解为假标签。 有2个伪元素 &#xff08;1&#xff09;::before &#xff08;2&#xff09;::after ::before <!DOCTYPE html> <html> <head><title></title><style type"text/css">body::before{con…

Android简单控件实现简易计算器

学了一些Android的简单控件&#xff0c;用这些布局和控件&#xff0c;设计并实现一个简单计算器。 计算器的界面分为两大部分&#xff0c;第一部分是上方的计算表达式&#xff0c;既包括用户的按键输入&#xff0c;也包括计算结果 数字&#xff1b;第二部分是下方的各个按键&a…

【redis】初识非关系型数据库——redis

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; 初识 Redis Redis是⼀种基于键值对&#xff08;key-value&#xff09;的NoSQL数据库&#xff0c;与很多键值对数据库不同的是&#xff0c;Redis 中的值可以是由string&#xff08;字符串&#xff09;、hash&#xff0…

基于协同过滤算法的个性化课程推荐系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

AndroidStudio部署多渠道打包环境(一)

对于游戏来说&#xff0c;需要上架国内很多家应用商店&#xff0c;还有一些小的渠道SDK&#xff0c;大大小小加起来也有几十家了&#xff0c;那么我们部署了多渠道打包环境之后就很方便了。 一 、配置游戏基本参数&#xff1a;在app下面的build.gradle文件里编辑&#xff0c; …

Java全栈经典面试题剖析4】JavaSE高级 -- 包装类,String, 类方法

目录 面试题3.1 什么是自动装箱与拆箱&#xff1f;用什么方式来装箱与拆箱&#xff1f; 面试题3.2 int和Integer有什么区别&#xff1f; 面试题3.3 Integer常量池 面试题3.4 字符串常量池 面试题3.5 这句代码创建了几个对象? String str1 new String("xyz");…

【AI大模型】深入解析 存储和展示地理数据(.kmz)文件格式:结构、应用与项目实战

文章目录 1. 引言2. 什么是 .kmz 文件&#xff1f;2.1 .kmz 文件的定义与用途2.2 .kmz 与 .kml 的关系2.3 常见的 .kmz 文件使用场景 3. .kmz 文件的内部结构3.1 .kmz 文件的压缩格式3.2 解压缩 .kmz 文件的方法3.3 .kmz 文件的典型内容3.4 .kml 文件的结构与主要元素介绍 4. 深…

python对文件的读写操作

任务:读取文件夹下的批量txt数据&#xff0c;并将其写入到对应的word文档中。 txt文件中包含&#xff1a;编号、报告内容和表格数据。写入到word当中&#xff1a;编号、报告内容、表格数据、人格雷达图以及对应的详细说明&#xff08;详细说明是根据表格中的标识那一列中的加号…