Preface
With the goal of studying the hudi-flink source code, I used the code from my earlier article 《Hudi Flink SQL代码示例及本地调试》 to step through the sources. This post records the main steps of that debugging session and the corresponding source snippets.
Versions
- Flink 1.15.4
- Hudi 0.13.0
Goal
In the article 《Hudi Flink SQL代码示例及本地调试》 I mentioned: the Table API entry point looks much like the DataStream API entry point. The DataStream API entry is in the sink and source methods of HoodiePipeline, and these two methods call HoodieTableFactory's createDynamicTableSink and createDynamicTableSource respectively. So how does the Table API code reach createDynamicTableSink and createDynamicTableSource step by step? And once a HoodieTableSink is returned, how is the data actually written? The main entry point of Hudi's write logic appears to be the method body of HoodieTableSink.getSinkRuntimeProvider, but none of this was clear to me before. So this time the goal is to figure out:
1. The main code steps from the Table API entry point to createDynamicTableSink returning a HoodieTableSink.
2. Where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to carry out the subsequent Hudi write logic.
Related classes:
- HoodiePipeline (DataStream API)
- HoodieTableFactory
- HoodieTableSink
- DataStreamSinkProviderAdapter (functional interface)
- TableEnvironmentImpl
- BatchPlanner
- PlannerBase
- FactoryUtil
- BatchExecSink
- CommonExecSink
DataStream API
The questions above are actually easy to answer from the DataStream API code, so let's first look at the DataStream API code that writes to Hudi. The full code is in the article 《Flink Hudi DataStream API代码示例》.
DataStream<RowData> dataStream = env.fromElements(
    GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
    GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("id int")
    .column("name string")
    .column("price double")
    .column("ts bigint")
    .column("dt string")
    .pk("id")
    .partition("dt")
    .options(options);

builder.sink(dataStream, false);
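For context, here is a minimal, hypothetical sketch of the env, targetTable and options that the snippet above assumes (the class name, table name, path and option values are placeholders I made up; the real setup is in the companion DataStream API article):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HudiSinkSetup {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String targetTable = "hudi_table";                 // placeholder table name
    Map<String, String> options = new HashMap<>();
    options.put("path", "file:///tmp/hudi_table");     // Hudi table base path (required)
    options.put("table.type", "MERGE_ON_READ");        // or COPY_ON_WRITE
    // ... the DataStream / HoodiePipeline code from the snippet above goes here ...
  }
}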
HoodiePipeline.Builder.sink
public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
  TableDescriptor tableDescriptor = getTableDescriptor();
  return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
}
HoodiePipeline.sink
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
  FactoryUtil.DefaultDynamicTableContext context =
      Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
  HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
  return ((DataStreamSinkProvider) hoodieTableFactory
      .createDynamicTableSink(context)
      .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
      .consumeDataStream(input);
}
The answers can be found directly in HoodiePipeline.sink:
1. HoodieTableFactory.createDynamicTableSink returns a HoodieTableSink.
2. HoodieTableSink.getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter.
3. DataStreamSinkProviderAdapter.consumeDataStream invokes the method body of HoodieTableSink.getSinkRuntimeProvider, which carries out the subsequent Hudi write logic. The dataStream here is the DataStream<RowData> we created at the beginning of the program.
HoodieTableSink.getSinkRuntimeProvider
getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter, where the lambda expression dataStream -> {...} is the concrete implementation of DataStreamSinkProviderAdapter.consumeDataStream(dataStream):
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
  return (DataStreamSinkProviderAdapter) dataStream -> {
    // setup configuration
    long ckpTimeout = dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout();
    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
    // set up default parallelism
    OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

    RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

    // bulk_insert mode
    final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
      return Pipelines.bulkInsert(conf, rowType, dataStream);
    }

    // Append mode
    if (OptionsResolver.isAppendMode(conf)) {
      DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
      if (OptionsResolver.needsAsyncClustering(conf)) {
        return Pipelines.cluster(conf, rowType, pipeline);
      } else {
        return Pipelines.dummySink(pipeline);
      }
    }

    DataStream<Object> pipeline;
    // bootstrap
    final DataStream<HoodieRecord> hoodieRecordDataStream =
        Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
    // write pipeline
    pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
    // compaction
    if (OptionsResolver.needsAsyncCompaction(conf)) {
      // use synchronous compaction for bounded source.
      if (context.isBounded()) {
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
      }
      return Pipelines.compact(conf, pipeline);
    } else {
      return Pipelines.clean(conf, pipeline);
    }
  };
}
DataStreamSinkProviderAdapter is in fact a functional interface, i.e. an interface that contains exactly one abstract method. A lambda expression can be assigned to a functional interface, thereby instantiating it:
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

  @Override
  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }
}
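To make this concrete, here is a small self-contained sketch (not Hudi or Flink code; the interface and class names are invented for illustration) of a functional interface being instantiated by a lambda expression, with a default method delegating to the single abstract method in the same way DataStreamSinkProviderAdapter does:

public class FunctionalInterfaceDemo {

  // A functional interface: exactly one abstract method, plus an optional default method.
  @FunctionalInterface
  interface SinkConsumer {
    String consume(String dataStream);

    // Default method delegating to the abstract one, mirroring how
    // consumeDataStream(providerContext, dataStream) forwards to consumeDataStream(dataStream).
    default String consume(String context, String dataStream) {
      return consume(dataStream);
    }
  }

  public static void main(String[] args) {
    // The lambda body becomes the implementation of the single abstract method.
    SinkConsumer consumer = dataStream -> "writing " + dataStream;

    System.out.println(consumer.consume("rows"));          // writing rows
    System.out.println(consumer.consume("ctx", "rows"));   // writing rows (via the default method)
  }
}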
For more background on functional interfaces and lambda expressions, see the following two articles:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047
Table API
Now that we know the call steps of the DataStream API, let's look at the rough call steps of the Table API for comparison. The debugging entry point is:
tableEnv.executeSql(
    String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
Overall call flow
1. tableEnv.executeSql -> TableEnvironmentImpl.executeSql -> executeInternal(Operation operation) -> executeInternal(List<ModifyOperation> operations) -> this.translate -> (PlannerBase) this.planner.translate

2.1 PlannerBase.translate -> PlannerBase.translateToRel -> getTableSink(catalogSink.getContextResolvedTable, dynamicOptions) -> FactoryUtil.createDynamicTableSink -> HoodieTableFactory.createDynamicTableSink

2.2 PlannerBase.translate -> (BatchPlanner) translateToPlan(execGraph) -> (ExecNodeBase) node.translateToPlan -> (BatchExecSink) translateToPlanInternal -> (CommonExecSink) createSinkTransformation -> (HoodieTableSink) getSinkRuntimeProvider -> (CommonExecSink) applySinkProvider -> provider.consumeDataStream
Detailed code
TableEnvironmentImpl
(TableEnvironmentImpl) executeSql
public TableResult executeSql(String statement) {
  List<Operation> operations = this.getParser().parse(statement);
  if (operations.size() != 1) {
    throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
  } else {
    // Key step: executeInternal
    return this.executeInternal((Operation) operations.get(0));
  }
}
executeInternal(Operation operation)
public TableResultInternal executeInternal(Operation operation) {
  if (operation instanceof ModifyOperation) {
    // Key step: executeInternal
    return this.executeInternal(Collections.singletonList((ModifyOperation) operation));
  } else if (operation instanceof StatementSetOperation) {
    return this.executeInternal(((StatementSetOperation) operation).getOperations());
  ...
executeInternal(List<ModifyOperation> operations)
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
  // Key step: translate
  List<Transformation<?>> transformations = this.translate(operations);
  List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
  TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
  if ((Boolean) this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
    try {
      result.await();
    } catch (ExecutionException | InterruptedException var6) {
      result.getJobClient().ifPresent(JobClient::cancel);
      throw new TableException("Fail to wait execution finish.", var6);
    }
  }
  return result;
}
translate
Here the planner is a BatchPlanner, because we configured batch mode with EnvironmentSettings.inBatchMode().
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
  // The planner here is a BatchPlanner, because we configured batch mode with EnvironmentSettings.inBatchMode()
  // Key step: PlannerBase.translate
  return this.planner.translate(modifyOperations);
}
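For reference, a minimal sketch of how the batch-mode environment behind this call chain is typically created (the class and table names are placeholders, and the INSERT assumes the Hudi table was already created, e.g. with the DDL from the companion article):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchModeEntry {
  public static void main(String[] args) {
    // inBatchMode() is why this.planner ends up being a BatchPlanner in translate().
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Debug entry point from the article; "hudi_table" is a placeholder and the
    // corresponding CREATE TABLE must have been executed beforehand.
    tableEnv.executeSql(
        String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", "hudi_table"));
  }
}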
BatchPlanner
(parent class of BatchPlanner) PlannerBase.translate
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  beforeTranslation()
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // Key step: translateToRel
  val relNodes = modifyOperations.map(translateToRel)
  val optimizedRelNodes = optimize(relNodes)
  val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
  // Key step: translateToPlan
  val transformations = translateToPlan(execGraph)
  afterTranslation()
  transformations
}
PlannerBase.translateToRel
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
  val dataTypeFactory = catalogManager.getDataTypeFactory
  modifyOperation match {
    case s: UnregisteredSinkModifyOperation[_] =>
      val input = getRelBuilder.queryOperation(s.getChild).build()
      val sinkSchema = s.getSink.getTableSchema
      // validate query schema and sink schema, and apply cast if possible
      val query = validateSchemaAndApplyImplicitCast(
        input,
        catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
        null,
        dataTypeFactory,
        getTypeFactory)
      LogicalLegacySink.create(
        query,
        s.getSink,
        "UnregisteredSink",
        ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))

    case collectModifyOperation: CollectModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      DynamicSinkUtils.convertCollectToRel(
        getRelBuilder,
        input,
        collectModifyOperation,
        getTableConfig,
        getFlinkContext.getClassLoader)

    case catalogSink: SinkModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      val dynamicOptions = catalogSink.getDynamicOptions
      // Key step: getTableSink
      getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
        case (table, sink: TableSink[_]) =>
          // Legacy tables can't be anonymous
          val identifier = catalogSink.getContextResolvedTable.getIdentifier
          // check the logical field type and physical field type are compatible
          val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
          // validate logical schema and physical schema are compatible
          validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
          // validate TableSink
          validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
          // validate query schema and sink schema, and apply cast if possible
          val query = validateSchemaAndApplyImplicitCast(
            input,
            table.getResolvedSchema,
            identifier.asSummaryString,
            dataTypeFactory,
            getTypeFactory)
          val hints = new util.ArrayList[RelHint]
          if (!dynamicOptions.isEmpty) {
            hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
          }
          LogicalLegacySink.create(
            query,
            hints,
            sink,
            identifier.toString,
            table,
            catalogSink.getStaticPartitions.toMap)

        case (table, sink: DynamicTableSink) =>
          DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
      } match {
        case Some(sinkRel) => sinkRel
        case None =>
          throw new TableException(s"Sink '${catalogSink.getContextResolvedTable}' does not exists")
      }
PlannerBase.getTableSink
private def getTableSink(
    contextResolvedTable: ContextResolvedTable,
    dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
  contextResolvedTable.getTable[CatalogBaseTable] match {
    case connectorTable: ConnectorCatalogTable[_, _] =>
      val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
      toScala(connectorTable.getTableSink) match {
        case Some(sink) => Some(resolvedTable, sink)
        case None => None
      }

    case regularTable: CatalogTable =>
      val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
      ...
      if (
        !contextResolvedTable.isAnonymous &&
        TableFactoryUtil.isLegacyConnectorOptions(
          catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
          tableConfig,
          isStreamingMode,
          objectIdentifier,
          resolvedTable.getOrigin,
          isTemporary)
      ) {
        ...
      } else {
        ...
        // Key step: FactoryUtil.createDynamicTableSink
        val tableSink = FactoryUtil.createDynamicTableSink(
          factory,
          objectIdentifier,
          tableToFind,
          Collections.emptyMap(),
          getTableConfig,
          getFlinkContext.getClassLoader,
          isTemporary)
        Option(resolvedTable, tableSink)
      }

    case _ => None
  }
FactoryUtil.createDynamicTableSink
Based on 'connector'='hudi', the factory that gets discovered is org.apache.hudi.table.HoodieTableFactory, and its createDynamicTableSink is then called:
public static DynamicTableSink createDynamicTableSink(
    @Nullable DynamicTableSinkFactory preferredFactory,
    ObjectIdentifier objectIdentifier,
    ResolvedCatalogTable catalogTable,
    Map<String, String> enrichmentOptions,
    ReadableConfig configuration,
    ClassLoader classLoader,
    boolean isTemporary) {
  final DefaultDynamicTableContext context =
      new DefaultDynamicTableContext(
          objectIdentifier,
          catalogTable,
          enrichmentOptions,
          configuration,
          classLoader,
          isTemporary);
  try {
    // 'connector'='hudi' -> org.apache.hudi.table.HoodieTableFactory
    final DynamicTableSinkFactory factory =
        preferredFactory != null
            ? preferredFactory
            : discoverTableFactory(DynamicTableSinkFactory.class, context);
    // Key step: HoodieTableFactory.createDynamicTableSink
    return factory.createDynamicTableSink(context);
  } catch (Throwable t) {
    throw new ValidationException(
        String.format(
            "Unable to create a sink for writing table '%s'.\n\n"
                + "Table options are:\n\n"
                + "%s",
            objectIdentifier.asSummaryString(),
            catalogTable.getOptions().entrySet().stream()
                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                .sorted()
                .collect(Collectors.joining("\n"))),
        t);
  }
}
HoodieTableFactory.createDynamicTableSink
This answers the first question.
public DynamicTableSink createDynamicTableSink(Context context) {
  Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
  checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
      "Option [path] should not be empty.");
  setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
  ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
  sanityCheck(conf, schema);
  setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
  // Key step: HoodieTableSink
  return new HoodieTableSink(conf, schema);
}
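As an illustration of where these options come from, here is a hypothetical table definition of the kind that drives this path (class name, table name, schema and path are placeholders, and running it requires the hudi-flink bundle on the classpath): 'connector' = 'hudi' is what the factory discovery resolves to org.apache.hudi.table.HoodieTableFactory, and 'path' is the option that the checkArgument above insists on.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiFactoryDiscoveryDemo {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // 'connector' = 'hudi' drives discovery of HoodieTableFactory;
    // 'path' is validated by createDynamicTableSink's checkArgument.
    tableEnv.executeSql(
        "CREATE TABLE hudi_table ("
            + " id INT PRIMARY KEY NOT ENFORCED,"
            + " name STRING,"
            + " price DOUBLE,"
            + " ts BIGINT,"
            + " dt STRING"
            + ") PARTITIONED BY (dt) WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = 'file:///tmp/hudi_table'"
            + ")");
  }
}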
BatchExecSink
Back in PlannerBase.translate, it later calls translateToPlan. execGraph.getRootNodes returns a BatchExecSink (to see why it is a BatchExecSink, look at the translateToExecNodeGraph method called in PlannerBase.translate). BatchExecSink is a BatchExecNode (it implements that interface), so node.translateToPlan is executed.
PlannerBase.translateToPlan
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
  beforeTranslation()
  val planner = createDummyPlanner()
  val transformations = execGraph.getRootNodes.map {
    // BatchExecSink
    // Key step: ExecNodeBase.translateToPlan
    case node: BatchExecNode[_] => node.translateToPlan(planner)
    case _ =>
      throw new TableException(
        "Cannot generate BoundedStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
  }
  afterTranslation()
  transformations
}
BatchExecSink
public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
  ...

public abstract class CommonExecSink extends ExecNodeBase<Object>
    implements MultipleTransformationTranslator<Object> {
  ...
ExecNodeBase.translateToPlan
public final Transformation<T> translateToPlan(Planner planner) {
  if (transformation == null) {
    transformation =
        // Key step: BatchExecSink.translateToPlanInternal
        translateToPlanInternal(
            (PlannerBase) planner,
            ExecNodeConfig.of(
                ((PlannerBase) planner).getTableConfig(),
                persistedConfig,
                isCompiled));
    if (this instanceof SingleTransformationTranslator) {
      if (inputsContainSingleton()) {
        transformation.setParallelism(1);
        transformation.setMaxParallelism(1);
      }
    }
  }
  return transformation;
}
BatchExecSink.translateToPlanInternal
protected Transformation<Object> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
  final Transformation<RowData> inputTransform =
      (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
  // org.apache.hudi.table.HoodieTableSink
  final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
  // Key step: CommonExecSink.createSinkTransformation
  return createSinkTransformation(planner.getExecEnv(), config, inputTransform, tableSink, -1, false);
}
CommonExecSink.createSinkTransformation
Here tableSink is the HoodieTableSink; its getSinkRuntimeProvider method is called to obtain the runtimeProvider (the method body inside it is not executed at this point).
protected Transformation<Object> createSinkTransformation(
    StreamExecutionEnvironment streamExecEnv,
    ExecNodeConfig config,
    Transformation<RowData> inputTransform,
    // tableSink here is the HoodieTableSink
    DynamicTableSink tableSink,
    int rowtimeFieldIndex,
    boolean upsertMaterialize) {
  final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
  final SinkRuntimeProvider runtimeProvider =
      // Key step: HoodieTableSink.getSinkRuntimeProvider
      tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
  final RowType physicalRowType = getPhysicalRowType(schema);
  final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
  final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
  final int inputParallelism = inputTransform.getParallelism();
  final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
  final boolean hasPk = primaryKeys.length > 0;
  ...
  return (Transformation<Object>)
      // Key step: CommonExecSink.applySinkProvider
      applySinkProvider(
          sinkTransform,
          streamExecEnv,
          runtimeProvider,
          rowtimeFieldIndex,
          sinkParallelism,
          config);
}
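The parenthetical in the note above matters: getSinkRuntimeProvider only hands back the lambda as an object; its body does not run until consumeDataStream is called later. A minimal stand-alone illustration of that behaviour (names invented, not Hudi or Flink code):

public class LazyLambdaDemo {

  // A stand-in for DataStreamSinkProviderAdapter: one abstract method.
  interface Provider {
    String consume(String dataStream);
  }

  // A stand-in for getSinkRuntimeProvider: it returns the lambda but does not run its body.
  static Provider getProvider() {
    System.out.println("getProvider called");
    return dataStream -> {
      System.out.println("lambda body runs now");   // corresponds to the write-pipeline code
      return "sink(" + dataStream + ")";
    };
  }

  public static void main(String[] args) {
    Provider provider = getProvider();               // prints "getProvider called" only
    System.out.println("provider obtained, body not executed yet");
    System.out.println(provider.consume("rows"));    // only now does the lambda body execute
  }
}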
CommonExecSink.applySinkProvider
It first builds the dataStream via new DataStream<>(env, sinkTransformation), and then, by executing provider.consumeDataStream, invokes the method body inside HoodieTableSink.getSinkRuntimeProvider. The provider here is the DataStreamSinkProviderAdapter returned by HoodieTableSink.getSinkRuntimeProvider.
private Transformation<?> applySinkProvider(
    Transformation<RowData> inputTransform,
    StreamExecutionEnvironment env,
    SinkRuntimeProvider runtimeProvider,
    int rowtimeFieldIndex,
    int sinkParallelism,
    ExecNodeConfig config) {
  TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
  if (runtimeProvider instanceof DataStreamSinkProvider) {
    Transformation<RowData> sinkTransformation =
        applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism, config);
    // build the dataStream
    final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
    final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
    // Key step: provider.consumeDataStream
    return provider.consumeDataStream(createProviderContext(config), dataStream).getTransformation();
  } else if (runtimeProvider instanceof TransformationSinkProvider) {
    ...
provider.consumeDataStream
(already covered above under DataStreamSinkProviderAdapter)
It invokes the method body (the lambda expression) of HoodieTableSink.getSinkRuntimeProvider, which carries out the subsequent Hudi write logic.
This answers the second question.
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
  return consumeDataStream(dataStream);
}
Summary
This article is mainly a brief record of my own process of debugging the Hudi Flink SQL source code; it does not analyze the sources in depth (my own level is not there yet). The main purpose was to figure out the main code steps from the Table API entry point to createDynamicTableSink returning a HoodieTableSink, and where the method body of HoodieTableSink.getSinkRuntimeProvider is invoked to carry out the subsequent Hudi write logic. This should make later analysis and study of the Hudi source code easier.
New knowledge points picked up in this article: functional interfaces and how lambda expressions implement them.