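In the previous article we analyzed the code behind Flink's state management. To recover from failures and keep data consistent, however, a job also relies on the checkpoint mechanism.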

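Simply put, a checkpoint is a distributed snapshot: at some point in time, a snapshot is taken of all tasks of a Flink job and saved to a storage system such as memory or a file system. When the job recovers from a failure, it can be restored to the state of the most recent checkpoint before the failure, which keeps the data consistent. Of course, to guarantee exactly-once / at-least-once semantics, the data source must also support replaying data.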

Overview

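Flink's checkpoint mechanism is based on the Chandy-Lamport algorithm. For the details, see the official Flink documentation and the paper State Management in Apache Flink published by the Flink team. Here is a brief overview first.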

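The core of Flink's distributed snapshots is the stream barrier. A barrier is a special marker message that flows through the data stream together with regular records. When a checkpoint needs to be triggered, the Checkpoint Coordinator asks the sources to inject barriers into the data stream. Because the barriers travel with the regular records, they effectively divide the records in the stream among different checkpoints. Once an operator has received the barrier from all of its input channels, it takes its own snapshot and emits the barrier to its downstream channels. When all sinks have reported that their snapshots are complete, the Checkpoint Coordinator considers the checkpoint finished.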

[Figure: stream_barriers.svg]

[Figure: checkpointing.svg]
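The way barriers divide a stream among checkpoints can be sketched in a few lines. This is only an illustration under stated assumptions: BarrierEpochSketch and splitByBarrier are hypothetical names, a String stands in for a record, and a Long stands in for a barrier carrying a checkpoint ID. None of this is Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: these types are hypothetical and do not exist in Flink.
class BarrierEpochSketch {

    /** Groups records by the checkpoint whose barrier follows them in the stream. */
    static Map<Long, List<String>> splitByBarrier(List<Object> stream) {
        Map<Long, List<String>> recordsPerCheckpoint = new LinkedHashMap<>();
        List<String> sinceLastBarrier = new ArrayList<>();
        for (Object element : stream) {
            if (element instanceof Long) {
                // A Long models a barrier: every record seen since the previous
                // barrier belongs to the checkpoint with this ID.
                recordsPerCheckpoint.put((Long) element, sinceLastBarrier);
                sinceLastBarrier = new ArrayList<>();
            } else {
                sinceLastBarrier.add((String) element);
            }
        }
        // Records after the last barrier are not part of any checkpoint yet.
        return recordsPerCheckpoint;
    }

    public static void main(String[] args) {
        List<Object> stream = Arrays.<Object>asList("a", "b", 1L, "c", 2L, "d");
        System.out.println(splitByBarrier(stream)); // prints {1=[a, b], 2=[c]}
    }
}
```

Record "d" arrives after barrier 2, so it would only be covered by a later checkpoint.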

How a checkpoint is initiated

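CheckpointCoordinator is the "coordinator" of Flink's distributed snapshot process. It is mainly responsible for:

  • Sending the messages that trigger a checkpoint and receiving the acknowledgements (Ack) that the tasks send back for it
  • Maintaining a global view of the state handles (state-handle) attached to the Acks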

In StreamingJobGraphGenerator, after the JobGraph has been generated, the configureCheckpointing method is called to set up the checkpoint-related configuration.

Three lists are involved here:

  • List<JobVertexID> triggerVertices
  • List<JobVertexID> ackVertices
  • List<JobVertexID> commitVertices

Of these, triggerVertices contains only the source vertices, while ackVertices and commitVertices both contain all vertices:

class StreamingJobGraphGenerator {
	private void configureCheckpointing() {
		//  .........

		// collect the vertices that receive "trigger checkpoint" messages.
		// currently, these are all the sources
		List<JobVertexID> triggerVertices = new ArrayList<>();

		// collect the vertices that need to acknowledge the checkpoint
		// currently, these are all vertices
		List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());

		// collect the vertices that receive "commit checkpoint" messages
		// currently, these are all vertices
		List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

		for (JobVertex vertex : jobVertices.values()) {
			if (vertex.isInputVertex()) {
				triggerVertices.add(vertex.getID());
			}
			commitVertices.add(vertex.getID());
			ackVertices.add(vertex.getID());
		}

		//...............
	}
}

In ExecutionGraphBuilder#buildGraph, if checkpointing is enabled for the job, ExecutionGraph.enableCheckpointing() is called. This creates the CheckpointCoordinator object and registers a job status listener, CheckpointCoordinatorDeActivator, which is notified whenever the job status changes.

class ExecutionGraph {
	public void enableCheckpointing(
			long interval,
			long checkpointTimeout,
			long minPauseBetweenCheckpoints,
			int maxConcurrentCheckpoints,
			CheckpointRetentionPolicy retentionPolicy,
			List<ExecutionJobVertex> verticesToTrigger,
			List<ExecutionJobVertex> verticesToWaitFor,
			List<ExecutionJobVertex> verticesToCommitTo,
			List<MasterTriggerRestoreHook<?>> masterHooks,
			CheckpointIDCounter checkpointIDCounter,
			CompletedCheckpointStore checkpointStore,
			StateBackend checkpointStateBackend,
			CheckpointStatsTracker statsTracker) {

		// simple sanity checks

		ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
		ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

		// create the coordinator that triggers and commits checkpoints and holds the state
		checkpointCoordinator = new CheckpointCoordinator(
			jobInformation.getJobId(),
			interval,
			checkpointTimeout,
			minPauseBetweenCheckpoints,
			maxConcurrentCheckpoints,
			retentionPolicy,
			tasksToTrigger,
			tasksToWaitFor,
			tasksToCommitTo,
			checkpointIDCounter,
			checkpointStore,
			checkpointStateBackend,
			ioExecutor,
			SharedStateRegistry.DEFAULT_FACTORY);

		// register the master hooks on the checkpoint coordinator
		for (MasterTriggerRestoreHook<?> hook : masterHooks) {
			if (!checkpointCoordinator.addMasterHook(hook)) {
				LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
			}
		}

		checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

		// interval of max long value indicates disable periodic checkpoint,
		// the CheckpointActivatorDeactivator should be created only if the interval is not max value
		if (interval != Long.MAX_VALUE) {
			// the periodic checkpoint scheduler is activated and deactivated as a result of
			// job status changes (running -> on, all other states -> off)
			registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
		}
	}

}

When the status changes to RUNNING, CheckpointCoordinatorDeActivator is notified and starts the checkpoint timer via CheckpointCoordinator.startCheckpointScheduler.

class CheckpointCoordinatorDeActivator implements JobStatusListener {
	@Override
	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
		if (newJobStatus == JobStatus.RUNNING) {
			// start the checkpoint scheduler
			coordinator.startCheckpointScheduler();
		} else {
			// anything else should stop the trigger for now
			coordinator.stopCheckpointScheduler();
		}
	}
}

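The periodic task is wrapped in a ScheduledTrigger, which calls CheckpointCoordinator.triggerCheckpoint() when it runs to trigger a checkpoint. The code of CheckpointCoordinator.triggerCheckpoint is long. In summary, it goes through the following steps:

  • Check whether a checkpoint may be triggered: whether the checkpoint is forced, whether the number of concurrent checkpoints currently pending exceeds the limit, whether too little time has passed since the last successful checkpoint, and so on. If these conditions are not met, the trigger request is not executed
  • Check that every Execution that needs to trigger the checkpoint is in the RUNNING state
  • Generate the checkpointID for this checkpoint (IDs are strictly increasing) and initialize the CheckpointStorageLocation. CheckpointStorageLocation is an abstraction of where this checkpoint is stored and is created through CheckpointStorage.initializeLocationForCheckpoint() (CheckpointStorage currently has two concrete implementations, FsCheckpointStorage and MemoryBackendCheckpointStorage). The CheckpointStorage itself is created from the StateBackend
  • Create a PendingCheckpoint, which represents a checkpoint in an intermediate state, and store it in a checkpointId -> PendingCheckpoint map
  • Register a scheduled task that cancels this checkpoint once it times out and triggers a new checkpoint
  • Call Execution.triggerCheckpoint() to send the checkpoint request to all tasks that need to be triggered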
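The precondition checks in the first two steps can be sketched as a small gatekeeper. This is a minimal sketch under stated assumptions, not the actual CheckpointCoordinator logic: TriggerGateSketch, tryTrigger, and onCompleted are hypothetical names, forced checkpoints and timeouts are left out, and time is passed in explicitly to keep the example deterministic.

```java
// Hypothetical, heavily simplified gatekeeper; the real coordinator does much more.
class TriggerGateSketch {
    private final int maxConcurrentCheckpoints;
    private final long minPauseMillis;

    private int pendingCheckpoints = 0;
    private boolean anyCompleted = false;
    private long lastCompletionMillis = 0L;
    private long nextCheckpointId = 1L;

    TriggerGateSketch(int maxConcurrentCheckpoints, long minPauseMillis) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns the new checkpoint ID, or -1 if the trigger request is rejected. */
    long tryTrigger(long nowMillis, boolean allTriggerTasksRunning) {
        if (pendingCheckpoints >= maxConcurrentCheckpoints) {
            return -1; // too many checkpoints in flight
        }
        if (anyCompleted && nowMillis - lastCompletionMillis < minPauseMillis) {
            return -1; // too soon after the last successful checkpoint
        }
        if (!allTriggerTasksRunning) {
            return -1; // some task that must be triggered is not RUNNING
        }
        pendingCheckpoints++;
        return nextCheckpointId++; // IDs are strictly increasing
    }

    /** Records that one pending checkpoint finished successfully. */
    void onCompleted(long nowMillis) {
        pendingCheckpoints--;
        anyCompleted = true;
        lastCompletionMillis = nowMillis;
    }

    public static void main(String[] args) {
        TriggerGateSketch gate = new TriggerGateSketch(1, 100);
        System.out.println(gate.tryTrigger(0, true));  // accepted
        System.out.println(gate.tryTrigger(10, true)); // rejected: one checkpoint is pending
    }
}
```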

Savepoints are handled in essentially the same way as checkpoints, except that a savepoint is triggered forcibly, by calling Execution.triggerSynchronousSavepoint().

CheckpointCoordinator also holds three lists internally:

  • ExecutionVertex[] tasksToTrigger;
  • ExecutionVertex[] tasksToWaitFor;
  • ExecutionVertex[] tasksToCommitTo;

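These correspond to the three lists in the JobGraph described earlier. When a checkpoint is triggered, Execution.triggerCheckpoint() is called only on the Executions that act as sources. An RPC call then tells the corresponding RpcTaskManagerGateway to invoke triggerCheckpoint.

Executing a checkpoint

The flow of barriers

CheckpointCoordinator sends out the message that triggers a checkpoint, which ends up as an RPC call to TaskExecutorGateway.triggerCheckpoint, that is, a request to run TaskExecutor.triggerCheckpoint(). Because several Tasks may be running in one TaskExecutor, the Task is looked up by the ExecutionAttemptID for which the checkpoint is triggered, and then Task.triggerCheckpointBarrier() is called on it. triggerCheckpointBarrier() is only ever called on Tasks that act as sources.

In Task, triggering the checkpoint is wrapped in an asynchronous task: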

class Task {
	public void triggerCheckpointBarrier(
			final long checkpointID,
			final long checkpointTimestamp,
			final CheckpointOptions checkpointOptions,
			final boolean advanceToEndOfEventTime) {
		......
		if (executionState == ExecutionState.RUNNING && invokable != null) {
			// build a local closure
			final String taskName = taskNameWithSubtask;
			final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
				FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					// set safety net from the task's context for checkpointing thread
					FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
					try {
						// the actual checkpoint call
						boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
						if (!success) {
							checkpointResponder.declineCheckpoint(
									getJobID(), getExecutionId(), checkpointID,
									new CheckpointDeclineTaskNotReadyException(taskName));
						}
					}
					catch (Throwable t) {
						if (getExecutionState() == ExecutionState.RUNNING) {
							failExternally(new Exception(
								"Error while triggering checkpoint " + checkpointID + " for " +
									taskNameWithSubtask, t));
						} else {
							LOG.debug("Encountered error while triggering checkpoint {} for " +
								"{} ({}) while being not in state running.", checkpointID,
								taskNameWithSubtask, executionId, t);
						}
					} finally {
						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
					}
				}
			};
			// execute asynchronously
			executeAsyncCallRunnable(
					runnable,
					String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId),
					checkpointOptions.getCheckpointType().isSynchronous());
		} else {
			// send back a message that we did not do the checkpoint
			checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
					new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
		}

	}
}

The actual checkpoint logic of a Task is encapsulated in AbstractInvokable.triggerCheckpoint(...). AbstractInvokable has two methods that trigger a checkpoint:

  • triggerCheckpoint
  • triggerCheckpointOnBarrier

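triggerCheckpoint is the origin of a checkpoint and injects the CheckpointBarrier into the downstream, whereas the other, downstream tasks call triggerCheckpointOnBarrier after they receive the CheckpointBarrier. The two implementations differ in some details, but the main logic is the same and lives in StreamTask.performCheckpoint(): 1) send the barrier downstream first, 2) store the checkpoint snapshot.

As soon as StreamTask.triggerCheckpoint() or StreamTask.triggerCheckpointOnBarrier() is called, the barrier is sent downstream through OperatorChain.broadcastCheckpointBarrier():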

class OperatorChain {
	public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
		// create a CheckpointBarrier
		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
		// send it to all downstream outputs
		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
			streamOutput.broadcastEvent(barrier);
		}
	}
}

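As we already know, every Task consumes the data produced by its upstream Tasks through an InputGate. In fact, StreamInputProcessor and StreamTwoInputProcessor create a CheckpointBarrierHandler, which is a wrapper around the InputGate that adds handling of events such as CheckpointBarrier. CheckpointBarrierHandler has two concrete implementations, BarrierTracker and BarrierBuffer, which correspond to the AT_LEAST_ONCE and EXACTLY_ONCE modes respectively.

StreamInputProcessor and StreamTwoInputProcessor call CheckpointBarrierHandler.getNextNonBlocked() in a loop to fetch new data, so the CheckpointBarrierHandler can carry out the checkpoint-related work promptly once it receives a CheckpointBarrier.

Let's first look at BarrierTracker, used in AT_LEAST_ONCE mode. It merely tracks the barriers received from each input channel. Once the barriers from all input channels have been received, the checkpoint can be triggered: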

public class BarrierTracker implements CheckpointBarrierHandler {
	@Override
	public BufferOrEvent getNextNonBlocked() throws Exception {
		while (true) {
			Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
			if (!next.isPresent()) {
				// buffer or input exhausted
				return null;
			}

			BufferOrEvent bufferOrEvent = next.get();
			if (bufferOrEvent.isBuffer()) {
				return bufferOrEvent;
			}
			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
				// received a CheckpointBarrier
				processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
			}
			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
				// received a CancelCheckpointMarker
				processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
			}
			else {
				// some other event
				return bufferOrEvent;
			}
		}
	}

	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
		final long barrierId = receivedBarrier.getId();
		// fast path for single channel trackers
		if (totalNumberOfInputChannels == 1) {
			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
			return;
		}

		// find the checkpoint barrier in the queue of pending barriers
		CheckpointBarrierCount cbc = null;
		int pos = 0;
		for (CheckpointBarrierCount next : pendingCheckpoints) {
			if (next.checkpointId == barrierId) {
				cbc = next;
				break;
			}
			pos++;
		}

		if (cbc != null) {
			// add one to the count to that barrier and check for completion
			int numBarriersNew = cbc.incrementBarrierCount();
			if (numBarriersNew == totalNumberOfInputChannels) {
				// checkpoint can be triggered (or is aborted and all barriers have been seen)
				// first, remove this checkpoint and all all prior pending
				// checkpoints (which are now subsumed)
				// all unfinished checkpoints older than the current barrierId can be discarded
				for (int i = 0; i <= pos; i++) {
					pendingCheckpoints.pollFirst();
				}

				// notify the listener
				if (!cbc.isAborted()) {
					// notify that the checkpoint should be taken
					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
				}
			}
		}
		else {
			// first barrier for that checkpoint ID
			// add it only if it is newer than the latest checkpoint.
			// if it is not newer than the latest checkpoint ID, then there cannot be a
			// successful checkpoint for that ID anyways
			if (barrierId > latestPendingCheckpointID) {
				latestPendingCheckpointID = barrierId;
				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

				// make sure we do not track too many checkpoints
				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
					pendingCheckpoints.pollFirst();
				}
			}
		}
	}
}
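The counting idea behind BarrierTracker can be reduced to the following sketch. It is written under simplifying assumptions and is not Flink's implementation: BarrierCountSketch is a hypothetical class, it ignores abort markers and the cap on tracked checkpoints, and it uses a TreeMap where the original uses a queue.

```java
import java.util.TreeMap;

// Hypothetical simplification of barrier counting in at-least-once mode.
class BarrierCountSketch {
    private final int numChannels;
    // checkpoint ID -> number of channels whose barrier has arrived so far
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private long lastCompletedId = -1L;

    BarrierCountSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true when the barrier for checkpointId has arrived on every channel. */
    boolean onBarrier(long checkpointId) {
        if (checkpointId <= lastCompletedId) {
            return false; // stale barrier of a checkpoint that is already subsumed
        }
        int seen = pending.merge(checkpointId, 1, Integer::sum);
        if (seen < numChannels) {
            return false; // still waiting for other channels
        }
        // Complete: drop this checkpoint and every older pending one.
        pending.headMap(checkpointId, true).clear();
        lastCompletedId = checkpointId;
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BarrierCountSketch tracker = new BarrierCountSketch(2);
        System.out.println(tracker.onBarrier(1)); // false: only one of two channels
        System.out.println(tracker.onBarrier(2)); // false
        System.out.println(tracker.onBarrier(2)); // true: checkpoint 2 subsumes 1
    }
}
```

No channel is ever blocked here, which is why records that arrive after a barrier may be included in the snapshot and replayed after recovery, giving at-least-once semantics.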

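BarrierBuffer, used in EXACTLY_ONCE mode, also tracks the barriers received from each input channel, but in addition a channel that has already delivered its barrier is blocked until the barriers from all channels have arrived. To avoid causing back pressure, BarrierBuffer nevertheless keeps receiving data from blocked channels and buffers it until all barriers have arrived.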

public class BarrierBuffer implements CheckpointBarrierHandler {
	/** To utility to write blocked data to a file channel. */
	private final BufferBlocker bufferBlocker; // buffers the data received from blocked channels

	/**
	 * The sequence of buffers/events that has been unblocked and must now be consumed before
	 * requesting further data from the input gate.
	 */
	private BufferOrEventSequence currentBuffered; // the data currently buffered

	@Override
	public BufferOrEvent getNextNonBlocked() throws Exception {
		while (true) {
			// process buffered BufferOrEvents before grabbing new ones
			// handle the buffered data first
			Optional<BufferOrEvent> next;
			if (currentBuffered == null) {
				next = inputGate.getNextBufferOrEvent();
			}
			else {
				next = Optional.ofNullable(currentBuffered.getNext());
				if (!next.isPresent()) {
					completeBufferedSequence();
					return getNextNonBlocked();
				}
			}

			if (!next.isPresent()) {
				if (!endOfStream) {
					// end of input stream. stream continues with the buffered data
					endOfStream = true;
					releaseBlocksAndResetBarriers();
					return getNextNonBlocked();
				}
				else {
					// final end of both input and buffered data
					return null;
				}
			}

			BufferOrEvent bufferOrEvent = next.get();
			if (isBlocked(bufferOrEvent.getChannelIndex())) {
				// if the channel is blocked, we just store the BufferOrEvent in the buffer
				bufferBlocker.add(bufferOrEvent);
				checkSizeLimit();
			}
			else if (bufferOrEvent.isBuffer()) {
				return bufferOrEvent;
			}
			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
				if (!endOfStream) {
					// process barriers only if there is a chance of the checkpoint completing
					processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
				}
			}
			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
				processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
			}
			else {
				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
					processEndOfPartition();
				}
				return bufferOrEvent;
			}
		}
	}

	.......
}
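The alignment behaviour of BarrierBuffer can be illustrated with a toy model. The sketch below rests on assumptions that do not hold in Flink: AlignmentSketch is a hypothetical class, only one checkpoint is in flight at a time, records are plain strings, and the buffer lives in memory rather than in a BufferBlocker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical toy model of barrier alignment for a single in-flight checkpoint.
class AlignmentSketch {
    private final boolean[] blocked;
    private int barriersSeen = 0;
    private final Queue<String> buffered = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    AlignmentSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffered.add(record); // arrived after the barrier: hold it back
        } else {
            output.add(record);   // arrived before the barrier: process it now
        }
    }

    void onBarrier(int channel) {
        if (blocked[channel]) {
            return; // this channel has already delivered its barrier
        }
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            output.add("<snapshot>");  // all barriers are in: take the snapshot
            Arrays.fill(blocked, false);
            barriersSeen = 0;
            output.addAll(buffered);   // then replay what was held back
            buffered.clear();
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        AlignmentSketch gate = new AlignmentSketch(2);
        gate.onRecord(0, "a1");
        gate.onBarrier(0);
        gate.onRecord(0, "a2"); // buffered, channel 0 is blocked
        gate.onRecord(1, "b1"); // processed, channel 1 is still open
        gate.onBarrier(1);
        System.out.println(gate.output()); // prints [a1, b1, <snapshot>, a2]
    }
}
```

Record a2 follows the barrier on channel 0, so it is processed only after the snapshot and never leaks into it.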

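Besides the CheckpointBarrier message, a CancelCheckpointMarker message is sent downstream when a checkpoint fails or is cancelled.

Storing the checkpoint state snapshot

Once a checkpoint has been triggered, the most important job for a Task is to store the state snapshots of all operators in that Task in an external storage system. The external storage system may be a distributed file system, or it may be the memory of the JobManager.

The checkpoint operation starts in StreamTask.performCheckpoint and consists of three parts: 1) preparing the checkpoint, which usually involves little work; 2) sending the CheckpointBarrier; 3) storing the checkpoint snapshot: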

class StreamTask {
	private boolean performCheckpoint(
			CheckpointMetaData checkpointMetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics,
			boolean advanceToEndOfTime) throws Exception {
		final long checkpointId = checkpointMetaData.getCheckpointId();
		final boolean result;
		synchronized (lock) {
			if (isRunning) {
				if (checkpointOptions.getCheckpointType().isSynchronous()) {
					syncSavepointLatch.setCheckpointId(checkpointId);
					if (advanceToEndOfTime) {
						advanceToEndOfEventTime();
					}
				}

				// All of the following steps happen as an atomic step from the perspective of barriers and
				// records/watermarks/timers/callbacks.
				// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
				// checkpoint alignments

				// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
				//           The pre-barrier work should be nothing or minimal in the common case.
				operatorChain.prepareSnapshotPreBarrier(checkpointId);

				// Step (2): Send the checkpoint barrier downstream
				operatorChain.broadcastCheckpointBarrier(
						checkpointId,
						checkpointMetaData.getTimestamp(),
						checkpointOptions);

				// Step (3): Take the state snapshot. This should be largely asynchronous, to not
				//           impact progress of the streaming topology
				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
				result = true;
			}
			else {
				// we cannot perform our checkpoint - let the downstream operators know that they
				// should not wait for any input from this operator
				// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
				// yet be created
				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
				Exception exception = null;
				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
					try {
						recordWriter.broadcastEvent(message);
					} catch (Exception e) {
						exception = ExceptionUtils.firstOrSuppressed(
							new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
							exception);
					}
				}
				if (exception != null) {
					throw exception;
				}
				result = false;
			}
		}
		if (isRunning && syncSavepointLatch.isSet()) {
			// taking a savepoint: wait until the checkpoint has been acknowledged
			final boolean checkpointWasAcked =
					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
			if (checkpointWasAcked) {
				finishTask();
			}
		}
		return result;
	}
}

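Before looking at how the checkpoint snapshot is stored, let's briefly go over some classes related to checkpoint storage. In short, CheckpointStorage is an abstraction of the state storage system and has two implementations, MemoryBackendCheckpointStorage and FsCheckpointStorage. MemoryBackendCheckpointStorage stores the checkpoint state of all operators in the memory of the JobManager and is usually not suitable for production use, whereas FsCheckpointStorage persists the checkpoint state of all operators in a file system. CheckpointStorageLocation is an abstraction of the location where checkpoint state is stored. It provides methods for obtaining checkpoint output streams, through which state and metadata are written to the storage system. Closing an output stream yields a state handle (StateHandle), which can later be used to read the written state back.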

[Figure: checkpoint storage]
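The "write to a stream, close it, get a handle back" contract can be sketched with an in-memory stand-in. The classes below are hypothetical and only loosely modelled on the idea described above. They are not Flink's CheckpointStorage or state handle classes, and the method names are chosen purely for readability.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory stand-ins; Flink's real classes are richer than this.
class StateHandleSketch {

    /** Stand-in for a state handle: a pointer that lets the written state be read again. */
    static final class InMemoryHandle {
        private final byte[] data;

        InMemoryHandle(byte[] data) {
            this.data = data;
        }

        byte[] read() {
            return data.clone(); // defensive copy, the handle stays immutable
        }
    }

    /** Stand-in for a checkpoint output stream that yields a handle when closed. */
    static final class InMemoryStateOutput {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private boolean closed = false;

        void write(byte[] bytes) {
            if (closed) {
                throw new IllegalStateException("stream already closed");
            }
            buffer.write(bytes, 0, bytes.length);
        }

        InMemoryHandle closeAndGetHandle() {
            closed = true;
            return new InMemoryHandle(buffer.toByteArray());
        }
    }

    public static void main(String[] args) {
        InMemoryStateOutput out = new InMemoryStateOutput();
        out.write("count=42".getBytes(StandardCharsets.UTF_8));
        InMemoryHandle handle = out.closeAndGetHandle();
        System.out.println(new String(handle.read(), StandardCharsets.UTF_8));
    }
}
```

A file-system variant would follow the same shape, with the handle holding a path instead of the bytes themselves.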

Next, let's look at the main logic of the snapshot operation.

class StreamTask {
	private void checkpointState(
			CheckpointMetaData checkpointMetaData,
			CheckpointOptions checkpointOptions,
			CheckpointMetrics checkpointMetrics) throws Exception {

		// 1. resolve the CheckpointStorageLocation
		CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
				checkpointMetaData.getCheckpointId(),
				checkpointOptions.getTargetLocation());

		// 2. wrap the storing process in a CheckpointingOperation and start storing the checkpoint
		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
			this,
			checkpointMetaData,
			checkpointOptions,
			storage,
			checkpointMetrics);
		checkpointingOperation.executeCheckpointing();
	}
}

The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results of operator state and keyed state:

public class OperatorSnapshotFutures {
	@Nonnull
	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
	@Nonnull
	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
}

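The process of taking the checkpoint snapshot is encapsulated in CheckpointingOperation. Because a StreamTask may contain several operators, it uses a Map internally to maintain the OperatorID -> OperatorSnapshotFutures relation. In CheckpointingOperation the snapshot is taken in two phases: the first phase runs synchronously and the second asynchronously: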

class StreamTask {
	private static final class CheckpointingOperation {
		// OperatorID -> OperatorSnapshotFutures
		private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

		// take the checkpoint snapshot
		public void executeCheckpointing() throws Exception {
			startSyncPartNano = System.nanoTime();

			try {
				// 1. the synchronous part
				for (StreamOperator<?> op : allOperators) {
					checkpointStreamOperator(op);
				}

				// 2. the asynchronous part
				// a checkpoint can be configured to run synchronously or asynchronously
				// if it runs synchronously, all runnable futures are in fact already complete at this point
				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
					owner,
					operatorSnapshotsInProgress,
					checkpointMetaData,
					checkpointMetrics,
					startAsyncPartNano);
				owner.cancelables.registerCloseable(asyncCheckpointRunnable);
				owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
			} catch (Exception ex) {
				........
			}
		}

		@SuppressWarnings("deprecation")
		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
			if (null != op) {
				// call StreamOperator.snapshotState to take the snapshot
				// the result is a runnable future, which may or may not have finished already
				OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
						checkpointMetaData.getCheckpointId(),
						checkpointMetaData.getTimestamp(),
						checkpointOptions,
						storageLocation);
				operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
			}
		}
	}
}
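The split into a synchronous and an asynchronous phase can be sketched with a plain FutureTask. This is a minimal sketch under stated assumptions rather than the StreamTask code: TwoPhaseSnapshotSketch and its methods are hypothetical, the "state" is a list of strings, and "writing" it means joining it into one string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical operator illustrating a two-phase snapshot.
class TwoPhaseSnapshotSketch {
    private final List<String> liveState = new ArrayList<>();

    void add(String value) {
        liveState.add(value);
    }

    /** Synchronous phase: copy the state, return a not-yet-run future for the write. */
    FutureTask<String> snapshot() {
        final List<String> copy = new ArrayList<>(liveState);
        // The callable only touches the copy, so processing may go on meanwhile.
        return new FutureTask<>(() -> String.join(",", copy));
    }

    /** Asynchronous phase: run the future on a worker thread and wait for the result. */
    static String completeAsync(FutureTask<String> future) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(future);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while snapshotting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("asynchronous phase failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        TwoPhaseSnapshotSketch operator = new TwoPhaseSnapshotSketch();
        operator.add("a");
        operator.add("b");
        FutureTask<String> future = operator.snapshot();
        operator.add("c"); // arrives after the synchronous phase, not in the snapshot
        System.out.println(completeAsync(future)); // prints a,b
    }
}
```

A fully synchronous variant would simply call future.run() inside snapshot(), so the caller would receive a future that is already done.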

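In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn and returns a runnable future. Depending on whether checkpointing is configured to be synchronous or asynchronous, this future may or may not be complete already: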

interface StreamOperator<OUT> {
	/**
	 * Called to draw a state snapshot from the operator.
	 *
	 * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
	 * the runnable might already be finished.
	 *
	 * @throws Exception exception that happened during snapshotting.
	 */
	OperatorSnapshotFutures snapshotState(
		long checkpointId,
		long timestamp,
		CheckpointOptions checkpointOptions,
		CheckpointStreamFactory storageLocation) throws Exception;
}

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, Serializable {
	@Override
	public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
			CheckpointStreamFactory factory) throws Exception {
		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

		OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
				checkpointId,
				timestamp,
				factory,
				keyGroupRange,
				getContainingTask().getCancelables())) {

			// take a snapshot of the state
			snapshotState(snapshotContext);
			// raw state: subclasses have to implement writing the raw state snapshot themselves
			// timers are written as raw keyed state
			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
			snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

			// write the managed operator state snapshot
			if (null != operatorStateBackend) {
				snapshotInProgress.setOperatorStateManagedFuture(
					operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
			}

			// write the managed keyed state snapshot
			if (null != keyedStateBackend) {
				snapshotInProgress.setKeyedStateManagedFuture(
					keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
			}
		} catch (Exception snapshotException) {
			try {
				snapshotInProgress.cancel();
			} catch (Exception e) {
				snapshotException.addSuppressed(e);
			}

			String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
				getOperatorName() + ".";

			if (!getContainingTask().isCanceled()) {
				LOG.info(snapshotFailMessage, snapshotException);
			}
			throw new Exception(snapshotFailMessage, snapshotException);
		}
		return snapshotInProgress;
	}

	/**
	 * Stream operators with state, which want to participate in a snapshot need to override this hook method.
	 *
	 * @param context context that provides information and means required for taking a snapshot
	 */
	public void snapshotState(StateSnapshotContext context) throws Exception {
		final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
		//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
		// all timers are written as raw keyed state
		if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
			((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
			KeyedStateCheckpointOutputStream out;
			try {
				out = context.getRawKeyedOperatorStateOutput();
			} catch (Exception exception) {
				throw new Exception("Could not open raw keyed operator state stream for " +
					getOperatorName() + '.', exception);
			}
			try {
				KeyGroupsList allKeyGroups = out.getKeyGroupList();
				for (int keyGroupIdx : allKeyGroups) {
					out.startNewKeyGroup(keyGroupIdx);

					timeServiceManager.snapshotStateForKeyGroup(
						new DataOutputViewStreamWrapper(out), keyGroupIdx);
				}
			} catch (Exception exception) {
				throw new Exception("Could not write timer service of " + getOperatorName() +
					" to checkpoint state stream.", exception);
			} finally {
				try {
					out.close();
				} catch (Exception closeException) {
					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
						"might have prevented deleting some state data.", getOperatorName(), closeException);
				}
			}
		}
	}
}

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
		extends AbstractStreamOperator<OUT>
		implements OutputTypeConfigurable<OUT> {

	@Override
	public void snapshotState(StateSnapshotContext context) throws Exception {
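		super.snapshotState(context); // call the parent method first to write the timers
		// delegate to the snapshot logic of the user function (dispatched via instanceof checks)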
		StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
	}
}

public final class StreamingFunctionUtils {
	public static void snapshotFunctionState(
			StateSnapshotContext context,
			OperatorStateBackend backend,
			Function userFunction) throws Exception {
		while (true) {
			if (trySnapshotFunctionState(context, backend, userFunction)) {
				break;
			}
			// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
			if (userFunction instanceof WrappingFunction) {
				userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
			} else {
				break;
			}
		}
	}

	private static boolean trySnapshotFunctionState(
			StateSnapshotContext context,
			OperatorStateBackend backend,
			Function userFunction) throws Exception {

		// if the user function implements CheckpointedFunction, call snapshotState to take the snapshot
		if (userFunction instanceof CheckpointedFunction) {
			((CheckpointedFunction) userFunction).snapshotState(context);
			return true;
		}

		// if the user function implements ListCheckpointed
		if (userFunction instanceof ListCheckpointed) {
			// first call snapshotState to obtain the current state
			@SuppressWarnings("unchecked")
			List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
					snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
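			// get a reference to the list state held by the state backend
			ListState<Serializable> listState = backend.
					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
			// clear the ListState currently held by the backend
			listState.clear();

			// add the current state to the backend, element by element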
			if (null != partitionableState) {
				try {
					for (Serializable statePartition : partitionableState) {
						listState.add(statePartition);
					}
				} catch (Exception e) {
					listState.clear();

					throw new Exception("Could not write partitionable state to operator " +
						"state backend.", e);
				}
			}
			return true;
		}
		return false;
	}
}

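We have now seen how the checkpoint operation is connected to user-defined functions. Next, let's see how the state managed by Flink is written to the storage system, that is: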

operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write keyed state

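Let's start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. First, deep copies are made of all currently registered operator state (both list state and broadcast state). The actual write is then wrapped in an asynchronous FutureTask whose main tasks are: 1) open the output stream, 2) write the state meta information, 3) write the state, 4) close the output stream and obtain the state handle. If asynchronous checkpointing is not enabled, this FutureTask is executed immediately in the synchronous phase.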

class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
	public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
		final long checkpointId,
		final long timestamp,
		@Nonnull final CheckpointStreamFactory streamFactory,
		@Nonnull final CheckpointOptions checkpointOptions) throws IOException {

		if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
			return DoneFuture.of(SnapshotResult.empty());
		}

		final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
			new HashMap<>(registeredOperatorStates.size());
		final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
			new HashMap<>(registeredBroadcastStates.size());

		// create deep copies of all registered list states and broadcast states
		ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
		Thread.currentThread().setContextClassLoader(userClassLoader);
		try {
			// eagerly create deep copies of the list and the broadcast states (if any)
			// in the synchronous phase, so that we can use them in the async writing.

			if (!registeredOperatorStates.isEmpty()) {
				for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
					PartitionableListState<?> listState = entry.getValue();
					if (null != listState) {
						listState = listState.deepCopy();
					}
					registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
				}
			}

			if (!registeredBroadcastStates.isEmpty()) {
				for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
					BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
					if (null != broadcastState) {
						broadcastState = broadcastState.deepCopy();
					}
					registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
				}
			}
		} finally {
			Thread.currentThread().setContextClassLoader(snapshotClassLoader);
		}

		// wrap the main write logic in an asynchronous FutureTask
		AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
			new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

				@Override
				protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

					// create the state output stream
					CheckpointStreamFactory.CheckpointStateOutputStream localOut =
						streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
					snapshotCloseableRegistry.registerCloseable(localOut);

					// collect the metadata
					// get the registered operator state infos ...
					List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
						new ArrayList<>(registeredOperatorStatesDeepCopies.size());

					for (Map.Entry<String, PartitionableListState<?>> entry :
						registeredOperatorStatesDeepCopies.entrySet()) {
						operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
					}

					// ... get the registered broadcast operator state infos ...
					List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
						new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

					for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
						registeredBroadcastStatesDeepCopies.entrySet()) {
						broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
					}

					// write the metadata
					// ... write them all in the checkpoint stream ...
					DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

					OperatorBackendSerializationProxy backendSerializationProxy =
						new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

					backendSerializationProxy.write(dov);

					// ... and then go for the states ...

					// write the states
					// we put BOTH normal and broadcast state metadata here
					int initialMapCapacity =
						registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
					final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
						new HashMap<>(initialMapCapacity);

					for (Map.Entry<String, PartitionableListState<?>> entry :
						registeredOperatorStatesDeepCopies.entrySet()) {

						PartitionableListState<?> value = entry.getValue();
						long[] partitionOffsets = value.write(localOut);
						OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
						writtenStatesMetaData.put(
							entry.getKey(),
							new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
					}

					// ... and the broadcast states themselves ...
					for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
						registeredBroadcastStatesDeepCopies.entrySet()) {

						BackendWritableBroadcastState<?, ?> value = entry.getValue();
						long[] partitionOffsets = {value.write(localOut)};
						OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
						writtenStatesMetaData.put(
							entry.getKey(),
							new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
					}

					// ... and, finally, create the state handle.
					OperatorStateHandle retValue = null;

					if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {

						// close the output stream and obtain the state handle, which can later be used to read the state back
						StreamStateHandle stateHandle = localOut.closeAndGetHandle();

						if (stateHandle != null) {
							retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
						}

						return SnapshotResult.of(retValue);
					} else {
						throw new IOException("Stream was already unregistered.");
					}
				}

				@Override
				protected void cleanupProvidedResources() {
					// nothing to do
				}

				@Override
				protected void logAsyncSnapshotComplete(long startTime) {
					if (asynchronousSnapshots) {
						logAsyncCompleted(streamFactory, startTime);
					}
				}
			};

		final FutureTask<SnapshotResult<OperatorStateHandle>> task =
			snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

		// if the checkpoint is not asynchronous, run the FutureTask right here, so the state is written during the synchronous phase
		if (!asynchronousSnapshots) {
			task.run();
		}
		return task;
	}
}
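
The split between a synchronous copy and a deferred write can be illustrated without any Flink classes. The following is a minimal sketch under stated assumptions: ListStateSnapshotSketch, its String-valued list states, and the byte array standing in for a state handle are all hypothetical and only mirror the shape of the listing above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/** Hypothetical miniature of an operator state snapshot; none of these names are Flink classes. */
class ListStateSnapshotSketch {

    // Live state, mutated by the task thread while records are processed.
    private final Map<String, List<String>> registeredStates = new LinkedHashMap<>();
    private final boolean asynchronousSnapshots;

    ListStateSnapshotSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    void add(String stateName, String value) {
        registeredStates.computeIfAbsent(stateName, k -> new ArrayList<>()).add(value);
    }

    /** Synchronous phase: copy the state, then wrap the write in a FutureTask. */
    FutureTask<byte[]> snapshot() {
        // Deep copy on the caller's thread, so later add() calls cannot leak into the snapshot.
        final Map<String, List<String>> copies = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : registeredStates.entrySet()) {
            copies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }

        Callable<byte[]> writer = () -> writeStates(copies);
        FutureTask<byte[]> task = new FutureTask<>(writer);

        // Without asynchronous snapshots the write happens right here, in the synchronous phase.
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }

    private static byte[] writeStates(Map<String, List<String>> copies) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // 1) metadata: number of states and their names
            out.writeInt(copies.size());
            for (String name : copies.keySet()) {
                out.writeUTF(name);
            }
            // 2) the state values themselves
            for (List<String> values : copies.values()) {
                out.writeInt(values.size());
                for (String value : values) {
                    out.writeUTF(value);
                }
            }
        }
        // 3) the finished byte array stands in for the state handle
        return bytes.toByteArray();
    }
}
```

In asynchronous mode the caller receives an unfinished task and decides on which thread to run it. Because the copy was taken eagerly, elements added after snapshot() returns do not show up in the written bytes.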

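Writing keyed state follows the same basic flow. However, keyed state has several storage implementations, including heap-based and RocksDB-based ones, and the RocksDB-based implementation additionally supports incremental checkpoints, so it is more complex than operator state. In addition, since version 1.5.0 Flink has included a local state storage optimization, which keeps a copy of keyed state locally on the TaskManager in order to speed up state recovery and reduce network overhead.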

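This concludes the first phase of the snapshot operation, the synchronous phase. The asynchronous phase is encapsulated in AsyncCheckpointRunnable, whose main steps are 1) executing the FutureTask created in the synchronous phase and 2) sending an Ack response to the CheckpointCoordinator once it has finished.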

class StreamTask {
	protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
		@Override
		public void run() {
			FileSystemSafetyNet.initializeSafetyNetForThread();
			try {
				TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
					new TaskStateSnapshot(operatorSnapshotsInProgress.size());
				TaskStateSnapshot localTaskOperatorSubtaskStates =
					new TaskStateSnapshot(operatorSnapshotsInProgress.size());

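				// finish writing the state of every operator
				// for a synchronous checkpoint, the state has already been written by this point
				// for an asynchronous checkpoint, the state is only written here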
				for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
					OperatorID operatorID = entry.getKey();
					OperatorSnapshotFutures snapshotInProgress = entry.getValue();
					// finalize the async part of all by executing all snapshot runnables
					OperatorSnapshotFinalizer finalizedSnapshots =
						new OperatorSnapshotFinalizer(snapshotInProgress);

					jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
						operatorID,
						finalizedSnapshots.getJobManagerOwnedState());

					localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
						operatorID,
						finalizedSnapshots.getTaskLocalState());
				}

				final long asyncEndNanos = System.nanoTime();
				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

				checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

				if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
					CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
					// report that the snapshot is complete
					reportCompletedSnapshotStates(
						jobManagerTaskOperatorSubtaskStates,
						localTaskOperatorSubtaskStates,
						asyncDurationMillis);

				} else {
					LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
						owner.getName(),
						checkpointMetaData.getCheckpointId());
				}
			} catch (Exception e) {
				handleExecutionException(e);
			} finally {
				owner.cancelables.unregisterCloseable(this);
				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
			}
		}
	}

	private void reportCompletedSnapshotStates(
			TaskStateSnapshot acknowledgedTaskStateSnapshot,
			TaskStateSnapshot localTaskStateSnapshot,
			long asyncDurationMillis) {
			TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
			boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
			boolean hasLocalState = localTaskStateSnapshot.hasState();
			// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
			// to stateless tasks on restore. This enables simple job modifications that only concern
			// stateless without the need to assign them uids to match their (always empty) states.
			taskStateManager.reportTaskStateSnapshots(
				checkpointMetaData,
				checkpointMetrics,
				hasAckState ? acknowledgedTaskStateSnapshot : null,
				hasLocalState ? localTaskStateSnapshot : null);
		}
}

public class TaskStateManagerImpl implements TaskStateManager {
	@Override
	public void reportTaskStateSnapshots(
		@Nonnull CheckpointMetaData checkpointMetaData,
		@Nonnull CheckpointMetrics checkpointMetrics,
		@Nullable TaskStateSnapshot acknowledgedState,
		@Nullable TaskStateSnapshot localState) {

		long checkpointId = checkpointMetaData.getCheckpointId();

		localStateStore.storeLocalState(checkpointId, localState);

		// send the Ack response to the CheckpointCoordinator
		checkpointResponder.acknowledgeCheckpoint(
			jobId,
			executionAttemptID,
			checkpointId,
			checkpointMetrics,
			acknowledgedState);
	}
}
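
The two steps of the asynchronous phase, together with the compare-and-set guard that keeps a cancelled checkpoint from being acknowledged, can be sketched in plain Java. AsyncPhaseSketch, its Phase enum, and the list of response strings that stands in for the CheckpointResponder are assumptions made for illustration; String values stand in for state handles.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the asynchronous phase; not Flink's AsyncCheckpointRunnable. */
class AsyncPhaseSketch implements Runnable {

    enum Phase { RUNNING, DISCARDED, COMPLETED }

    private final Map<String, FutureTask<String>> snapshotsInProgress;
    private final List<String> responses; // plays the role of the CheckpointResponder
    private final AtomicReference<Phase> phase = new AtomicReference<>(Phase.RUNNING);

    AsyncPhaseSketch(Map<String, FutureTask<String>> snapshotsInProgress, List<String> responses) {
        this.snapshotsInProgress = snapshotsInProgress;
        this.responses = responses;
    }

    @Override
    public void run() {
        try {
            Map<String, String> handles = new LinkedHashMap<>();
            for (Map.Entry<String, FutureTask<String>> entry : snapshotsInProgress.entrySet()) {
                FutureTask<String> future = entry.getValue();
                // Executes the write now; if the synchronous phase already ran it, this is a no-op.
                future.run();
                handles.put(entry.getKey(), future.get());
            }
            // Only the party that wins RUNNING -> COMPLETED may send the acknowledgement.
            if (phase.compareAndSet(Phase.RUNNING, Phase.COMPLETED)) {
                responses.add("ack " + handles);
            }
        } catch (Exception e) {
            if (phase.compareAndSet(Phase.RUNNING, Phase.DISCARDED)) {
                responses.add("decline: " + e);
            }
        }
    }

    /** Called on cancellation; a run() that finishes later will then skip the acknowledgement. */
    void close() {
        phase.compareAndSet(Phase.RUNNING, Phase.DISCARDED);
    }
}
```

The state transition is the important part: whichever of run() and close() moves the phase away from RUNNING first decides whether an Ack is sent at all.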

Local State Storage

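Local state storage means that, when a checkpoint snapshot is stored, a copy is also written to the local file system of the TaskManager on which the Task runs. During recovery, state can then be restored from the local copy first, which reduces network transfer overhead. Local state storage applies only to keyed state. Taking the relatively simple HeapKeyedStateBackend as an example, let us see how it is implemented.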

class HeapSnapshotStrategy<K>
	extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
	@Nonnull
	@Override
	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
		long checkpointId,
		long timestamp,
		@Nonnull CheckpointStreamFactory primaryStreamFactory,
		@Nonnull CheckpointOptions checkpointOptions) throws IOException {

		......

		// create the CheckpointStreamWithResultProvider
		final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =

			localRecoveryConfig.isLocalRecoveryEnabled() ?

				() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
					checkpointId,
					CheckpointedStateScope.EXCLUSIVE,
					primaryStreamFactory,
					localRecoveryConfig.getLocalStateDirectoryProvider()) :

				() -> CheckpointStreamWithResultProvider.createSimpleStream(
					CheckpointedStateScope.EXCLUSIVE,
					primaryStreamFactory);

		........
	}
}

The key point is that a different CheckpointStreamWithResultProvider is created depending on whether local recovery is enabled:

public interface CheckpointStreamWithResultProvider extends Closeable {
	@Nonnull
	static CheckpointStreamWithResultProvider createSimpleStream(
		@Nonnull CheckpointedStateScope checkpointedStateScope,
		@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {

		CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
			primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
		return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
	}

	@Nonnull
	static CheckpointStreamWithResultProvider createDuplicatingStream(
		@Nonnegative long checkpointId,
		@Nonnull CheckpointedStateScope checkpointedStateScope,
		@Nonnull CheckpointStreamFactory primaryStreamFactory,
		@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {

		CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
			primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

		try {
			File outFile = new File(
				secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
				String.valueOf(UUID.randomUUID()));
			Path outPath = new Path(outFile.toURI());

			CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
				new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
			// there are two output streams, primary and secondary; the secondary one corresponds to local storage
			return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
		} catch (IOException secondaryEx) {
			LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
				"Continue only with the primary stream.", secondaryEx);
		}
		return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
	}
}

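So when local state storage is enabled, two output streams are created: primaryOut corresponds to external storage and secondaryOut to local storage, and the state is written twice. The handle to the local state is kept in the TaskLocalStateStore.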
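
To make the "written twice" idea concrete, here is a minimal sketch of a duplicating output stream built on java.io only. DuplicatingStreamSketch is a hypothetical class, and it assumes that the local copy is optional: a failure on the secondary stream is remembered rather than thrown, in the same spirit as the fallback to PrimaryStreamOnly in the listing above.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical duplicating stream: each write goes to a primary and, best effort, a secondary target. */
class DuplicatingStreamSketch extends OutputStream {

    private final OutputStream primary;   // external, durable storage in the real system
    private final OutputStream secondary; // task-local copy
    private IOException secondaryFailure; // remembered instead of thrown

    DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        // A failure of the primary stream propagates and fails the snapshot.
        primary.write(b);
        if (secondaryFailure == null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                // The local copy is optional: stop writing to it and carry on with the primary.
                secondaryFailure = e;
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            secondary.close();
        } catch (IOException e) {
            if (secondaryFailure == null) {
                secondaryFailure = e;
            }
        } finally {
            primary.close();
        }
    }

    /** True if every byte also reached the secondary stream, so a local handle could be reported. */
    boolean hasUsableLocalCopy() {
        return secondaryFailure == null;
    }
}
```

A caller would report a local state handle only when hasUsableLocalCopy() returns true, and otherwise acknowledge the checkpoint with the primary handle alone.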

Acknowledging a Checkpoint

A Task responds to a checkpoint through the CheckpointResponder interface:

public interface CheckpointResponder {

	/**
	 * Acknowledges the given checkpoint.
	 */
	void acknowledgeCheckpoint(
		JobID jobID,
		ExecutionAttemptID executionAttemptID,
		long checkpointId,
		CheckpointMetrics checkpointMetrics,
		TaskStateSnapshot subtaskState);

	/**
	 * Declines the given checkpoint.
	 */
	void declineCheckpoint(
		JobID jobID,
		ExecutionAttemptID executionAttemptID,
		long checkpointId,
		Throwable cause);
}

RpcCheckpointResponder, the concrete implementation of CheckpointResponder, notifies the CheckpointCoordinatorGateway through an RPC call, that is, it notifies the JobMaster. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.

Acknowledgement

After a Task finishes its checkpoint operation, the CheckpointCoordinator receives the Ack response and handles it roughly as follows:

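  • Look up the corresponding PendingCheckpoint in Map<Long, PendingCheckpoint> pendingCheckpoints by the checkpoint ID carried in the Ack
  • If a corresponding PendingCheckpoint exists
    • and it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack; depending on the result:
      • SUCCESS: check whether all required Acks have been received, and if so, call completePendingCheckpoint to complete this checkpoint
      • DUPLICATE: the Ack was received more than once and is ignored
      • UNKNOWN: the Ack is unknown; clean up the state handles carried in the reported Ack
      • DISCARDED: the checkpoint has already been discarded; clean up the state handles carried in the reported Ack
    • and it has already been discarded, throw an exception
  • If no corresponding PendingCheckpoint exists, clean up the state handles carried in the reported Ack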

The corresponding code is as follows:

class CheckpointCoordinator {
	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
		if (shutdown || message == null) {
			return false;
		}

		if (!job.equals(message.getJob())) {
			LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
			return false;
		}

		final long checkpointId = message.getCheckpointId();

		synchronized (lock) {
			// we need to check inside the lock for being shutdown as well, otherwise we
			// get races and invalid error log messages
			if (shutdown) {
				return false;
			}

			final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

			if (checkpoint != null && !checkpoint.isDiscarded()) {

				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
					case SUCCESS:
						LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
							checkpointId, message.getTaskExecutionId(), message.getJob());

						if (checkpoint.isFullyAcknowledged()) {
							completePendingCheckpoint(checkpoint);
						}
						break;
					case DUPLICATE:
						LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
						break;
					case UNKNOWN:
						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
								"because the task's execution attempt id was unknown. Discarding " +
								"the state handle to avoid lingering state.", message.getCheckpointId(),
							message.getTaskExecutionId(), message.getJob());

						discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

						break;
					case DISCARDED:
						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
							"because the pending checkpoint had been discarded. Discarding the " +
								"state handle to avoid lingering state.",
							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

						discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
				}

				return true;
			}
			else if (checkpoint != null) {
				// this should not happen
				throw new IllegalStateException(
						"Received message for discarded but non-removed checkpoint " + checkpointId);
			}
			else {
				boolean wasPendingCheckpoint;
				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
				if (recentPendingCheckpoints.contains(checkpointId)) {
					wasPendingCheckpoint = true;
					LOG.warn("Received late message for now expired checkpoint attempt {} from " +
						"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
				}
				else {
					LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
						checkpointId, message.getTaskExecutionId(), message.getJob());
					wasPendingCheckpoint = false;
				}

				// try to discard the state so that we don't have lingering state lying around
				discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

				return wasPendingCheckpoint;
			}
		}
	}
}

How does a checkpoint that has been triggered but not yet completed, that is, a PendingCheckpoint, handle Ack messages? Internally, PendingCheckpoint maintains two Maps:

  • Map<OperatorID, OperatorState> operatorStates: state handles of the operators whose Acks have been received
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: Tasks that must Ack but have not done so yet

Each time an Ack message arrives, PendingCheckpoint removes the corresponding Task from notYetAcknowledgedTasks and stores the state handles carried by the Ack. Once notYetAcknowledgedTasks is empty, all Ack messages have been received.
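
The bookkeeping described here, and the four outcomes handled by the coordinator, fit into a few lines of plain Java. This is a sketch under stated assumptions: PendingCheckpointSketch is hypothetical, execution attempts are identified by plain Strings, and a String stands in for the state handles carried by an Ack.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of Ack tracking inside a pending checkpoint; not Flink's PendingCheckpoint. */
class PendingCheckpointSketch {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledged;                    // tasks still expected to Ack
    private final Set<String> acknowledged = new HashSet<>();        // tasks that already did
    private final Map<String, String> stateHandles = new HashMap<>(); // attempt id -> reported handle
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    AckResult acknowledgeTask(String attemptId, String stateHandle) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledged.remove(attemptId)) {
            // Either this task has acknowledged before, or it never belonged to this checkpoint.
            return acknowledged.contains(attemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledged.add(attemptId);
        if (stateHandle != null) {
            // Stateless tasks report null, so nothing is recorded for them.
            stateHandles.put(attemptId, stateHandle);
        }
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledged.isEmpty();
    }

    void discard() {
        discarded = true;
        stateHandles.clear();
    }
}
```

A caller that receives UNKNOWN or DISCARDED still owns the handle it passed in and is expected to clean it up, which is what the coordinator does with discardSubtaskState.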

OperatorState is a wrapper around an operator's state handles:

class OperatorState implements CompositeStateHandle {
	/** handles to non-partitioned states, subtaskindex -> subtaskstate */
	private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
}

public class OperatorSubtaskState implements CompositeStateHandle {
	/** Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */
	@Nonnull
	private final StateObjectCollection<OperatorStateHandle> managedOperatorState;

	/** Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */
	@Nonnull
	private final StateObjectCollection<OperatorStateHandle> rawOperatorState;

	/** Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */
	@Nonnull
	private final StateObjectCollection<KeyedStateHandle> managedKeyedState;

	/** Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */
	@Nonnull
	private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
}

Once PendingCheckpoint has confirmed that all Ack messages have been received, the checkpoint can be completed. This involves:

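  • Call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
    • Obtain a CheckpointMetadataOutputStream and write all state handle information through it to the storage system
    • Create a CompletedCheckpoint object
  • Save the CompletedCheckpoint in the CompletedCheckpointStore
    • CompletedCheckpointStore has two implementations, StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
    • StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoint objects in an in-memory array
    • ZooKeeperCompletedCheckpointStore provides the highly available implementation: it first writes the CompletedCheckpoint to a RetrievableStateStorageHelper (usually a file system) and then stores the file handle in ZK
    • Only a limited number of CompletedCheckpoint objects are retained; older snapshots are deleted
  • Remove the PendingCheckpoints that have been subsumed: because checkpoint IDs increase monotonically, every PendingCheckpoint with an ID smaller than the one just completed can be discarded
  • Call Execution.notifyCheckpointComplete() on each Task in turn to notify it that the current checkpoint has completed
    • The corresponding Task is informed through an RPC call to TaskExecutor.confirmCheckpoint()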
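
Two of the steps above, the bounded store of completed checkpoints and the removal of subsumed pending checkpoints, can be sketched as follows. CompletionSketch and all of its members are hypothetical, checkpoints are reduced to their IDs, and the sketch assumes that every older pending checkpoint may be dropped, which ignores any special cases the real coordinator may handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of checkpoint completion; checkpoints are represented by their IDs only. */
class CompletionSketch {

    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();          // oldest first
    private final TreeMap<Long, String> pending = new TreeMap<>();     // id -> placeholder payload
    final List<Long> discarded = new ArrayList<>();                    // everything that was cleaned up

    CompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, "pending-" + checkpointId);
    }

    void complete(long checkpointId) {
        if (pending.remove(checkpointId) == null) {
            throw new IllegalStateException("No pending checkpoint " + checkpointId);
        }

        // Store the completed checkpoint and retire the oldest ones beyond the retention limit.
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            discarded.add(completed.removeFirst());
        }

        // IDs only grow, so every pending checkpoint with a smaller ID has been subsumed.
        NavigableMap<Long, String> subsumed = pending.headMap(checkpointId, false);
        discarded.addAll(subsumed.keySet());
        subsumed.clear(); // headMap is a view, so this removes the entries from 'pending'
    }

    List<Long> retained() {
        return new ArrayList<>(completed);
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }
}
```

With a retention limit of 1 and checkpoints 1, 2 and 3 in flight, completing checkpoint 2 drops pending checkpoint 1, and completing checkpoint 3 afterwards retires checkpoint 2 from the store.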

Declining a Checkpoint

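While a Task is performing a checkpoint, an exception may occur and cause the checkpoint to fail. In that case a decline message is sent through the CheckpointResponder. When the CheckpointCoordinator receives a DeclineCheckpoint message, it removes the PendingCheckpoint and tries to discard the completed state handles from the Ack messages already received: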

class CheckpointCoordinator {
	public void receiveDeclineMessage(DeclineCheckpoint message) {
		if (shutdown || message == null) {
			return;
		}
		if (!job.equals(message.getJob())) {
			throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
				message.getJob() + " while this coordinator handles job " + job);
		}

		final long checkpointId = message.getCheckpointId();
		final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

		PendingCheckpoint checkpoint;

		synchronized (lock) {
			// we need to check inside the lock for being shutdown as well, otherwise we
			// get races and invalid error log messages
			if (shutdown) {
				return;
			}

			checkpoint = pendingCheckpoints.remove(checkpointId);

			if (checkpoint != null && !checkpoint.isDiscarded()) {
				LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
				discardCheckpoint(checkpoint, message.getReason());
			}
			else if (checkpoint != null) {
				// this should not happen
				throw new IllegalStateException(
						"Received message for discarded but non-removed checkpoint " + checkpointId);
			}
			else if (LOG.isDebugEnabled()) {
				if (recentPendingCheckpoints.contains(checkpointId)) {
					// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
					LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
							checkpointId, job, reason);
				} else {
					// message is for an unknown checkpoint. might be so old that we don't even remember it any more
					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
							checkpointId, job, reason);
				}
			}
		}
	}
}
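
The effect of a decline on state that other Tasks have already reported can be shown with a small model. DeclineSketch is hypothetical; state handles are plain Strings and "deleting" a handle simply means recording it in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of decline handling; not Flink's CheckpointCoordinator. */
class DeclineSketch {

    private final Map<Long, List<String>> pendingHandles = new HashMap<>(); // checkpoint id -> acked handles
    final List<String> deleted = new ArrayList<>();                         // handles that were cleaned up

    void trigger(long checkpointId) {
        pendingHandles.put(checkpointId, new ArrayList<>());
    }

    /** Returns false for a late Ack; its handle is deleted so that no state lingers. */
    boolean acknowledge(long checkpointId, String handle) {
        List<String> handles = pendingHandles.get(checkpointId);
        if (handles == null) {
            deleted.add(handle);
            return false;
        }
        handles.add(handle);
        return true;
    }

    /** Removes the pending checkpoint and deletes everything acknowledged for it so far. */
    boolean decline(long checkpointId) {
        List<String> handles = pendingHandles.remove(checkpointId);
        if (handles == null) {
            return false; // unknown or already expired checkpoint: nothing to do
        }
        deleted.addAll(handles);
        return true;
    }
}
```

A single decline is enough to abort the whole checkpoint, and any Ack for the same checkpoint that arrives afterwards is treated as a late message.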

State Recovery

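When a Flink job restarts after a failure or is started from a given SavePoint, the whole job must be restored to the state of the last successful checkpoint. This happens in two phases: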

  • The CheckpointCoordinator loads the most recent successful CompletedCheckpoint and redistributes its state to the individual Executions (Tasks)
  • Each Task initializes its state when it starts

State Assignment

First, after creating the ExecutionGraph, the JobMaster tries to restore state from the most recent successful checkpoint, or loads a SavePoint. Either way, CheckpointCoordinator.restoreLatestCheckpointedState() is eventually called:

class CheckpointCoordinator {
	public boolean restoreLatestCheckpointedState(
			Map<JobVertexID, ExecutionJobVertex> tasks,
			boolean errorIfNoCheckpoint,
			boolean allowNonRestoredState) throws Exception {
		synchronized (lock) {
			......
			// Restore from the latest checkpoint
			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
			StateAssignmentOperation stateAssignmentOperation =
					new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
			stateAssignmentOperation.assignStates();
			........
		}
	}
}

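The state assignment process is encapsulated in StateAssignmentOperation. If the job's parallelism has changed at recovery time, the state of each subtask necessarily differs from what it was before, so the state has to be redistributed evenly across subtasks. For details, see the Flink team's blog post A Deep Dive into Rescalable State in Apache Flink, which describes the redistribution of operator state and keyed state in depth.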
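
As a rough illustration of what even redistribution can mean, the sketch below splits a fixed number of key groups into contiguous ranges, one per subtask, and deals list-style operator state out round-robin. RescaleSketch and both of its methods are hypothetical, and the arithmetic is one possible scheme chosen for this example rather than a copy of StateAssignmentOperation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helpers illustrating state redistribution after a change in parallelism. */
class RescaleSketch {

    /**
     * Keyed state: returns the inclusive range {start, end} of key groups owned by one subtask.
     * The ranges of all subtasks are contiguous and together cover 0 .. maxParallelism - 1.
     */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /**
     * Operator list state: collects the entries of all old subtasks and deals them out
     * round-robin, so the sizes of the new lists differ by at most one.
     */
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> oldState : oldSubtaskStates) {
            for (T entry : oldState) {
                result.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return result;
    }
}
```

With 128 key groups and a parallelism of 3, for example, the middle subtask owns key groups 43 to 85. After rescaling, a subtask only has to read the state handles that overlap its own range.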

Finally, the state assigned to each Task is wrapped in a JobManagerTaskRestore and attached to the Execution through Execution.setInitialState(). The JobManagerTaskRestore is then shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor.

Task State Initialization

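After the TaskDeploymentDescriptor has been submitted to the TaskExecutor, the TaskExecutor uses a TaskStateManager to manage the state of the current Task. The TaskStateManager object is created from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore: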

class TaskExecutor {
	@Override
	public CompletableFuture<Acknowledge> submitTask(
			TaskDeploymentDescriptor tdd,
			JobMasterId jobMasterId,
			Time timeout) {

		.......

		// local state store
		final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
				jobId,
				tdd.getAllocationId(),
				taskInformation.getJobVertexId(),
				tdd.getSubtaskIndex());
		// state assigned by the JobManager for recovery
		final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
		// create the TaskStateManager
		final TaskStateManager taskStateManager = new TaskStateManagerImpl(
				jobId,
				tdd.getExecutionAttemptId(),
				localStateStore,
				taskRestore,
				checkpointResponder);

		// create and start the Task
		......
	}
}

After the Task starts, StreamTask first calls initializeState, so that every operator calls StreamOperator.initializeState() to initialize its state:

public abstract class AbstractStreamOperator<OUT>
		implements StreamOperator<OUT>, Serializable {
	@Override
	public final void initializeState() throws Exception {
		final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

		final StreamTask<?, ?> containingTask =
			Preconditions.checkNotNull(getContainingTask());
		final CloseableRegistry streamTaskCloseableRegistry =
			Preconditions.checkNotNull(containingTask.getCancelables());
		final StreamTaskStateInitializer streamTaskStateManager =
			Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

		// create the StreamOperatorStateContext; state is restored in this step,
		// so that operatorStateBackend and keyedStateBackend are restored to the state of the last checkpoint
		// timeServiceManager is restored as well
		final StreamOperatorStateContext context =
			streamTaskStateManager.streamOperatorStateContext(
				getOperatorID(),
				getClass().getSimpleName(),
				this,
				keySerializer,
				streamTaskCloseableRegistry,
				metrics);

		this.operatorStateBackend = context.operatorStateBackend();
		this.keyedStateBackend = context.keyedStateBackend();

		if (keyedStateBackend != null) {
			this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
		}

		timeServiceManager = context.internalTimerServiceManager();

		CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
		CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

		try {
			// StateInitializationContext exposes the state backends and the raw state inputs, which the operator can use to initialize its state
			StateInitializationContext initializationContext = new StateInitializationContextImpl(
				context.isRestored(), // information whether we restore or start for the first time
				operatorStateBackend, // access to operator state backend
				keyedStateStore, // access to keyed state backend
				keyedStateInputs, // access to keyed state stream
				operatorStateInputs); // access to operator state stream

			// initialize the state; implemented in subclasses, e.g. by calling the state initialization method of the UDF
			initializeState(initializationContext);
		} finally {
			closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
			closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
		}
	}

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
	}
}

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
		extends AbstractStreamOperator<OUT>
		implements OutputTypeConfigurable<OUT> {
	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		// invoke the user function's state initialization method
		StreamingFunctionUtils.restoreFunctionState(context, userFunction);
	}
}
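
Seen from the user function, the flow above boils down to a hook that receives a context and checks whether it is being restored. The sketch below models that hand-off in plain Java; RestoreHookSketch, InitContext, StatefulFunction and CountingFunction are hypothetical names, and a list of Long values stands in for restored operator list state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the initializeState hand-off from operator to user function. */
class RestoreHookSketch {

    /** Stand-in for the initialization context: a restore flag plus the restored list state. */
    static final class InitContext {
        final boolean restored;
        final List<Long> listState;

        InitContext(boolean restored, List<Long> listState) {
            this.restored = restored;
            this.listState = listState;
        }
    }

    /** Stand-in for a user function that wants to take part in state initialization. */
    interface StatefulFunction {
        void initializeState(InitContext context);
    }

    /** Example user function: sums the restored partial counts, or starts from zero. */
    static final class CountingFunction implements StatefulFunction {
        long count;

        @Override
        public void initializeState(InitContext context) {
            count = 0;
            if (context.restored) {
                for (long partialCount : context.listState) {
                    count += partialCount;
                }
            }
        }
    }

    /** Stand-in for the operator: builds the context first, then calls the user function. */
    static final class Operator {
        private final StatefulFunction userFunction;

        Operator(StatefulFunction userFunction) {
            this.userFunction = userFunction;
        }

        void initializeState(List<Long> restoredStateOrNull) {
            boolean restored = restoredStateOrNull != null;
            List<Long> state = restored ? restoredStateOrNull : new ArrayList<Long>();
            userFunction.initializeState(new InitContext(restored, state));
        }
    }
}
```

Summing the restored entries reflects the fact that, after rescaling, a subtask may receive list state entries that were written by several former subtasks.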

The key step in state recovery is to produce a StreamOperatorStateContext through StreamTaskStateInitializer.streamOperatorStateContext(). The StreamOperatorStateContext gives access to the state backends, the timer service manager, and so on:

public interface StreamOperatorStateContext {
	// Returns true if the states provided by this context are restored from a checkpoint/savepoint.
	boolean isRestored();

	// Returns the operator state backend for the stream operator.
	OperatorStateBackend operatorStateBackend();

	// Returns the keyed state backend for the stream operator. This method returns null for non-keyed operators.
	AbstractKeyedStateBackend<?> keyedStateBackend();

	// Returns the internal timer service manager for the stream operator. This method returns null for non-keyed operators.
	InternalTimeServiceManager<?> internalTimerServiceManager();

	// Returns an iterable to obtain input streams for previously stored operator state partitions that are assigned to this stream operator.
	CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs();

	// Returns an iterable to obtain input streams for previously stored keyed state partitions that are assigned to this operator. This method returns null for non-keyed operators.
	CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs();
}

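To produce a StreamOperatorStateContext, the state handles that each operator needs to restore are first obtained through TaskStateManager.prioritizedOperatorState(); the state backends and timers are then created and restored from these handles. This is where PrioritizedOperatorSubtaskState comes in: it wraps several alternative OperatorSubtaskState snapshots, which can (partially) substitute for one another and are ordered by priority. The last entry in the list contains all the state of the subtask but has the lowest priority. During recovery, state is read from the higher-priority handles first.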

public class PrioritizedOperatorSubtaskState {
	/** List of prioritized snapshot alternatives for managed operator state. */
	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState;

	/** List of prioritized snapshot alternatives for raw operator state. */
	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState;

	/** List of prioritized snapshot alternatives for managed keyed state. */
	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState;

	/** List of prioritized snapshot alternatives for raw keyed state. */
	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState;

	public static class Builder {
		protected <T extends StateObject> List<StateObjectCollection<T>> resolvePrioritizedAlternatives(
			StateObjectCollection<T> jobManagerState,
			List<StateObjectCollection<T>> alternativesByPriority,
			BiFunction<T, T, Boolean> approveFun) {

			// Nothing to resolve if there are no alternatives, or the ground truth has already no state, or if we can
			// assume that a rescaling happened because we find more than one handle in the JM state (this is more a sanity
			// check).
			if (alternativesByPriority == null
				|| alternativesByPriority.isEmpty()
				|| !jobManagerState.hasState()
				|| jobManagerState.size() != 1) {

				return Collections.singletonList(jobManagerState);
			}

			// As we know size is == 1
			T reference = jobManagerState.iterator().next();
			// This will contain the end result, we initialize it with the potential max. size.
			List<StateObjectCollection<T>> approved =
				new ArrayList<>(1 + alternativesByPriority.size());
			for (StateObjectCollection<T> alternative : alternativesByPriority) {
				// We found an alternative to the JM state if it has state, we have a 1:1 relationship, and the
				// approve-function signaled true.
				if (alternative != null
					&& alternative.hasState()
					&& alternative.size() == 1
					&& BooleanUtils.isTrue(approveFun.apply(reference, alternative.iterator().next()))) {
					approved.add(alternative);
				}
			}

			// The state obtained from the JobManager is the lowest-priority alternative.
			// Of course we include the ground truth as last alternative.
			approved.add(jobManagerState);
			return Collections.unmodifiableList(approved);
		}
	}

}

public class TaskStateManagerImpl implements TaskStateManager {
	@Override
	public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
		if (jobManagerTaskRestore == null) {
			return PrioritizedOperatorSubtaskState.emptyNotRestored();
		}
		// State snapshot obtained from the JobManager
		TaskStateSnapshot jobManagerStateSnapshot =
			jobManagerTaskRestore.getTaskStateSnapshot();
		OperatorSubtaskState jobManagerSubtaskState =
			jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
		if (jobManagerSubtaskState == null) {
			return PrioritizedOperatorSubtaskState.emptyNotRestored();
		}

		// The local state snapshot serves as a (higher-priority) alternative
		long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId();
		TaskStateSnapshot localStateSnapshot =
			localStateStore.retrieveLocalState(restoreCheckpointId);

		localStateStore.pruneMatchingCheckpoints((long checkpointId) -> checkpointId != restoreCheckpointId);
		List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList();

		if (localStateSnapshot != null) {
			OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
			if (localSubtaskState != null) {
				alternativesByPriority = Collections.singletonList(localSubtaskState);
			}
		}

		// Build the PrioritizedOperatorSubtaskState
		PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
			jobManagerSubtaskState,
			alternativesByPriority,
			true);

		return builder.build();
	}
}
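
The ordering produced by resolvePrioritizedAlternatives can be illustrated with a small standalone sketch. This is not Flink code: the class `PrioritizedAlternativesSketch`, the `Handle` type, the `resolve` method, and the checkpoint-id comparison used as the approve function are all hypothetical stand-ins chosen for illustration. The sketch only shows the idea that approved local alternatives come first and the JobManager state is always appended last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiPredicate;

public class PrioritizedAlternativesSketch {

    /** Hypothetical stand-in for a state handle: where it lives and which checkpoint it belongs to. */
    public static final class Handle {
        public final String location;
        public final long checkpointId;

        public Handle(String location, long checkpointId) {
            this.location = location;
            this.checkpointId = checkpointId;
        }
    }

    /**
     * Returns the approved alternatives in priority order.
     * The JobManager handle (the "ground truth") is always the last element.
     */
    public static List<Handle> resolve(
            Handle jobManagerState,
            List<Handle> alternativesByPriority,
            BiPredicate<Handle, Handle> approve) {

        List<Handle> approved = new ArrayList<>(1 + alternativesByPriority.size());
        for (Handle alternative : alternativesByPriority) {
            // Keep an alternative only if the approve function accepts it against the reference.
            if (alternative != null && approve.test(jobManagerState, alternative)) {
                approved.add(alternative);
            }
        }
        // The JobManager state is the lowest-priority fallback.
        approved.add(jobManagerState);
        return Collections.unmodifiableList(approved);
    }

    public static void main(String[] args) {
        Handle remote = new Handle("remote", 42L);
        List<Handle> locals = Arrays.asList(
            new Handle("local", 42L),
            new Handle("stale-local", 41L));

        // Assumption for this sketch: an alternative is valid if it belongs to the same checkpoint.
        List<Handle> result =
            resolve(remote, locals, (ref, alt) -> ref.checkpointId == alt.checkpointId);

        for (Handle handle : result) {
            System.out.println(handle.location);
        }
    }
}
```

With the sample input in `main`, the stale local handle is rejected, so the program prints `local` followed by `remote`.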

Once the PrioritizedOperatorSubtaskState has been obtained, the state can be restored:

public class StreamTaskStateInitializerImpl implements StreamTaskStateInitializer {
	@Override
	public StreamOperatorStateContext streamOperatorStateContext(
		@Nonnull OperatorID operatorID,
		@Nonnull String operatorClassName,
		@Nonnull KeyContext keyContext,
		@Nullable TypeSerializer<?> keySerializer,
		@Nonnull CloseableRegistry streamTaskCloseableRegistry,
		@Nonnull MetricGroup metricGroup) throws Exception {

		TaskInfo taskInfo = environment.getTaskInfo();
		OperatorSubtaskDescriptionText operatorSubtaskDescription =
			new OperatorSubtaskDescriptionText(
				operatorID,
				operatorClassName,
				taskInfo.getIndexOfThisSubtask(),
				taskInfo.getNumberOfParallelSubtasks());

		final String operatorIdentifierText = operatorSubtaskDescription.toString();

		// First obtain the PrioritizedOperatorSubtaskState used for restoring state
		final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
			taskStateManager.prioritizedOperatorState(operatorID);

		AbstractKeyedStateBackend<?> keyedStatedBackend = null;
		OperatorStateBackend operatorStateBackend = null;
		CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
		CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
		InternalTimeServiceManager<?> timeServiceManager;

		try {
			// -------------- Keyed State Backend --------------
			keyedStatedBackend = keyedStatedBackend(
				keySerializer,
				operatorIdentifierText,
				prioritizedOperatorSubtaskStates,
				streamTaskCloseableRegistry,
				metricGroup);

			// -------------- Operator State Backend --------------
			operatorStateBackend = operatorStateBackend(
				operatorIdentifierText,
				prioritizedOperatorSubtaskStates,
				streamTaskCloseableRegistry);

			// -------------- Raw State Streams --------------
			rawKeyedStateInputs = rawKeyedStateInputs(
				prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
			streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

			rawOperatorStateInputs = rawOperatorStateInputs(
				prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
			streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

			// -------------- Internal Timer Service Manager --------------
			timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

			// -------------- Preparing return value --------------

			return new StreamOperatorStateContextImpl(
				prioritizedOperatorSubtaskStates.isRestored(),
				operatorStateBackend,
				keyedStatedBackend,
				timeServiceManager,
				rawOperatorStateInputs,
				rawKeyedStateInputs);
		} catch (Exception ex) {
			//.......
		}
	}
}

State restore is coupled with the creation of the state backend. Both are carried out by BackendRestorerProcedure, and the concrete logic lives in the BackendRestorerProcedure.createAndRestore method.
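
The general pattern of "try each alternative in priority order and fall back to the next one on failure" can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual BackendRestorerProcedure implementation: the class `RestoreWithFallbackSketch`, its generic `createAndRestore` signature, and the string-based "backend" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RestoreWithFallbackSketch {

    /**
     * Tries to build a backend from each alternative, highest priority first.
     * Returns the first backend that restores successfully; if every attempt
     * fails (or the list is empty), throws with all failures attached as suppressed.
     */
    public static <S, B> B createAndRestore(
            List<S> alternativesByPriority,
            Function<S, B> restore) {

        List<RuntimeException> failures = new ArrayList<>();
        for (S alternative : alternativesByPriority) {
            try {
                return restore.apply(alternative);
            } catch (RuntimeException e) {
                // Remember the failure and fall through to the next alternative.
                failures.add(e);
            }
        }

        IllegalStateException error = new IllegalStateException(
            "Could not restore from any of the " + alternativesByPriority.size() + " alternatives");
        failures.forEach(error::addSuppressed);
        throw error;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: the local copy is unusable, the remote copy is fine.
        List<String> alternatives = Arrays.asList("local", "remote");

        String backend = createAndRestore(alternatives, source -> {
            if (source.equals("local")) {
                throw new IllegalStateException("local state is missing");
            }
            return "backend restored from " + source;
        });

        System.out.println(backend);
    }
}
```

In the sample run, the attempt on `local` fails and the sketch falls back to `remote`, so it prints `backend restored from remote`. The point of the design is that a corrupted or missing high-priority snapshot costs only one failed attempt, while the lowest-priority state from the JobManager remains available as the final fallback.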

Summary

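Flink's checkpoint mechanism is a key guarantee for failure recovery and data consistency. This article first outlined the core idea behind Flink's distributed snapshots, and then walked through the source code for triggering checkpoints, executing them, and restoring state.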
