在编写 Flink 的程序的时候,核心的要点是构造出数据处理的拓扑结构,即任务执行逻辑的 DAG。我们先来看一下 Flink 任务的拓扑在逻辑上是怎么保存的。

StreamExecutionEnvironment

StreamExecutionEnvironment 是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment 有不同的具体实现类,如 LocalStreamEnvironment, RemoteStreamEnvironment 等。StreamExecutionEnvironment 也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfigCheckpointConfig 中。我们现在先只关注拓扑结构的产生。

通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:

1
2
3
4
senv.addSource(XXX)
	.map(XXX)
  .filter(XXX)
  .addSink(XXX)

添加数据源后获得 DataStream, 之后通过不同的算子不停地在 DataStream 上实现转换过滤等逻辑,最终将结果输出到 DataSink 中。

在 StreamExecutionEnvironment 内部使用一个 List<StreamTransformation<?>> transformations 来保留生成 DataStream 的所有转换。

StreamTransformation

StreamTransformation 代表了生成 DataStream 的操作,每一个 DataStream 的底层都有对应的一个 StreamTransformation。在 DataStream 上面通过 map 等算子不断进行转换,就得到了由 StreamTransformation 构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph

StreamTransformation 在运行时并不一定对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。

每一个 StreamTransformation 都有一个关联的 Id,这个 Id 是全局递增的。除此以外,还有 uid, slotSharingGroup, parallelism 等信息。

StreamTransformation 有很多具体的子类,如SourceTransformationOneInputStreamTransformationTwoInputTransformationSideOutputTransformationSinkTransformation 等等,这些分别对应了DataStream 上的不同转换操作。

由于 StreamTransformation 中通常保留了其前向的 StreamTransformation,即其输入,因此可以据此还原出 DAG 的拓扑结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// OneInputTransformation
public OneInputTransformation(
			StreamTransformation<IN> input,
			String name,
			OneInputStreamOperator<IN, OUT> operator,
			TypeInformation<OUT> outputType,
			int parallelism) {
		super(name, outputType, parallelism);
		this.input = input;
		this.operator = operator;
	}

// TwoInputTransformation
public TwoInputTransformation(
			StreamTransformation<IN1> input1,
			StreamTransformation<IN2> input2,
			String name,
			TwoInputStreamOperator<IN1, IN2, OUT> operator,
			TypeInformation<OUT> outputType,
			int parallelism) {
		super(name, outputType, parallelism);
		this.input1 = input1;
		this.input2 = input2;
		this.operator = operator;
	}

DataStream

一个 DataStream 就表征了由同一种类型元素构成的数据流。通过对 DataStream 应用 map/filter 等操作,可以将一个 DataStream 转换为另一个 DataStream,这个转换的过程就是根据不同的操作生成不同的 StreamTransformation,并将其加入 StreamExecutionEnvironmenttransformations 列表中。

例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
//构造 StreamTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
				this.transformation,
				operatorName,
				operator,
				outTypeInfo,
				environment.getParallelism());

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//加入到 StreamExecutionEnvironment 的列表中
getExecutionEnvironment().addOperator(resultTransform);

DataStream 的子类包括 SingleOutputStreamOperatorDataStreamSource KeyedStreamIterativeStream, SplitStream(已弃用)。这里要吐槽一下 SingleOutputStreamOperator 的这个类的命名,太容易和 StreamOperator 混淆了。StreamOperator 的介绍见下一小节。

除了 DataStream 及其子类以外,其它的表征数据流的类还有 ConnectedStreams (两个流连接在一起)、 WindowedStreamAllWindowedStream 。这些数据流之间的转换可以参考 Flink 的官方文档

StreamOperator

在操作 DataStream 的时候,比如 DataStream#map 等,会要求我们提供一个自定义的处理函数。那么这些信息时如何保存在 StreamTransformation 中的呢?这里就要引入一个新的接口 StreamOperator

StreamOperator 定义了对一个具体的算子的生命周期的管理,包括:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
	//生命周期
	void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

	void open() throws Exception;

	void close() throws Exception;

	@Override
	void dispose() throws Exception;

	//状态管理
	OperatorSnapshotFutures snapshotState(
		long checkpointId,
		long timestamp,
		CheckpointOptions checkpointOptions,
		CheckpointStreamFactory storageLocation) throws Exception;

	void initializeState() throws Exception;

  //其它方法暂时省略

StreamOperator 的两个子接口 OneInputStreamOperatorTwoInputStreamOperator 则提供了操作数据流中具体元素的方法,而 AbstractUdfStreamOperator 这个抽象子类则提供了自定义处理函数对应的算子的基本实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//OneInputStreamOperator
void processElement(StreamRecord<IN> element) throws Exception;
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

//TwoInputStreamOperator
	void processElement1(StreamRecord<IN1> element) throws Exception;
	void processElement2(StreamRecord<IN2> element) throws Exception;


//AbstractUdfStreamOperator 接受一个用户自定义的处理函数
public AbstractUdfStreamOperator(F userFunction) {
		this.userFunction = requireNonNull(userFunction);
		checkUdfCheckpointingPreconditions();
	}

至于具体到诸如 map/fliter 等操作对应的 StreamOperator,基本都是在 AbstractUdfStreamOperator 的基础上实现的。以 StreamMap 为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class StreamMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;

	public StreamMap(MapFunction<IN, OUT> mapper) {
		super(mapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}
}

由此,通过 DataStream –> StreamTransformation –> StreamOperator 这样的依赖关系,就可以完成 DataStream 的转换,并且保留数据流和应用在流上的算子之间的关系。

StreamGraph

StreamGraphGenerator 会基于 StreamExecutionEnvironmenttransformations 列表来生成 StreamGraph

在遍历 List<StreamTransformation> 生成 StreamGraph 的时候,会递归调用StreamGraphGenerator#transform方法。对于每一个 StreamTransformation, 确保当前其上游已经完成转换。StreamTransformations 被转换为 StreamGraph 中的节点 StreamNode,并为上下游节点添加边 StreamEdge

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Collection<Integer> transformedIds;
		if (transform instanceof OneInputTransformation<?, ?>) {
			transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
		} else if (transform instanceof SourceTransformation<?>) {
			transformedIds = transformSource((SourceTransformation<?>) transform);
		} else if (transform instanceof SinkTransformation<?>) {
			transformedIds = transformSink((SinkTransformation<?>) transform);
		} else if (transform instanceof UnionTransformation<?>) {
			transformedIds = transformUnion((UnionTransformation<?>) transform);
		} else if (transform instanceof SplitTransformation<?>) {
			transformedIds = transformSplit((SplitTransformation<?>) transform);
		} else if (transform instanceof SelectTransformation<?>) {
			transformedIds = transformSelect((SelectTransformation<?>) transform);
		} else if (transform instanceof FeedbackTransformation<?>) {
			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
		} else if (transform instanceof CoFeedbackTransformation<?>) {
			transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
		} else if (transform instanceof PartitionTransformation<?>) {
			transformedIds = transformPartition((PartitionTransformation<?>) transform);
		} else if (transform instanceof SideOutputTransformation<?>) {
			transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
		} else {
			throw new IllegalStateException("Unknown transformation: " + transform);
		}

对于不同类型的 StreamTransformation,分别调用对应的转换方法,以 最典型的 transformOneInputTransform 为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

		//首先确保上游节点完成转换
		Collection<Integer> inputIds = transform(transform.getInput());

		// the recursive call might have already transformed this
    // 由于是递归调用的,可能已经完成了转换
		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}

		//确定资源共享组,用户如果没有指定,默认是default
		String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

		//向 StreamGraph 中添加 Operator, 这一步会生成对应的 StreamNode
		streamGraph.addOperator(transform.getId(),
				slotSharingGroup,
				transform.getCoLocationGroupKey(),
				transform.getOperator(),
				transform.getInputType(),
				transform.getOutputType(),
				transform.getName());

		if (transform.getStateKeySelector() != null) {
			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
			streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
		}

		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

		//依次连接到上游节点,创建 StreamEdge
		for (Integer inputId: inputIds) {
			streamGraph.addEdge(inputId, transform.getId(), 0);
		}

		return Collections.singleton(transform.getId());
	}

接着看一看 StreamGraph 中对应的添加节点和边的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected StreamNode addNode(Integer vertexID,
		String slotSharingGroup,
		@Nullable String coLocationGroup,
		Class<? extends AbstractInvokable> vertexClass,
		StreamOperator<?> operatorObject,
		String operatorName) {

		if (streamNodes.containsKey(vertexID)) {
			throw new RuntimeException("Duplicate vertexID " + vertexID);
		}

		StreamNode vertex = new StreamNode(environment,
			vertexID,
			slotSharingGroup,
			coLocationGroup,
			operatorObject,
			operatorName,
			new ArrayList<OutputSelector<?>>(),
			vertexClass);

		//创建 StreamNode,这里保存了 StreamOperator 和 vertexClass 信息
		streamNodes.put(vertexID, vertex);

		return vertex;
	}

StreamNode 中,保存了对应的 StreamOperator (从 StreamTransformation 得到),并且还引入了变量 jobVertexClass 来表示该节点在 TaskManager 中运行时的实际任务类型。

1
private final Class<? extends AbstractInvokable> jobVertexClass;

AbstractInvokable 是所有可以在 TaskManager 中运行的任务的抽象基础类,包括流式任务和批任务。StreamTask 是所有流式任务的基础类,其具体的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask 等。

对于一些不包含物理转换操作的 StreamTransformation,如 Partitioning, split/select, union,并不会生成 StreamNode,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。

PartitionTansformation 为例, PartitionTansformationKeyedStream 对应的转换:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//StreamGraphGenerator#transformPartition
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
		StreamTransformation<T> input = partition.getInput();
		List<Integer> resultIds = new ArrayList<>();

		//递归地转换上游节点
		Collection<Integer> transformedIds = transform(input);

		for (Integer transformedId: transformedIds) {
			int virtualId = StreamTransformation.getNewNodeId();
			//添加虚拟的 Partition 节点
			streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
			resultIds.add(virtualId);
		}

		return resultIds;
	}

// StreamGraph#addVirtualPartitionNode
public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {

		if (virtualPartitionNodes.containsKey(virtualId)) {
			throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
		}

		//添加一个虚拟节点,后续添加边的时候会连接到实际的物理节点
		virtualPartitionNodes.put(virtualId,
				new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
	}

前面提到,在每一个物理节点的转换上,会调用 StreamGraph#addEdge 在输入节点和当前节点之间建立边的连接:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private void addEdgeInternal(Integer upStreamVertexID,
			Integer downStreamVertexID,
			int typeNumber,
			StreamPartitioner<?> partitioner,
			List<String> outputNames,
			OutputTag outputTag) {

		//先判断是不是虚拟节点上的边,如果是,则找到虚拟节点上游对应的物理节点
		//在两个物理节点之间添加边,并把对应的 StreamPartitioner,或者 OutputTag 等补充信息添加到StreamEdge中
		if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
			......
		} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
			int virtualId = upStreamVertexID;
			upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
			if (partitioner == null) {
				partitioner = virtualPartitionNodes.get(virtualId).f1;
			}
			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
		} else {

			//两个物理节点
			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
			StreamNode downstreamNode = getStreamNode(downStreamVertexID);

			// If no partitioner was specified and the parallelism of upstream and downstream
			// operator matches use forward partitioning, use rebalance otherwise.
			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
				partitioner = new ForwardPartitioner<Object>();
			} else if (partitioner == null) {
				partitioner = new RebalancePartitioner<Object>();
			}

			if (partitioner instanceof ForwardPartitioner) {
				if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
					throw new UnsupportedOperationException("Forward partitioning does not allow " +
							"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
							", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
							" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
				}
			}

			//创建 StreamEdge,保留了 StreamPartitioner 等属性
			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

			//分别将StreamEdge添加到上游节点和下游节点
			getStreamNode(edge.getSourceId()).addOutEdge(edge);
			getStreamNode(edge.getTargetId()).addInEdge(edge);
		}
	}

这样通过 StreamNode 和 SteamEdge,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。

小结

本文简单分析了从 DataStream API 到 StramGraph 的过程。 StreamGraph 是 Flink 任务最接近用户逻辑的 DAG 表示,后面到具体执行的时候还会进行一系列转换,我们在后续的文章中再逐一加以分析。

-EOF-