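With the release of Flink 1.5.0, Flink received an important improvement: following FLIP-6, the cluster deployment and job processing model was reworked so that Flink can better manage resources and schedule tasks, and integrate more cleanly with frameworks such as Yarn, Mesos, and Kubernetes.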

In this article, we analyze the startup process of a Flink cluster. The analysis is based on the code of Flink 1.9-SNAPSHOT.

HA and Leader Election

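Internal Flink components such as the ResourceManager and the JobManager can be configured in HA mode, and cluster startup involves many leader-election and leader-address-retrieval operations. We therefore first introduce the HA-related concepts.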

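Leader addresses are obtained through two interfaces, LeaderRetrievalListener and LeaderRetrievalService. LeaderRetrievalService starts watching for the leader address and notifies the registered LeaderRetrievalListener once leader election has completed.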

public interface LeaderRetrievalService {
	void start(LeaderRetrievalListener listener) throws Exception;
	void stop() throws Exception;
}


public interface LeaderRetrievalListener {
	void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
	void handleError(Exception exception);
}
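To make the listener contract concrete, here is a minimal in-memory sketch. It re-declares the two interfaces above (dropping the @Nullable annotations) so that it compiles on its own. InMemoryLeaderRetrievalService, its publishLeader method, and RecordingListener are hypothetical illustrations, not Flink classes: the service simply remembers the listener passed to start() and forwards every leader change to it.

```java
import java.util.UUID;

interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
    void handleError(Exception exception);
}

interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;
    void stop() throws Exception;
}

// Hypothetical in-memory service: leader changes are pushed in via publishLeader().
class InMemoryLeaderRetrievalService implements LeaderRetrievalService {
    private LeaderRetrievalListener listener;
    private String address;
    private UUID sessionId;

    @Override
    public synchronized void start(LeaderRetrievalListener listener) {
        this.listener = listener;
        // If a leader is already known, tell the new listener right away.
        if (address != null) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }

    @Override
    public synchronized void stop() {
        listener = null;
    }

    // Hypothetical hook called by whatever performs the election in this toy setup.
    synchronized void publishLeader(String newAddress, UUID newSessionId) {
        address = newAddress;
        sessionId = newSessionId;
        if (listener != null) {
            listener.notifyLeaderAddress(newAddress, newSessionId);
        }
    }
}

// A listener that only records the most recent leader it was told about.
class RecordingListener implements LeaderRetrievalListener {
    volatile String lastAddress;
    volatile UUID lastSessionId;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        lastAddress = leaderAddress;
        lastSessionId = leaderSessionID;
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }
}
```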

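The GatewayRetriever interface is used to obtain an RpcGateway. The abstract class LeaderGatewayRetriever extends LeaderRetriever and implements GatewayRetriever, so it can 1) learn the leader address once leader election has completed, and 2) obtain the leader's RpcGateway.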

RpcGatewayRetriever is a concrete implementation of LeaderGatewayRetriever: using the leader's address, it obtains the corresponding leader RpcGateway through RpcService.connect().

class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
	@Override
	protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
		return FutureUtils.retryWithDelay(
			() ->
				leaderFuture.thenCompose(
					(Tuple2<String, UUID> addressLeaderTuple) ->
						rpcService.connect(
							addressLeaderTuple.f0,
							fencingTokenMapper.apply(addressLeaderTuple.f1),
							gatewayType)),
			retries,
			retryDelay,
			rpcService.getScheduledExecutor());
	}
}
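FutureUtils.retryWithDelay re-runs the connect attempt until it succeeds or the retries are used up. The sketch below shows the same idea with plain CompletableFuture and a ScheduledExecutorService. RetrySketch.retryWithDelay is a hypothetical helper written for illustration, and its exact semantics (for example, retrying on every kind of failure) may differ from Flink's utility.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not Flink's FutureUtils.
final class RetrySketch {
    private RetrySketch() {}

    // Runs operation; if its future fails and retries remain, schedules another attempt after delay.
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Duration delay,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delay, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Duration delay,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        // Note: a synchronous exception thrown by operation.get() is not caught in this sketch.
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                scheduler.schedule(
                    () -> attempt(operation, retriesLeft - 1, delay, scheduler, result),
                    delay.toMillis(),
                    TimeUnit.MILLISECONDS);
            } else {
                // Out of retries: surface the last failure.
                result.completeExceptionally(error);
            }
        });
    }
}
```

In RpcGatewayRetriever the retried operation is "wait for the leader future, then connect", so a connect that fails because the leader is not reachable yet is simply attempted again later.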

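Leader election is carried out jointly by LeaderElectionService (the election service) and LeaderContender (a participant in the election). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for RpcGateway communication. When a LeaderContender wins the election, it is notified through LeaderContender#grantLeadership.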

public interface LeaderElectionService {
	void start(LeaderContender contender) throws Exception;

	void stop() throws Exception;

	void confirmLeaderSessionID(UUID leaderSessionID);

	boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
	void grantLeadership(UUID leaderSessionID);

	void revokeLeadership();

	String getAddress();

	void handleError(Exception exception);
}
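The leaderSessionID acts as a fencing token: a fenced component only accepts messages stamped with the token of the current leadership term, so calls from a stale leader are rejected. The following is a minimal sketch of that check under our own assumptions; FencedCounter and its methods are hypothetical and far simpler than Flink's FencedRpcEndpoint.

```java
import java.util.UUID;

// Hypothetical fenced component: accepts calls only with the current fencing token.
class FencedCounter {
    private UUID fencingToken; // null while this component is not the leader
    private int value;

    // Mirrors LeaderContender#grantLeadership: the new session ID becomes the token.
    synchronized void grantLeadership(UUID leaderSessionID) {
        fencingToken = leaderSessionID;
    }

    // Mirrors LeaderContender#revokeLeadership: no token is valid any more.
    synchronized void revokeLeadership() {
        fencingToken = null;
    }

    // A call whose token does not match the current term is rejected.
    synchronized int increment(UUID callerToken) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            throw new IllegalStateException("Fencing token mismatch: " + callerToken);
        }
        return ++value;
    }
}
```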

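LeaderElectionService has several implementations, such as StandaloneLeaderElectionService, which skips the election process entirely, and ZooKeeperLeaderElectionService, which is built on ZooKeeper and the Curator framework. See the corresponding source code for the implementation details.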

The HighAvailabilityServices interface provides methods for obtaining all HA-related services, including:

  • leader election and leader retrieval for the ResourceManager
  • leader election and leader retrieval for the Dispatcher
  • a registry of job running states
  • checkpoint recovery, blob store, and related services

MiniCluster Startup Process

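We start with the simplest case, the MiniCluster, to analyze Flink's startup process and the interactions between its internal components. A MiniCluster can be seen as an embedded Flink runtime in which all components run in separate local threads. The MiniCluster startup entry point is in LocalStreamEnvironment.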

MiniCluster#start 中,启动流程大致分为三个阶段:

  • create auxiliary services such as RpcService, HighAvailabilityServices, and BlobServer
  • start the TaskManagers
  • start the Dispatcher, the ResourceManager, and the other components

Creating HighAvailabilityServices

class MiniCluster {
	public void start() throws Exception {
		//......
		ioExecutor = Executors.newFixedThreadPool(
					Hardware.getNumberCPUCores(),
					new ExecutorThreadFactory("mini-cluster-io"));
		haServices = createHighAvailabilityServices(configuration, ioExecutor);
		//......
	}
	
	protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
		LOG.info("Starting high-availability services");
		return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
			configuration,
			executor);
	}
}

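HighAvailabilityServicesUtils is a utility class for creating HighAvailabilityServices. When HA is not configured, it creates EmbeddedHaServices. EmbeddedHaServices provides no high availability and is meant for the case where all components, such as the ResourceManager, TaskManager, and JobManager, run in the same process. The election service that EmbeddedHaServices creates for each component is an EmbeddedLeaderElectionService: as soon as a LeaderContender joins the election, that contender is chosen as the leader.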
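The "first contender wins" behavior can be sketched as follows. This is a simplified single-process illustration under our own assumptions: ToyEmbeddedElection and the reduced Contender interface are hypothetical, and later contenders are merely recorded here; how the real EmbeddedLeaderElectionService treats them is not covered by this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Reduced, hypothetical contender interface for this sketch.
interface Contender {
    void grantLeadership(UUID leaderSessionID);
}

// Hypothetical embedded election: no real election, the first contender becomes leader.
class ToyEmbeddedElection {
    private Contender leader;
    private UUID leaderSessionId;
    private final List<Contender> waiting = new ArrayList<>();

    synchronized void start(Contender contender) {
        if (leader == null) {
            leader = contender;
            leaderSessionId = UUID.randomUUID();
            contender.grantLeadership(leaderSessionId);
        } else {
            // Later contenders are only recorded; they are not granted leadership.
            waiting.add(contender);
        }
    }

    // Like LeaderElectionService#hasLeadership, but also checks which contender asks.
    synchronized boolean hasLeadership(Contender contender, UUID sessionId) {
        return contender == leader && leaderSessionId.equals(sessionId);
    }
}
```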

Starting the TaskManager

class MiniCluster {
	public void start() throws Exception {
		//......
		startTaskManagers();
		//......
	}

	private void startTaskManagers() throws Exception {
		final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
		for (int i = 0; i < numTaskManagers; i++) {
			startTaskExecutor();
		}
	}

	@VisibleForTesting
	void startTaskExecutor() throws Exception {
		synchronized (lock) {
			final Configuration configuration = miniClusterConfiguration.getConfiguration();

			final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
				configuration,
				new ResourceID(UUID.randomUUID().toString()),
				taskManagerRpcServiceFactory.createRpcService(),
				haServices,
				heartbeatServices,
				metricRegistry,
				blobCacheService,
				useLocalCommunication(),
				taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

			taskExecutor.start();
			taskManagers.add(taskExecutor);
		}
	}
}

Once HighAvailabilityServices has been created, the TaskManagers can be started one after another. TaskManagerRunner#startTaskManager creates a TaskExecutor, which extends RpcEndpoint. The TaskExecutor needs to communicate with the ResourceManager and other components, and it obtains their addresses through HighAvailabilityServices.

In the TaskExecutor's startup callback (onStart), a series of services is started:

class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	public void onStart() throws Exception {
		try {
			// start the TaskExecutor services
			startTaskExecutorServices();
		} catch (Exception e) {
			final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
		// if registration does not finish in time, the timeout is handed to the FatalErrorHandler
		startRegistrationTimeout();
	}

	private void startTaskExecutorServices() throws Exception {
		try {
			// start by connecting to the ResourceManager
			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

			// tell the task slot table who's responsible for the task slot actions
			taskSlotTable.start(new SlotActionsImpl());

			// start the job leader service
			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

			fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
		} catch (Exception e) {
			handleStartTaskExecutorServicesException(e);
		}
	}
	
	/**
	 * The listener for leader changes of the resource manager.
	 */
	private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

		@Override
		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
			// received the ResourceManager address; establish a connection to it
			runAsync(
				() -> notifyOfNewResourceManagerLeader(
					leaderAddress,
					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
		}

		@Override
		public void handleError(Exception exception) {
			onFatalError(exception);
		}
	}
	
	private final class JobLeaderListenerImpl implements JobLeaderListener {

		@Override
		public void jobManagerGainedLeadership(
			final JobID jobId,
			final JobMasterGateway jobManagerGateway,
			final JMTMRegistrationSuccess registrationMessage) {
			// establish a connection to the JobManager
			runAsync(
				() ->
					establishJobManagerConnection(
						jobId,
						jobManagerGateway,
						registrationMessage));
		}

		@Override
		public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
			log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

			runAsync(() ->
				closeJobManagerConnection(
					jobId,
					new Exception("Job leader for job id " + jobId + " lost leadership.")));
		}

		@Override
		public void handleError(Throwable throwable) {
			onFatalError(throwable);
		}
	}
}
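Notice that the listener callbacks above never touch TaskExecutor state directly; they wrap the work in runAsync, which hands it over to the endpoint's main thread. A minimal sketch of that pattern, assuming a single-threaded executor stands in for the RpcEndpoint main thread (MainThreadEndpoint is a hypothetical class, not Flink's implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint whose state is only touched from one "main thread".
class MainThreadEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Deliberately unsynchronized: only the main thread reads or writes it.
    private final List<String> events = new ArrayList<>();

    // Like RpcEndpoint#runAsync: enqueue work instead of running it on the caller's thread.
    void runAsync(Runnable runnable) {
        mainThread.execute(runnable);
    }

    // A listener-style callback that may be invoked from any thread.
    void notifyLeaderAddress(String address) {
        runAsync(() -> events.add("connect to " + address));
    }

    // Reads state on the main thread as well, so it observes every earlier task.
    List<String> snapshot() {
        return CompletableFuture.<List<String>>supplyAsync(() -> new ArrayList<>(events), mainThread).join();
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```

Because every mutation runs on the same thread in submission order, the endpoint needs no locks around its own fields; this is the main design reason for funnelling callbacks through runAsync.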

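When the ResourceManagerLeaderListener is called back, the TaskExecutor tries to connect to the ResourceManager; the connection is encapsulated in a TaskExecutorToResourceManagerConnection. Once the ResourceManager leader has been determined, the TaskExecutor can obtain the ResourceManager's RpcGateway and start the registration process with the RPC call ResourceManager#registerTaskExecutor. After registration succeeds, the TaskExecutor reports its resources (mainly slots) to the ResourceManager.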

class TaskExecutor {
	private void establishResourceManagerConnection(
			ResourceManagerGateway resourceManagerGateway,
			ResourceID resourceManagerResourceId,
			InstanceID taskExecutorRegistrationId,
			ClusterInformation clusterInformation) {
		// send the SlotReport to the ResourceManager
		final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
			getResourceID(),
			taskExecutorRegistrationId,
			taskSlotTable.createSlotReport(getResourceID()),
			taskManagerConfiguration.getTimeout());
		//......
		// the connection is now established
		establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
			resourceManagerGateway,
			resourceManagerResourceId,
			taskExecutorRegistrationId);

		stopRegistrationTimeout();
	}
}
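Registration followed by the slot report is a chain of asynchronous RPC calls. The sketch below expresses that chain with CompletableFuture; ToyResourceManagerGateway, registerTaskExecutor, sendSlotReport and RegistrationFlow are simplified hypothetical stand-ins rather than Flink's real signatures. The sketch also waits for the slot report to be acknowledged before reporting success, which is our simplification; the excerpt above does not show how the TaskExecutor handles slotReportResponseFuture.

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified view of the ResourceManager's RPC gateway.
interface ToyResourceManagerGateway {
    // Completes with a registration ID on success.
    CompletableFuture<UUID> registerTaskExecutor(String taskExecutorAddress);

    // Completes once the report of offered slots has been acknowledged.
    CompletableFuture<Void> sendSlotReport(UUID registrationId, List<String> freeSlots);
}

final class RegistrationFlow {
    private RegistrationFlow() {}

    // Register, then report slots; the result completes with the registration ID
    // only after both RPCs have succeeded.
    static CompletableFuture<UUID> connect(
            ToyResourceManagerGateway gateway, String address, List<String> slots) {
        return gateway.registerTaskExecutor(address)
            .thenCompose(registrationId ->
                gateway.sendSlotReport(registrationId, slots)
                    .thenApply(ack -> registrationId));
    }
}
```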

Starting the DispatcherResourceManagerComponent

class MiniCluster {
	public void start() throws Exception {
		//......
		dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
					configuration,
					dispatcherResourceManagreComponentRpcServiceFactory,
					haServices,
					blobServer,
					heartbeatServices,
					metricRegistry,
					metricQueryServiceRetriever,
					new ShutDownFatalErrorHandler()
				));
		//......
	}

	protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
			Configuration configuration,
			RpcServiceFactory rpcServiceFactory,
			HighAvailabilityServices haServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {
		// Session dispatcher, standalone resource manager
		SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
		return Collections.singleton(
			dispatcherResourceManagerComponentFactory.create(
				configuration,
				rpcServiceFactory.createRpcService(),
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				new MemoryArchivedExecutionGraphStore(),
				metricQueryServiceRetriever,
				fatalErrorHandler));
	}

	@Nonnull
	private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
		return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
	}
}

In MiniCluster mode, a SessionDispatcherResourceManagerComponent is created. SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent and is used to start the Dispatcher, the ResourceManager, and the WebMonitorEndpoint, all of which run in the same process. In MiniCluster mode, the components started are a StandaloneDispatcher and a StandaloneResourceManager.

When the factory class creates the DispatcherResourceManagerComponent, it starts the Dispatcher, the ResourceManager, and the other components:

public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
	@Override
	public DispatcherResourceManagerComponent<T> create(
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {
		//.......

		webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getWebMonitorLeaderElectionService(),
				fatalErrorHandler);

		log.debug("Starting Dispatcher REST endpoint.");
		webMonitorEndpoint.start();
	
		resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				metricRegistry,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				jobManagerMetricGroup);

		final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

		dispatcher = dispatcherFactory.createDispatcher(
				configuration,
				rpcService,
				highAvailabilityServices,
				resourceManagerGatewayRetriever,
				blobServer,
				heartbeatServices,
				jobManagerMetricGroup,
				metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
				archivedExecutionGraphStore,
				fatalErrorHandler,
				historyServerArchivist);

		log.debug("Starting ResourceManager.");
		resourceManager.start();
		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

		log.debug("Starting Dispatcher.");
		dispatcher.start();
		dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

		return createDispatcherResourceManagerComponent(
				dispatcher,
				resourceManager,
				dispatcherLeaderRetrievalService,
				resourceManagerRetrievalService,
				webMonitorEndpoint,
				jobManagerMetricGroup);
	}
}
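A call such as resourceManagerRetrievalService.start(resourceManagerGatewayRetriever) works because the gateway retriever is itself a leader retrieval listener: each leader notification yields a future of the leader's (address, session ID) pair. The sketch below illustrates that idea; ToyLeaderRetriever and LeaderInfo are hypothetical names, and Flink's LeaderRetriever differs in details (for example, how it handles a null leader address).

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Address plus leader session ID, the same pair as Tuple2<String, UUID> in RpcGatewayRetriever.
final class LeaderInfo {
    final String address;
    final UUID sessionId;

    LeaderInfo(String address, UUID sessionId) {
        this.address = address;
        this.sessionId = sessionId;
    }
}

// Hypothetical retriever: callers get a future that completes once a leader is known.
// Not hardened against concurrent notifications; it only illustrates the idea.
class ToyLeaderRetriever {
    private final AtomicReference<CompletableFuture<LeaderInfo>> current =
        new AtomicReference<>(new CompletableFuture<>());

    CompletableFuture<LeaderInfo> getLeaderFuture() {
        return current.get();
    }

    // Listener callback: complete the pending future, or, if a leader was already
    // published, replace it with a new completed future for the new leader.
    void notifyLeaderAddress(String address, UUID sessionId) {
        LeaderInfo info = new LeaderInfo(address, sessionId);
        CompletableFuture<LeaderInfo> pending = current.get();
        if (!pending.complete(info)) {
            current.set(CompletableFuture.completedFuture(info));
        }
    }
}
```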

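In the ResourceManager's startup callback, the ResourceManager obtains its election service from HighAvailabilityServices and thereby joins the election. It also starts the JobLeaderIdService, which manages the leader IDs of the jobs registered with this ResourceManager.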

abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
		@Override
	public void onStart() throws Exception {
		try {
			startResourceManagerServices();
		} catch (Exception e) {
			final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
	}

	private void startResourceManagerServices() throws Exception {
		try {
			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

			initialize();
			// join the leader election
			leaderElectionService.start(this);
			jobLeaderIdService.start(new JobLeaderIdActionsImpl());

			registerSlotAndTaskExecutorMetrics();
		} catch (Exception e) {
			handleStartResourceManagerServicesException(e);
		}
	}
}

In the Dispatcher's startup callback, the Dispatcher likewise joins the election through its LeaderElectionService.

public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
	DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
	// gateway retriever for the ResourceManager, used to communicate with it
	private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
	
	@Override
	public void onStart() throws Exception {
		try {
			startDispatcherServices();
		} catch (Exception e) {
			final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
			onFatalError(exception);
			throw exception;
		}
	}

	private void startDispatcherServices() throws Exception {
		try {
			submittedJobGraphStore.start(this);
			leaderElectionService.start(this);

			registerDispatcherMetrics(jobManagerMetricGroup);
		} catch (Exception e) {
			handleStartDispatcherServicesException(e);
		}
	}
}

Submitting the JobGraph

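MiniCluster#executeJobBlocking submits a JobGraph and waits for it to finish. The logic for submitting the JobGraph and requesting the job result is shown below; both are implemented as RPC calls: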

class MiniCluster {
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
		// obtain the DispatcherGateway through the Dispatcher's gateway retriever
		final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
		// from the ResourceManager
		jobGraph.setAllowQueuedScheduling(true);

		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

		// submit the JobGraph to the Dispatcher via an RPC call
		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
			.thenCombine(
				dispatcherGatewayFuture,
				(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
			.thenCompose(Function.identity());

		return acknowledgeCompletableFuture.thenApply(
			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
	}

	public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
		return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
	}
}
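The thenCombine(...).thenCompose(Function.identity()) idiom in submitJob waits for two independent futures (the jar upload and the gateway lookup), then issues an RPC that itself returns a future, and flattens the nested result. A standalone sketch of the same idiom; SubmitSketch and the submit function passed to it are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper reproducing the combine-then-flatten idiom.
final class SubmitSketch {
    private SubmitSketch() {}

    // Once both uploadDone and gatewayFuture complete, call submit(gateway, jobName),
    // which returns its own future; thenCompose(identity) flattens
    // CompletableFuture<CompletableFuture<R>> into CompletableFuture<R>.
    static <G, R> CompletableFuture<R> submitAfterUpload(
            CompletableFuture<Void> uploadDone,
            CompletableFuture<G> gatewayFuture,
            BiFunction<G, String, CompletableFuture<R>> submit,
            String jobName) {
        return uploadDone
            .thenCombine(gatewayFuture, (Void ignored, G gateway) -> submit.apply(gateway, jobName))
            .thenCompose(Function.identity());
    }
}
```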

After receiving a request to submit a JobGraph, the Dispatcher stores the JobGraph in the SubmittedJobGraphStore (for failure recovery) and starts a JobManager for it:

class Dispatcher {
    private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
		final RpcService rpcService = getRpcService();

		// create the JobManagerRunner
		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
			CheckedSupplier.unchecked(() ->
				jobManagerRunnerFactory.createJobManagerRunner(
					jobGraph,
					configuration,
					rpcService,
					highAvailabilityServices,
					heartbeatServices,
					jobManagerSharedServices,
					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
					fatalErrorHandler)),
			rpcService.getExecutor());

		return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
	}

	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
		jobManagerRunner.getResultFuture().whenCompleteAsync(
			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
				// check if we are still the active JobManagerRunner by checking the identity
				//noinspection ObjectEquality
				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
					if (archivedExecutionGraph != null) {
						jobReachedGloballyTerminalState(archivedExecutionGraph);
					} else {
						final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

						if (strippedThrowable instanceof JobNotFinishedException) {
							jobNotFinished(jobId);
						} else {
							jobMasterFailed(jobId, strippedThrowable);
						}
					}
				} else {
					log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
				}
			}, getMainThreadExecutor());

		// start the JobManager
		jobManagerRunner.start();

		return jobManagerRunner;
	}
}

The started JobManagerRunner competes for leadership, and once it is elected leader, it starts a JobMaster:

public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
	public void start() throws Exception {
		try {
			// compete for leadership
			leaderElectionService.start(this);
		} catch (Exception e) {
			log.error("Could not start the JobManager because the leader election service did not start.", e);
			throw new Exception("Could not start the leader election service.", e);
		}
	}
	
	// called when this runner is elected leader
	@Override
	public void grantLeadership(final UUID leaderSessionID) {
		synchronized (lock) {
			if (shutdown) {
				log.info("JobManagerRunner already shutdown.");
				return;
			}

			leadershipOperation = leadershipOperation.thenCompose(
				(ignored) -> {
					synchronized (lock) {
						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
					}
				});

			handleException(leadershipOperation, "Could not start the job manager.");
		}
	}

	private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

		return jobSchedulingStatusFuture.thenCompose(
			jobSchedulingStatus -> {
				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
					return jobAlreadyDone();
				} else {
					return startJobMaster(leaderSessionId);
				}
			});
	}

	private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

		try {
			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
		} catch (IOException e) {
			return FutureUtils.completedExceptionally(
				new FlinkException(
					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
					e));
		}

		final CompletableFuture<Acknowledge> startFuture;
		try {
			// start the JobMaster with a JobMasterId derived from the leader session ID
			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
		} catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
		}

		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
		return startFuture.thenAcceptAsync(
			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
			executor);
	}
}
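grantLeadership does not start the JobMaster directly; it appends the work to leadershipOperation, so leadership changes are processed strictly one after another even though each step is asynchronous. A minimal sketch of that serialization pattern; OperationChain is a hypothetical class and omits the locking and shutdown checks of JobManagerRunner.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: each appended step starts only after the previous one finished.
class OperationChain {
    private CompletableFuture<Void> last = CompletableFuture.completedFuture(null);

    synchronized CompletableFuture<Void> append(Supplier<CompletableFuture<Void>> step) {
        // thenCompose runs the step only after the previous future completes normally.
        last = last.thenCompose(ignored -> step.get());
        return last;
    }
}
```

In this simple version a failed step also fails every step appended after it, which is one reason the real code adds explicit error handling such as the handleException call shown above.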

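After the JobMaster starts, it connects to the ResourceManager; the connection is encapsulated in a ResourceManagerConnection. Once the connection is established, the JobMaster can communicate with the ResourceManager through RPC calls: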

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    private void startJobMasterServices() throws Exception {
		// start the slot pool make sure the slot pool now accepts messages for this leader
		slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
		scheduler.start(getMainThreadExecutor());

		//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
		// try to reconnect to previously known leader
		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

		// job is ready to go, try to establish connection with resource manager
		//   - activate leader retrieval for the resource manager
		//   - on notification of the leader, the connection will be established and
		//     the slot pool will start requesting slots
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
	}
}

From this point on, the flow moves into task scheduling and execution.

Standalone Cluster Startup Process

In Standalone mode, the TaskManager and components such as the ResourceManager run in separate processes. A Standalone Cluster can be started in two ways, standalonesession and standalonejob; they differ in how the Dispatcher is implemented.

Starting the JobManager

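Note that the JobManager here refers to the single process that contains the Dispatcher, the ResourceManager, and other components, not the JobManagerRunner that the Dispatcher starts to execute a JobGraph. In the FLIP-6 implementation, the scheduling and execution of each JobGraph is actually handled by a dedicated JobMaster.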

The entry class of a JobManager started in standalonesession mode is StandaloneSessionClusterEntrypoint, which extends SessionClusterEntrypoint; correspondingly, the entry class of a JobManager started in standalonejob mode is StandaloneJobClusterEntryPoint, which extends JobClusterEntrypoint. Both derive from the common parent class ClusterEntrypoint and differ in the DispatcherResourceManagerComponent they create.

Let's first look at the startup process, which is in fact similar to how the DispatcherResourceManagerComponent is started in MiniCluster mode:

abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
		synchronized (lock) {
		
			// initialize RpcService, HighAvailabilityServices and other services
			initializeServices(configuration);

			// write host information into configuration
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			// create the DispatcherResourceManagerComponentFactory; implemented by the concrete subclass
			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			// create the DispatcherResourceManagerComponent, which starts the ResourceManager and Dispatcher
			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this);

			// once DispatcherResourceManagerComponent#getShutDownFuture completes, shut down all services
			clusterComponent.getShutDownFuture().whenComplete(
				(ApplicationStatus applicationStatus, Throwable throwable) -> {
					if (throwable != null) {
						shutDownAsync(
							ApplicationStatus.UNKNOWN,
							ExceptionUtils.stringifyException(throwable),
							false);
					} else {
						// This is the general shutdown path. If a separate more specific shutdown was
						// already triggered, this will do nothing
						shutDownAsync(
							applicationStatus,
							null,
							true);
					}
				});
		}
	}

}

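The HighAvailabilityServices created here differ slightly from those in MiniCluster mode. Because the components do not run in the same process, the services have to be set up from the configuration: 1) with ZooKeeper-based HA, ZooKeeperHaServices is created and leader addresses are retrieved through ZooKeeper; 2) without HA, StandaloneHaServices is created and the RPC address of each component is read from the configuration file.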
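The choice between the two HA service types is driven by configuration. The sketch below shows only the dispatch logic. The high-availability key with its NONE and zookeeper values matches Flink's documented option, while HaServicesKind, HaModeSelector, and the Map-based configuration are hypothetical simplifications of HighAvailabilityServicesUtils; other values are simply rejected here.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical labels for the two HA service implementations discussed above.
enum HaServicesKind { STANDALONE, ZOOKEEPER }

final class HaModeSelector {
    private HaModeSelector() {}

    // Decide which HA services to build from a flat key/value configuration.
    static HaServicesKind select(Map<String, String> config) {
        String mode = config.getOrDefault("high-availability", "NONE");
        switch (mode.toUpperCase(Locale.ROOT)) {
            case "NONE":
                // No HA: component RPC addresses come straight from the configuration.
                return HaServicesKind.STANDALONE;
            case "ZOOKEEPER":
                // HA: leader addresses are discovered via ZooKeeper.
                return HaServicesKind.ZOOKEEPER;
            default:
                throw new IllegalArgumentException("Unsupported high-availability mode: " + mode);
        }
    }
}
```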

StandaloneSessionClusterEntrypoint 中,生成 DispatcherResourceManagerComponent 的工厂类是 SessionDispatcherResourceManagerComponentFactory,该工厂类创建 SessionDispatcherResourceManagerComponent:由 SessionDispatcherFactory 创建 StandaloneDispatcher, 由 StandaloneResourceManagerFactory 创建 StandaloneResourceManager

StandaloneJobClusterEntrypoint 中,生成 DispatcherResourceManagerComponent 的工厂类是 JobDispatcherResourceManagerComponentFactory,该厂类创建 JobDispatcherResourceManagerComponent:由 StandaloneResourceManagerFactory 创建 StandaloneResourceManager,由 JobDispatcherFactory 创建 MiniDispatcher。一个 MiniDispatcher 和一个 JobGraph 相绑定,一旦绑定的 JobGraph 执行结束,则关闭 MiniDispatcher,进而停止 JobManager 进程。

The internal startup of the Dispatcher and ResourceManager services is the same as in the MiniCluster and is not repeated here.

Starting the TaskManager

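The TaskManager's entry point is TaskManagerRunner. Its startup process is basically the same as in MiniCluster mode, with two differences: 1) it runs in its own process, and 2) HighAvailabilityServices is created based on the configuration file. TaskManagerRunner creates a TaskExecutor, which obtains the ResourceManager's address through HighAvailabilityServices and connects to the ResourceManager.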

Yarn Cluster Startup Process

The entry point for a Yarn cluster is FlinkYarnSessionCli: it first creates a YarnClusterDescriptor from the command-line arguments, then calls YarnClusterDescriptor#deploySessionCluster to trigger the cluster deployment.

The actual startup logic lives in AbstractYarnClusterDescriptor#deployInternal, which essentially submits an application to the Yarn cluster through YarnClient and starts the ApplicationMaster:

abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient<ApplicationId> deployInternal(
			ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached) throws Exception {
		//.......
	
		ApplicationReport report = startAppMaster(
			flinkConfiguration,
			applicationName,
			yarnClusterEntrypoint,
			jobGraph,
			yarnClient,
			yarnApplication,
			validClusterSpecification);
			
		//.....

		return createYarnClusterClient(
			this,
			validClusterSpecification.getNumberTaskManagers(),
			validClusterSpecification.getSlotsPerTaskManager(),
			report,
			flinkConfiguration,
			true);
	}
}

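Depending on whether a session cluster or a job cluster is started, the entry class of the ApplicationMaster submitted to Yarn is YarnSessionClusterEntrypoint or YarnJobClusterEntrypoint, respectively. The two differ in the Dispatcher, which is a StandaloneDispatcher or a MiniDispatcher respectively; in both cases the ResourceManager implementation is YarnResourceManager.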

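Unlike the Standalone Cluster described above, a Flink cluster started in Yarn mode obtains its TaskManagers dynamically: the YarnResourceManager requests them from Yarn's ResourceManager according to the JobMaster's requests. When the JobMaster requests resources from the ResourceManager and not enough are available, the YarnResourceManager asks the Yarn cluster's ResourceManager for new containers and starts TaskManagers in them: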

class YarnResourceManager {
	// request a container
	private void requestYarnContainer() {
		resourceManagerClient.addContainerRequest(getContainerRequest());

		// make sure we transmit the request fast and receive fast news of granted allocations
		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

		numPendingContainerRequests++;

		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
			resource,
			numPendingContainerRequests);
	}
	
	// callback invoked when Yarn allocates containers
	@Override
	public void onContainersAllocated(List<Container> containers) {
		runAsync(() -> {
			final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
			final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

			for (Container container : containers) {
				log.info(
					"Received new container: {} - Remaining pending container requests: {}",
					container.getId(),
					numPendingContainerRequests);

				if (numPendingContainerRequests > 0) {
					removeContainerRequest(pendingRequestsIterator.next());

					final String containerIdStr = container.getId().toString();
					final ResourceID resourceId = new ResourceID(containerIdStr);

					workerNodeMap.put(resourceId, new YarnWorkerNode(container));

					try {
						// Context information used to start a TaskExecutor Java process
						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
							container.getResource(),
							containerIdStr,
							container.getNodeId().getHost());
						// start the TaskManager in the allocated container
						nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
					} catch (Throwable t) {
						log.error("Could not start TaskManager in container {}.", container.getId(), t);

						// release the failed container
						workerNodeMap.remove(resourceId);
						resourceManagerClient.releaseAssignedContainer(container.getId());
						// and ask for a new one
						requestYarnContainerIfRequired();
					}
				} else {
					// return the excessive containers
					log.info("Returning excess container {}.", container.getId());
					resourceManagerClient.releaseAssignedContainer(container.getId());
				}
			}

			// if we are waiting for no further containers, we can go to the
			// regular heartbeat interval
			if (numPendingContainerRequests <= 0) {
				resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
			}
		});
	}
	
	// create the launch context for a TaskManager container
	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
			throws Exception {
		// init the ContainerLaunchContext
		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

		final ContaineredTaskManagerParameters taskManagerParameters =
				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

		log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
				"JVM direct memory limit {} MB",
				containerId,
				taskManagerParameters.taskManagerTotalMemoryMB(),
				taskManagerParameters.taskManagerHeapSizeMB(),
				taskManagerParameters.taskManagerDirectMemoryLimitMB());

		Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

		log.debug("TaskManager configuration: {}", taskManagerConfig);

		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
			flinkConfig,
			yarnConfig,
			env,
			taskManagerParameters,
			taskManagerConfig,
			currDir,
			YarnTaskExecutorRunner.class, // entry class of the TaskManager process
			log);

		// set a special environment variable to uniquely identify this container
		taskExecutorLaunchContext.getEnvironment()
				.put(ENV_FLINK_CONTAINER_ID, containerId);
		taskExecutorLaunchContext.getEnvironment()
				.put(ENV_FLINK_NODE_ID, host);
		return taskExecutorLaunchContext;
	}
}
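The bookkeeping in onContainersAllocated boils down to matching allocated containers against the number of outstanding requests: each matched container consumes one pending request and gets a TaskManager, and any surplus is returned to Yarn. The following sketch isolates that logic without any Yarn dependency. ContainerMatcher and its string container IDs are hypothetical, and the explicit decrement stands in for the request removal done by removeContainerRequest in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Yarn-free model of the pending-request accounting.
class ContainerMatcher {
    private int numPendingContainerRequests;
    final List<String> started = new ArrayList<>();
    final List<String> released = new ArrayList<>();

    // Like requestYarnContainer(): one more outstanding request.
    void requestContainer() {
        numPendingContainerRequests++;
    }

    // Like onContainersAllocated(): match containers to pending requests.
    void onContainersAllocated(List<String> containerIds) {
        for (String id : containerIds) {
            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--; // this request is now satisfied
                started.add(id);               // a TaskManager would be launched here
            } else {
                released.add(id);              // excess container is handed back to Yarn
            }
        }
    }

    int pending() {
        return numPendingContainerRequests;
    }
}
```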

Summary

This article gave a brief analysis of the Flink cluster startup process and of the communication among components such as the ResourceManager, TaskExecutor, Dispatcher, and JobMaster.
