Flink-cdc Schema Evolution 详解
github原文
glimpse
flink-cdc-3 glimpse
源码基于
~/project/flink_src/flink-cdc master !4 ❯ git remote -v
origin https://github.com/apache/flink-cdc.git (fetch)
origin https://github.com/apache/flink-cdc.git (push)
~/project/flink_src/flink-cdc master !4 ❯ git rev-parse HEAD
a5b666a3254b87b44b9a3843a4d001793e86552c
<revision>3.3-SNAPSHOT</revision>
flink-cdc 3.0 重要特性
- 通过yaml文件定义pipeline
- 能够感知schema变更
pipeline demo
我们使用一个特殊的sink类型“values”来观察各种事件的产生
values是专门为调试编写的一个sink,会将产生的事件打印在stdout
需要引入包flink-cdc-pipeline-connector-values-3.3-SNAPSHOT.jar
,可以从flink-cdc工程中编译flink-cdc-pipeline-connector-values
得到
pipeline yaml
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: ${ip}port: ${port}username: ${username}password: ${password}tables: ${database}.${table}server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2
注意:parallelism > 1 时候一定要在flink中开启checkpoint
- 这是flink-cdc的已知bug,尚未解决 bug链接
- 如果想要使用flink-cdc源码调试,需要开启 --use-mini-cluster true
- 并且修改FlinkPipelineComposer的ofMiniCluster方法,手动设置enableCheckpointing
提交flink-cdc任务
./bin/flink-cdc.sh mysql-to-values.yaml
Pipeline has been submitted to cluster.
Job ID: a03966de35dc3141c890250daeac9699
Job Description: Sync Mysql Database to Values
在mysql中执行变更操作,观察flink taskmanager日志
mysql> insert into t1 values(13, 'm');
Query OK, 1 row affected (0.01 sec)mysql> alter table t1 add column c0 varchar(255);
Query OK, 0 rows affected (0.03 sec)
Records: 0 Duplicates: 0 Warnings: 0
flink日志
日志解析
注意看左侧的 “>”
由于yaml中设置的并发度是2,所有可以看到日志中有两个任务在打印
注意CreateTableEvent和AddColumnEvent这样的关于schema改变的事件会出现在两个并发中,而一个DataChangeEvent事件只会出现在单独一个并发中
flink-cdc 官方文档中描述: schema相关event与DataChangeEvent之间有如下的顺序保证
a CreateTableEvent must be emitted before any DataChangeEvent if a table is new to the framework,
and SchemaChangeEvent must be emitted before any DataChangeEvent if the schema of a table is changed.
This requirement makes sure that the framework has been aware of the schema before processing any data changes.
见understand-flink-cdc-api
schema evolution 实现原理
整体视角
SchemaRegistry运行在JobManager中,继承Coordinator与SchemaOperator交互,负责协调不同流水线中收到schema变更event后的同步
从yaml到pipeline的转化
- 入口
flink-cdc.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" org.apache.flink.cdc.cli.CliFrontend "$@"
- 入口类
CliFrontend
在CliFrontend.java
main 调用createExecutor 调用new CliExecutor 其中 pipelineDefPath 是yaml文件的路径
CliExecutor.java
1. 通过 YamlPipelineDefinitionParser 将 pipelineDefPath parse为pipelineDef
2. PipelineComposer 通过pipelineDef的定义调用flink的api构建流水线
FlinkPipelineComposer.java
// Build Source OperatorDataSourceTranslator sourceTranslator = new DataSourceTranslator();DataStream<Event> stream =sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);...// Schema operatorSchemaOperatorTranslator schemaOperatorTranslator =new SchemaOperatorTranslator(schemaChangeBehavior,pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));OperatorIDGenerator schemaOperatorIDGenerator =new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());...// Build DataSink in advance as schema operator requires MetadataApplierDataSinkTranslator sinkTranslator = new DataSinkTranslator();DataSink dataSink =sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);stream =schemaOperatorTranslator.translate(stream,parallelism,dataSink.getMetadataApplier().setAcceptedSchemaEvolutionTypes(pipelineDef.getSink().getIncludedSchemaEvolutionTypes()),pipelineDef.getRoute());
这里可以看到从yaml的描述到stream的转化
stream
关联-> 当前 env
关联-> FlinkPipelineExecution
最终通过FlinkPipelineExecution.execute()调用用到env.executeAsync()
这里处理用户描述的source和sink节点,flink-cdc还自动插入了一个SchemaOperator节点
schema event的流动
SchemaOperator与sink绑定,这里绑定关系到之后的几个操作
- 定义一个sink的时候要提供MetadataApplier,运行在JobManager(上方),通过Rpc与SchemaOperator交互
schemaOperatorTranslator.translate(...dataSink.getMetadataApplier()...);
- 所有的event都要经过SchemaOperator,SchemaOperator对于SchemaChangeEvent特殊处理
SchemaOperator.java
public void processElement(StreamRecord<Event> streamRecord)throws InterruptedException, TimeoutException, ExecutionException {Event event = streamRecord.getValue();if (event instanceof SchemaChangeEvent) {processSchemaChangeEvents((SchemaChangeEvent) event);} else if (event instanceof DataChangeEvent) {...}
最终调用到handleSchemaChangeEvent
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)throws InterruptedException, TimeoutException {...// The request will block if another schema change event is being handledSchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);if (response.isAccepted()) {LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);output.collect(new StreamRecord<>(new FlushEvent(tableId)));...// The request will block until flushing finished in each sink writerSchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();}
回想一下刚才在mysql中alter table add column的场景,每一个并发度都有一个AddColumnEvent,都会去调用
requestSchemaChange,向Coordinator发送SchemaChangeRequest
private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent)throws InterruptedException, TimeoutException {...while (true) {SchemaChangeResponse response =sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));...}}
在SchemaRegistry.java
响应请求
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {...if (request instanceof SchemaChangeRequest) {SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;requestHandler.handleSchemaChangeRequest(schemaChangeRequest, responseFuture);} else if (request instanceof SchemaChangeResultRequest) {requestHandler.getSchemaChangeResult(responseFuture);}...}
这时两个请求只有一个会被处理,另外一个会被认为是duplicate
处理的步骤如下
-
发起schema变更请求requestSchemaChange
-
如果被Coordinator Accept,执行
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
-
flushEvent在
PrePartitionOperator.java
被广播给下游所有的sink``` public void processElement(StreamRecord<Event> element) throws Exception {...if (event instanceof FlushEvent) {// Broadcast FlushEventbroadcastEvent(event);}... } ```
-
flushEvent在sink中会触发当前sink flush所有缓存的事件,之后通知Coordinator完成
DataSinkFunctionOperator.java
``` private void handleFlushEvent(FlushEvent event) throws Exception {userFunction.finish();schemaEvolutionClient.notifyFlushSuccess(getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); } ```
-
-
hang在requestSchemaChangeResult,等待MetadataApplier变更下游数据库schema(比如Doris),天然hang住了上游消息
-
如果不是第一个requestSchemaChange(相同请求已经在被处理),会hang在requestSchemaChange,也天然hang住上游消息,在Coordinator(SchemaRegistry/MetaAppier)处理好之后会走duplicate分支,只打印日志
"{}> Schema change event {} has been handled in another subTask already."
-
下游sink在处理完flush之后会触发notifyFlushSuccess,
SchemaRegistry.java
SchemaRegistry会调用handleEventFromOperator
响应,最终调用到SchemaRegistryRequestHandler.java
中的applySchemaChange
, 调用对应sink的metadataApplier
metadataApplier.applySchemaChange(changeEvent);
-
上面步骤完成之后第一个hang住的requestSchemaChange会返回
MetadataApplier中干了什么
拿Doris举例, 直接去修改后端的列了,这时修改是安全的,因为上游的mysql修改schema之后产生的消息都被hang住,修改schema之前的消息都已经被各个sink flush消费完
DorisMetadataApplier.java
public void applySchemaChange(SchemaChangeEvent event) {SchemaChangeEventVisitor.<Void, SchemaEvolveException>visit(event,addColumnEvent -> {applyAddColumnEvent(addColumnEvent);return null;},alterColumnTypeEvent -> {applyAlterColumnTypeEvent(alterColumnTypeEvent);return null;},createTableEvent -> {applyCreateTableEvent(createTableEvent);return null;},dropColumnEvent -> {applyDropColumnEvent(dropColumnEvent);return null;},...
glimpse 中没有说清楚的点
- schema变更消息会在每个并发度的源头都会产生吗?回答:是的,只有这样SchemaOperator才有机会正确的hang住所有的并发度,并等待SchemaRegistry(MetadataApplier)的响应
总结
flink-cdc 3.0 通过加入了SchemaOperator和MetadataApplier,监控链路上所有消息,当发生schema变更时,同步上下游
- hang住上游
- flush下游
- 修改下游schema
- 恢复运行
这样实现了自动schema变更