前面的文章我们介绍了 StreamGraph 的生成,这个实际上只对应 Flink 作业在逻辑上的执行计划图。Flink 会进一步对 StreamGraph 进行转换,得到另一个执行计划图,即 JobGraph。

JobVertex

在 StreamGraph 中,每一个算子(Operator) 对应了图中的一个节点(StreamNode)。StreamGraph 会被进一步优化,将多个符合条件的节点串联(Chain) 在一起形成一个节点,从而减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。多个算子被 chain 在一起的形成的节点在 JobGraph 中对应的就是 JobVertex

每个 JobVertex 中包含一个或多个 Operators。 JobVertex 的主要成员变量包括

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
	/** The ID of the vertex. */
	private final JobVertexID id;

	/** The alternative IDs of the vertex. */
	private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();

	/** The IDs of all operators contained in this vertex. */
	private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();

	/** The alternative IDs of all operators contained in this vertex. */
	private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();

	/** List of produced data sets, one per writer */
	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();

	/** List of edges with incoming data. One per Reader. */
	private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();

	/** Number of subtasks to split this task into at runtime.*/
	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

其输入是 JobEdge 列表, 输出是 IntermediateDataSet 列表。

JobEdge

StramGraph 中,StreamNode 之间是通过 StreamEdge 建立连接的。在 JobEdge 中,对应的是 JobEdge

StreamEdge 中同时保留了源节点和目标节点 (sourceId 和 targetId)不同,在 JobEdge 中只有源节点的信息。由于 JobVertex 中保存了所有输入的 JobEdge 的信息,因而同样可以在两个节点之间建立连接。更确切地说,JobEdge 是和节点的输出结果相关联的,我们看下 JobEdge 主要的成员变量:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
	/** The vertex connected to this edge. */
	private final JobVertex target;

	/** The distribution pattern that should be used for this job edge. */
  // DistributionPattern 决定了在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
	private final DistributionPattern distributionPattern;

	/** The data set at the source of the edge, may be null if the edge is not yet connected*/
	private IntermediateDataSet source;

	/** The id of the source intermediate data set */
	private IntermediateDataSetID sourceId;

	/** Optional name for the data shipping strategy (forward, partition hash, rebalance, ...),
	 * to be displayed in the JSON plan */
	private String shipStrategyName;

IntermediateDataSet

JobVertex 产生的数据被抽象为 IntermediateDataSet, 字面意思为中间数据集,这个很容易理解。前面提到,JobEdge 是和节点的输出结果相关联的,其实就是指可以把 JobEdge 看作是 IntermediateDataSet 的消费者,那么 JobVertex 自然就是生产者了。

1
2
3
4
5
6
7
8
private final IntermediateDataSetID id; 		// the identifier

	private final JobVertex producer;			// the operation that produced this data set

	private final List<JobEdge> consumers = new ArrayList<JobEdge>();

	// The type of partition to use at runtime
	private final ResultPartitionType resultType;

其中 ResultPartitionType 表示中间结果的类型,说起来有点抽象,我们看下属性就明白了:

1
2
3
4
5
6
7
8
	/** Can the partition be consumed while being produced? */
	private final boolean isPipelined;

	/** Does the partition produce back pressure when not consumed? */
	private final boolean hasBackPressure;

	/** Does this partition use a limited number of (network) buffers? */
	private final boolean isBounded;

这个要结合 Flink 任务运行时的内存管理机制来看,在后面的文章再进行分析。目前在 Stream 模式下使用的类型是 PIPELINED_BOUNDED(true, true, true),上面的三个属性都是 true。

StreamConfig

对于每一个 StreamOperator, 也就是 StreamGraph 中的每一个 StreamGraph, 在生成 JobGraph 的过程中 StreamingJobGraphGenerator 都会创建一个对应的 StreamConfig

StreamConfig 中保存了这个算子(operator) 在运行是需要的所有配置信息,这些信息都是通过 key/value 的形式存储在 Configuration 中的。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
	//保存StreamOperator信息
	public void setStreamOperator(StreamOperator<?> operator) {
		if (operator != null) {
			config.setClass(USER_FUNCTION, operator.getClass());

			try {
				InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
			} catch (IOException e) {
				throw new StreamTaskException("Cannot serialize operator object "
						+ operator.getClass() + ".", e);
			}
		}
	}


  public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
		try {
			InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS);
		} catch (IOException e) {
			throw new StreamTaskException("Cannot serialize chained outputs.", e);
		}
	}

  public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
		try {
			InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS);
		} catch (IOException e) {
			throw new StreamTaskException("Cannot serialize non chained outputs.", e);
		}
	}

  public void setInPhysicalEdges(List<StreamEdge> inEdges) {
		try {
			InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
		} catch (IOException e) {
			throw new StreamTaskException("Cannot serialize inward edges.", e);
		}
	}

  //......

从 StreamGraph 到 JobGraph

StreamGraphJobGraph 的转换入口在 StreamingJobGraphGenerator 中。

首先来看下 StreamingJobGraphGenerator 的成员变量和入口函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
	//id -> JobVertex 的对应关系
	private final Map<Integer, JobVertex> jobVertices;
	//已经构建的JobVertex的id集合
	private final Collection<Integer> builtVertices;
  //物理边集合(不包含chain内部的边), 按创建顺序排序
  private List<StreamEdge> physicalEdgesInOrder;
  //保存 operataor chain 的信息,部署时用来构建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
  private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
  //所有节点的配置信息,id -> StreamConfig
  private Map<Integer, StreamConfig> vertexConfigs;
  //保存每个节点的名字,id -> chainedName
  private Map<Integer, String> chainedNames;

  //用于计算hash值的算法
	private final StreamGraphHasher defaultStreamGraphHasher;
	private final List<StreamGraphHasher> legacyStreamGraphHashers;
	//.....

private JobGraph createJobGraph() {

		// 调度模式,立即启动
		jobGraph.setScheduleMode(ScheduleMode.EAGER);

		// 广度优先遍历 StreamGraph 并且为每个SteamNode生成hash,hash值将被用于 JobVertexId 中
		// 保证如果提交的拓扑没有改变,则每次生成的hash都是一样的
		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
		}

		Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

		// 主要的转换逻辑,生成 JobVetex, JobEdge 等
		setChaining(hashes, legacyHashes, chainedOperatorHashes);

		// 将每个JobVertex的输入边集合也序列化到该JobVertex的StreamConfig中
    // (出边集合已经在setChaining的时候写入了)
		setPhysicalEdges();

		// 根据group name,为每个 JobVertex 指定所属的 SlotSharingGroup
		// 以及针对 Iteration的头尾设置  CoLocationGroup
		setSlotSharingAndCoLocation();

		// 配置 checkpoint
		configureCheckpointing();

		// 添加用户提供的自定义的文件信息
		JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

		// 将 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中
		try {
			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
		}
		catch (IOException e) {
			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
					"This indicates that non-serializable types (like custom serializers) were registered");
		}

		return jobGraph;
	}

StreamingJobGraphGenerator#createJobGraph 函数的逻辑也很清晰,首先为所有节点生成一个唯一的hash id,如果节点在多次提交中没有改变(包括并发度、上下游等),那么这个id就不会改变,这主要用于故障恢复。这里我们不能用 StreamNode.id 来代替,因为这是一个从 1 开始的静态计数变量,同样的 Job 可能会得到不一样的 id。然后就是最关键的 chaining 处理,和生成JobVetex、JobEdge等。之后就是写入各种配置相关的信息。

我们先来看一下,Flink 是如何确定两个 Operator 是否能够被 chain 到同一个节点的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	//StreamEdge 两端的节点是否能够被 chain 到同一个 JobVertex 中
	public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
		//获取到上游和下游节点
		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

		//获取到上游和下游节点具体的算子 StreamOperator
		StreamOperator<?> headOperator = upStreamVertex.getOperator();
		StreamOperator<?> outOperator = downStreamVertex.getOperator();

		return downStreamVertex.getInEdges().size() == 1 //下游节点只有一个输入
				&& outOperator != null
				&& headOperator != null
				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //在同一个slot共享组中
				//上下游算子的 chainning 策略,要允许chainning
				//默认的是 ALWAYS
				//在添加算子时,也可以强制使用 disableChain 设置为 NEVER
				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
				//上下游节点之间的数据传输方式必须是FORWARD,而不能是REBALANCE等其它模式
				&& (edge.getPartitioner() instanceof ForwardPartitioner)
				//上下游节点的并行度要一致
				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
				&& streamGraph.isChainingEnabled();
	}

只要一条边两端的节点满足上面的条件,那么这两个节点就可以被串联在同一个 JobVertex 中。接着来就来看最为关键的函数 setChaining 的逻辑:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
	/**
	 * Sets up task chains from the source {@link StreamNode} instances.
	 *
	 * <p>This will recursively create all {@link JobVertex} instances.
	 */
	private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
			createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
		}
	}


	//构建 operator chain(可能包含一个或多个StreamNode),返回值是当前的这个 operator chain 实际的输出边(不包括内部的边)
	//如果 currentNodeId != startNodeId, 说明当前节点在  operator chain 的内部
	private List<StreamEdge> createChain(
			Integer startNodeId,
			Integer currentNodeId,
			Map<Integer, byte[]> hashes,
			List<Map<Integer, byte[]>> legacyHashes,
			int chainIndex,
			Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

		if (!builtVertices.contains(startNodeId)) {

			//当前 operator chain 最终的输出边,不包括内部的边
			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

			//将当前节点的出边分为两组,即 chainable 和 nonChainable
			for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
				if (isChainable(outEdge, streamGraph)) { //判断当前 StreamEdge 的上下游是否可以串联在一起
					chainableOutputs.add(outEdge);
				} else {
					nonChainableOutputs.add(outEdge);
				}
			}

			//对于chainable的输出边,递归调用,找到最终的输出边并加入到输出列表中
			for (StreamEdge chainable : chainableOutputs) {
				transitiveOutEdges.addAll(
						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
			}

			//对于 nonChainable 的边
			for (StreamEdge nonChainable : nonChainableOutputs) {
				//这个边本身就应该加入到当前节点的输出列表中
				transitiveOutEdges.add(nonChainable);
				//递归调用,以下游节点为起点创建新的operator chain
				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
			}

			//用于保存一个operator chain所有 operator 的 hash 信息
			List<Tuple2<byte[], byte[]>> operatorHashes =
				chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

			byte[] primaryHashBytes = hashes.get(currentNodeId);

			for (Map<Integer, byte[]> legacyHash : legacyHashes) {
				operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
			}

			//当前节点的名称,资源要求等信息
			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
			chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

			//如果当前节点是起始节点, 则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
			//createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
			StreamConfig config = currentNodeId.equals(startNodeId)
					? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
					: new StreamConfig(new Configuration());

			// 设置 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
			// 其中包括 序列化器, StreamOperator, Checkpoint 等相关配置
			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

			if (currentNodeId.equals(startNodeId)) {
				// 如果是chain的起始节点。(不是chain中的节点,也会被标记成 chain start)
				config.setChainStart();
				config.setChainIndex(0);
				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
				//把实际的输出边写入配置, 部署时会用到
				config.setOutEdgesInOrder(transitiveOutEdges);
				//operator chain 的头部 operator 的输出边,包括内部的边
				config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

				// 将当前节点(headOfChain)与所有出边相连
				for (StreamEdge edge : transitiveOutEdges) {
					// 通过StreamEdge构建出JobEdge,创建IntermediateDataSet,用来将JobVertex和JobEdge相连
					connect(startNodeId, edge);
				}

				// 将operator chain中所有子节点的 StreamConfig 写入到 headOfChain 节点的 CHAINED_TASK_CONFIG 配置中
				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

			} else {
				//如果是 operator chain 内部的节点
				Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

				if (chainedConfs == null) {
					chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
				}
				config.setChainIndex(chainIndex);
				StreamNode node = streamGraph.getStreamNode(currentNodeId);
				config.setOperatorName(node.getOperatorName());
				// 将当前节点的 StreamConfig 添加所在的 operator chain 的 config 集合中
				chainedConfigs.get(startNodeId).put(currentNodeId, config);
			}

			//设置当前 operator 的 OperatorID
			config.setOperatorID(new OperatorID(primaryHashBytes));

			if (chainableOutputs.isEmpty()) {
				config.setChainEnd();
			}
			return transitiveOutEdges;

		} else {
			return new ArrayList<>();
		}
	}

上述过程实际上就是通过 DFS 遍历所有的 StreamNode, 并按照 chainable 的条件不停地将可以串联的呃 operator 放在同一个的 operator chain 中。每一个 StreamNode 的配置信息都会被序列化到对应的 StreamConfig 中。只有 operator chain 的头部节点会生成对应的 JobVertex ,一个 operator chain 的所有内部节点都会以序列化的形式写入头部节点的 CHAINED_TASK_CONFIG 配置项中。

每一个 operator chain 都会为所有的实际输出边创建对应的 JobEdge,并和 JobVertex 连接:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void connect(Integer headOfChain, StreamEdge edge) {

		physicalEdgesInOrder.add(edge);

		Integer downStreamvertexID = edge.getTargetId();

		//上下游节点
		JobVertex headVertex = jobVertices.get(headOfChain);
		JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
		//下游节点增加一个输入
		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

		StreamPartitioner<?> partitioner = edge.getPartitioner();
		JobEdge jobEdge;
		//创建 JobEdge 和 IntermediateDataSet
		//根据StreamPartitioner类型决定在上游节点(生产者)的子任务和下游节点(消费者)之间的连接模式
		if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
				headVertex,
				DistributionPattern.POINTWISE,
				ResultPartitionType.PIPELINED_BOUNDED);
		} else {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
					headVertex,
					DistributionPattern.ALL_TO_ALL,
					ResultPartitionType.PIPELINED_BOUNDED);
		}
		// set strategy name so that web interface can show it.
		jobEdge.setShipStrategyName(partitioner.toString());

		if (LOG.isDebugEnabled()) {
			LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
					headOfChain, downStreamvertexID);
		}
	}

小结

本文分析了从 StreamGraphJobGraph 之间的转换过程。 JobGraph 的关键在于将多个 StreamNode 优化为一个 JobVertex, 对应的 StreamEdge 则转化为 JobEdge, 并且 JobVertexJobEdge 之间通过 IntermediateDataSet 形成一个生产者和消费者的连接关系。

-EOF-