Flink StreamGraph Generation Process

Table of Contents

    • Overview
    • StreamGraph Core Objects
    • StreamGraph Generation Process

Overview

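In Flink, the StreamGraph is the logical representation of a data flow: it describes how the stream transformations of a Flink job are to be executed. The StreamGraph is the basis from which the Flink runtime derives its execution plan.
An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The conversion from StreamGraph to JobGraph happens on the client. After the JobGraph is submitted to the Flink cluster, the cluster converts it into an ExecutionGraph, and the job then enters the scheduling and execution phase.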

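StreamGraph Core Objects

  • StreamNode
    A StreamNode is a node in the StreamGraph and is converted from a Transformation. Put simply, one StreamNode represents one operator. Logically, a StreamGraph contains both physical and virtual StreamNodes. A StreamNode can have multiple inputs and multiple outputs.
    A physical StreamNode eventually becomes a physical operator. A virtual StreamNode is attached to a StreamEdge.
  • StreamEdge
    A StreamEdge is an edge in the StreamGraph and connects two StreamNodes. A StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side output, the partitioner, and the output field selection.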
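
The relationship between physical nodes, virtual nodes, and edges can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyStreamGraph`, `Node`, `Edge`, `VirtualPartition`, and every method name here are hypothetical and are not Flink's real classes. A virtual node records only a partitioner and its upstream node; when an edge is added, the virtual node is resolved away and its partitioner ends up on the edge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: hypothetical names, not Flink's real StreamGraph classes. */
final class ToyStreamGraph {
    /** A physical node: stands for one operator. */
    static final class Node {
        final int id;
        final String operatorName;
        final List<Edge> inEdges = new ArrayList<>();
        final List<Edge> outEdges = new ArrayList<>();

        Node(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    /** An edge between two physical nodes; it carries the partitioner name. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner; // null means "no partitioner specified"

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    /** A virtual node: no operator, only a partitioner and a pointer upstream. */
    private static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, Node> nodes = new HashMap<>();
    private final Map<Integer, VirtualPartition> virtualPartitions = new HashMap<>();

    void addNode(int id, String operatorName) {
        nodes.put(id, new Node(id, operatorName));
    }

    void addVirtualPartitionNode(int virtualId, int upstreamId, String partitioner) {
        virtualPartitions.put(virtualId, new VirtualPartition(upstreamId, partitioner));
    }

    /** Resolves virtual nodes on the upstream side, then links two physical nodes. */
    void addEdge(int upstreamId, int downstreamId) {
        String partitioner = null;
        while (virtualPartitions.containsKey(upstreamId)) {
            VirtualPartition vp = virtualPartitions.get(upstreamId);
            if (partitioner == null) {
                partitioner = vp.partitioner; // keep the partitioner nearest to the downstream node
            }
            upstreamId = vp.upstreamId;
        }
        Edge edge = new Edge(upstreamId, downstreamId, partitioner);
        nodes.get(upstreamId).outEdges.add(edge);
        nodes.get(downstreamId).inEdges.add(edge);
    }

    Node node(int id) {
        return nodes.get(id);
    }
}
```

In this sketch, adding an edge from a virtual node to a physical node produces a single edge between the two physical nodes, so the virtual node never appears as a node of its own.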

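StreamGraph Generation Process

The StreamGraph is generated in the Flink client. On submission, the client invokes the main method of the Flink application, where the user's business logic is assembled into a pipeline of Transformations. The final call to StreamExecutionEnvironment.execute() then triggers construction of the StreamGraph.
The StreamGraph is generated before the job is submitted; the entry point is in StreamExecutionEnvironment: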

    @Internal
    public StreamGraph getStreamGraph() {
        return this.getStreamGraph(this.getJobName());
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return this.getStreamGraph(jobName, true);
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }

        return streamGraph;
    }

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (this.transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        } else {
            RuntimeExecutionMode executionMode = (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
            return (new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration()))
                    .setRuntimeExecutionMode(executionMode)
                    .setStateBackend(this.defaultStateBackend)
                    .setChaining(this.isChainingEnabled)
                    .setUserArtifacts(this.cacheFile)
                    .setTimeCharacteristic(this.timeCharacteristic)
                    .setDefaultBufferTimeout(this.bufferTimeout);
        }
    }
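
The entry point does two things besides delegating to the generator: it refuses to build a graph when no transformations were registered, and it optionally clears the transformation list afterwards. A minimal sketch of that shape follows. `ToyEnvironment`, `addOperator`, `buildGraph`, and `pendingTransformations` are hypothetical names, and the "graph" is just a string, so this illustrates only the control flow, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for the environment: hypothetical names, not Flink's API. */
final class ToyEnvironment {
    // Operators are only recorded here; nothing is built until buildGraph is called.
    private final List<String> transformations = new ArrayList<>();

    ToyEnvironment addOperator(String name) {
        transformations.add(name);
        return this;
    }

    /** Follows the shape of getStreamGraph(jobName, clearTransformations). */
    String buildGraph(String jobName, boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        // The "graph" is a plain string here, purely for illustration.
        String graph = jobName + ": " + String.join(" -> ", transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    int pendingTransformations() {
        return transformations.size();
    }
}
```

Calling `buildGraph("demo", true)` twice in a row fails the second time in this sketch, because the first call emptied the list.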

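The StreamGraph is actually built by StreamGraphGenerator. Generation starts from the SinkTransformation (the output) and traces backward to the SourceTransformation: transform() translates the inputs of a transformation before the transformation itself. The StreamGraph is constructed during this traversal, as the following listing shows: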

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator.Context;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.ReduceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TimestampsAndWatermarksTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC;
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
    private final List<Transformation<?>> transformations;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private final ReadableConfig configuration;
    private StateBackend stateBackend;
    private boolean chaining;
    private Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts;
    private TimeCharacteristic timeCharacteristic;
    private String jobName;
    private SavepointRestoreSettings savepointRestoreSettings;
    private long defaultBufferTimeout;
    private RuntimeExecutionMode runtimeExecutionMode;
    private boolean shouldExecuteInBatchMode;
    private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
    protected static Integer iterationIdCounter;
    private StreamGraph streamGraph;
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;

    public static int getNewIterationNodeId() {
        Integer var0 = iterationIdCounter;
        iterationIdCounter = iterationIdCounter - 1;
        return iterationIdCounter;
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this(transformations, executionConfig, checkpointConfig, new Configuration());
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
        this.chaining = true;
        this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
        this.jobName = "Flink Streaming Job";
        this.savepointRestoreSettings = SavepointRestoreSettings.none();
        this.defaultBufferTimeout = -1L;
        this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
        this.transformations = (List)Preconditions.checkNotNull(transformations);
        this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = new CheckpointConfig(checkpointConfig);
        this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration);
    }

    public StreamGraphGenerator setRuntimeExecutionMode(RuntimeExecutionMode runtimeExecutionMode) {
        this.runtimeExecutionMode = (RuntimeExecutionMode)Preconditions.checkNotNull(runtimeExecutionMode);
        return this;
    }

    public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        return this;
    }

    public StreamGraphGenerator setChaining(boolean chaining) {
        this.chaining = chaining;
        return this;
    }

    public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts) {
        this.userArtifacts = userArtifacts;
        return this;
    }

    public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
        return this;
    }

    public StreamGraphGenerator setDefaultBufferTimeout(long defaultBufferTimeout) {
        this.defaultBufferTimeout = defaultBufferTimeout;
        return this;
    }

    public StreamGraphGenerator setJobName(String jobName) {
        this.jobName = jobName;
        return this;
    }

    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
        this.configureStreamGraph(this.streamGraph);
        this.alreadyTransformed = new HashMap();
        Iterator var1 = this.transformations.iterator();

        while(var1.hasNext()) {
            Transformation<?> transformation = (Transformation)var1.next();
            this.transform(transformation);
        }

        StreamGraph builtStreamGraph = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return builtStreamGraph;
    }

    private void configureStreamGraph(StreamGraph graph) {
        Preconditions.checkNotNull(graph);
        graph.setChaining(this.chaining);
        graph.setUserArtifacts(this.userArtifacts);
        graph.setTimeCharacteristic(this.timeCharacteristic);
        graph.setJobName(this.jobName);
        if (this.shouldExecuteInBatchMode) {
            if (this.checkpointConfig.isCheckpointingEnabled()) {
                LOG.info("Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.");
                this.checkpointConfig.disableCheckpointing();
            }

            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
            graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
            this.setDefaultBufferTimeout(-1L);
            this.setBatchStateBackendAndTimerService(graph);
        } else {
            graph.setStateBackend(this.stateBackend);
            graph.setScheduleMode(ScheduleMode.EAGER);
            if (this.checkpointConfig.isApproximateLocalRecoveryEnabled()) {
                this.checkApproximateLocalRecoveryCompatibility();
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
            } else {
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
            }
        }
    }

    private void checkApproximateLocalRecoveryCompatibility() {
        Preconditions.checkState(!this.checkpointConfig.isUnalignedCheckpointsEnabled(), "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
    }

    private void setBatchStateBackendAndTimerService(StreamGraph graph) {
        boolean useStateBackend = (Boolean)this.configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
        boolean sortInputs = (Boolean)this.configuration.get(ExecutionOptions.SORT_INPUTS);
        Preconditions.checkState(!useStateBackend || sortInputs, "Batch state backend requires the sorted inputs to be enabled!");
        if (useStateBackend) {
            LOG.debug("Using BATCH execution state backend and timer service.");
            graph.setStateBackend(new BatchExecutionStateBackend());
            graph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
        } else {
            graph.setStateBackend(this.stateBackend);
        }
    }

    private boolean shouldExecuteInBatchMode(RuntimeExecutionMode configuredMode) {
        boolean existsUnboundedSource = this.existsUnboundedSource();
        Preconditions.checkState(configuredMode != RuntimeExecutionMode.BATCH || !existsUnboundedSource, "Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "' to STREAMING or AUTOMATIC");
        if (Preconditions.checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
            return configuredMode == RuntimeExecutionMode.BATCH;
        } else {
            return !existsUnboundedSource;
        }
    }

    private boolean existsUnboundedSource() {
        return this.transformations.stream().anyMatch((transformation) -> {
            return this.isUnboundedSource(transformation) || transformation.getTransitivePredecessors().stream().anyMatch(this::isUnboundedSource);
        });
    }

    private boolean isUnboundedSource(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation);
        return transformation instanceof WithBoundedness && ((WithBoundedness)transformation).getBoundedness() != Boundedness.BOUNDED;
    }

    private Collection<Integer> transform(Transformation<?> transform) {
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection)this.alreadyTransformed.get(transform);
        } else {
            LOG.debug("Transforming " + transform);
            if (transform.getMaxParallelism() <= 0) {
                int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }

            transform.getOutputType();
            TransformationTranslator<?, Transformation<?>> translator = (TransformationTranslator)translatorMap.get(transform.getClass());
            Collection transformedIds;
            if (translator != null) {
                transformedIds = this.translate(translator, transform);
            } else {
                transformedIds = this.legacyTransform(transform);
            }

            if (!this.alreadyTransformed.containsKey(transform)) {
                this.alreadyTransformed.put(transform, transformedIds);
            }

            return transformedIds;
        }
    }

    private Collection<Integer> legacyTransform(Transformation<?> transform) {
        Collection transformedIds;
        if (transform instanceof FeedbackTransformation) {
            transformedIds = this.transformFeedback((FeedbackTransformation)transform);
        } else {
            if (!(transform instanceof CoFeedbackTransformation)) {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }

            transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
        }

        if (transform.getBufferTimeout() >= 0L) {
            this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }

        if (transform.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
        } else {
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }

            this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
            return transformedIds;
        }
    }

    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else if (iterate.getFeedbackEdges().size() <= 0) {
            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
        } else {
            List<Transformation<?>> inputs = iterate.getInputs();
            Preconditions.checkState(inputs.size() == 1);
            Transformation<?> input = (Transformation)inputs.get(0);
            List<Integer> resultIds = new ArrayList();
            Collection<Integer> inputIds = this.transform(input);
            resultIds.addAll(inputIds);
            if (this.alreadyTransformed.containsKey(iterate)) {
                return (Collection)this.alreadyTransformed.get(iterate);
            } else {
                Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig));
                this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                resultIds.add(itSource.getId());
                this.alreadyTransformed.put(iterate, resultIds);
                List<Integer> allFeedbackIds = new ArrayList();
                Iterator var10 = iterate.getFeedbackEdges().iterator();

                while(var10.hasNext()) {
                    Transformation<T> feedbackEdge = (Transformation)var10.next();
                    Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                    allFeedbackIds.addAll(feedbackIds);
                    Iterator var13 = feedbackIds.iterator();

                    while(var13.hasNext()) {
                        Integer feedbackId = (Integer)var13.next();
                        this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                    }
                }

                String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                if (slotSharingGroup == null) {
                    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                }

                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
                return resultIds;
            }
        }
    }

    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else {
            Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
            StreamNode itSource = (StreamNode)itSourceAndSink.f0;
            StreamNode itSink = (StreamNode)itSourceAndSink.f1;
            this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig));
            this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
            Collection<Integer> resultIds = Collections.singleton(itSource.getId());
            this.alreadyTransformed.put(coIterate, resultIds);
            List<Integer> allFeedbackIds = new ArrayList();
            Iterator var7 = coIterate.getFeedbackEdges().iterator();

            while(var7.hasNext()) {
                Transformation<F> feedbackEdge = (Transformation)var7.next();
                Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                allFeedbackIds.addAll(feedbackIds);
                Iterator var10 = feedbackIds.iterator();

                while(var10.hasNext()) {
                    Integer feedbackId = (Integer)var10.next();
                    this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                }
            }

            String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
            itSink.setSlotSharingGroup(slotSharingGroup);
            itSource.setSlotSharingGroup(slotSharingGroup);
            return Collections.singleton(itSource.getId());
        }
    }

    private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
        Preconditions.checkNotNull(translator);
        Preconditions.checkNotNull(transform);
        List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection)this.alreadyTransformed.get(transform);
        } else {
            String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
            Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
            return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
        }
    }

    private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
        List<Collection<Integer>> allInputIds = new ArrayList();
        if (parentTransformations == null) {
            return allInputIds;
        } else {
            Iterator var3 = parentTransformations.iterator();

            while(var3.hasNext()) {
                Transformation<?> transformation = (Transformation)var3.next();
                allInputIds.add(this.transform(transformation));
            }

            return allInputIds;
        }
    }

    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        } else {
            String inputGroup = null;
            Iterator var4 = inputIds.iterator();

            while(var4.hasNext()) {
                int id = (Integer)var4.next();
                String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                if (inputGroup == null) {
                    inputGroup = inputGroupCandidate;
                } else if (!inputGroup.equals(inputGroupCandidate)) {
                    return "default";
                }
            }

            return inputGroup == null ? "default" : inputGroup;
        }
    }

    static {
        DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
        tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
        tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
        translatorMap = Collections.unmodifiableMap(tmp);
        iterationIdCounter = 0;
    }

    private static class ContextImpl implements Context {
        private final StreamGraphGenerator streamGraphGenerator;
        private final StreamGraph streamGraph;
        private final String slotSharingGroup;
        private final ReadableConfig config;

        public ContextImpl(StreamGraphGenerator streamGraphGenerator, StreamGraph streamGraph, String slotSharingGroup, ReadableConfig config) {
            this.streamGraphGenerator = (StreamGraphGenerator)Preconditions.checkNotNull(streamGraphGenerator);
            this.streamGraph = (StreamGraph)Preconditions.checkNotNull(streamGraph);
            this.slotSharingGroup = (String)Preconditions.checkNotNull(slotSharingGroup);
            this.config = (ReadableConfig)Preconditions.checkNotNull(config);
        }

        public StreamGraph getStreamGraph() {
            return this.streamGraph;
        }

        public Collection<Integer> getStreamNodeIds(Transformation<?> transformation) {
            Preconditions.checkNotNull(transformation);
            Collection<Integer> ids = (Collection)this.streamGraphGenerator.alreadyTransformed.get(transformation);
            Preconditions.checkState(ids != null, "Parent transformation \"" + transformation + "\" has not been transformed.");
            return ids;
        }

        public String getSlotSharingGroup() {
            return this.slotSharingGroup;
        }

        public long getDefaultBufferTimeout() {
            return this.streamGraphGenerator.defaultBufferTimeout;
        }

        public ReadableConfig getGraphGeneratorConfig() {
            return this.config;
        }
    }
}
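
The traversal in transform() and getParentInputIds() boils down to a memoized depth-first walk: each transformation first translates its inputs, then creates its own node, and the alreadyTransformed map ensures that a transformation shared by several downstream branches is translated only once. A stripped-down sketch of that idea follows. `ToyGraphGenerator` and `ToyTransformation` are hypothetical stand-ins, each transformation maps to exactly one node, and translators, virtual nodes, and slot sharing are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy sketch of the memoized traversal: hypothetical names, not Flink's classes. */
final class ToyGraphGenerator {
    /** Hypothetical stand-in for a Transformation: a name plus its inputs. */
    static final class ToyTransformation {
        final String name;
        final List<ToyTransformation> inputs;

        ToyTransformation(String name, ToyTransformation... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Plays the role of alreadyTransformed: transformation -> id of the node built for it.
    private final Map<ToyTransformation, Integer> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>(); // node names in creation order
    final List<String> edges = new ArrayList<>(); // "upstream->downstream"

    void generate(List<ToyTransformation> transformations) {
        for (ToyTransformation t : transformations) {
            transform(t);
        }
    }

    private int transform(ToyTransformation t) {
        Integer known = alreadyTransformed.get(t);
        if (known != null) {
            return known; // shared upstream: reuse the node built earlier
        }
        // Inputs are translated first, so sources get their nodes before sinks.
        List<Integer> inputIds = new ArrayList<>();
        for (ToyTransformation input : t.inputs) {
            inputIds.add(transform(input));
        }
        int id = nodes.size();
        nodes.add(t.name);
        for (int inputId : inputIds) {
            edges.add(nodes.get(inputId) + "->" + t.name);
        }
        alreadyTransformed.put(t, id);
        return id;
    }
}
```

Starting the walk from two sinks that share one source creates the source node once in this sketch; the second sink only adds its own node and edge.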
