With the release of Flink 1.5.0, Flink gained an important improvement: the cluster deployment and job processing model was reworked according to FLIP-6, in order to manage resources and schedule tasks more effectively and to integrate more gracefully with frameworks such as Yarn, Mesos, and Kubernetes.
In this article we analyze the startup flow of a Flink cluster. The analysis is based on the code of Flink 1.9-SNAPSHOT.
HA and Leader Election
Flink's internal components, such as the ResourceManager and JobManager, can all be configured to run in HA mode. Cluster startup therefore involves a great deal of leader election and leader address retrieval, so we first introduce the HA-related concepts.
The leader address is obtained through the LeaderRetrievalListener and LeaderRetrievalService interfaces. A LeaderRetrievalService starts listening for the leader address and notifies the listener once leader election has completed.
public interface LeaderRetrievalService {
void start(LeaderRetrievalListener listener) throws Exception;
void stop() throws Exception;
}
public interface LeaderRetrievalListener {
void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
void handleError(Exception exception);
}
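As a quick illustration, here is a minimal listener that simply logs whatever address it is notified of. This is a hypothetical sketch, not code from the Flink code base; LOG is assumed to be an SLF4J logger and retrievalService some LeaderRetrievalService implementation.
// Hypothetical usage sketch of the two interfaces above.
LeaderRetrievalListener loggingListener = new LeaderRetrievalListener() {
    @Override
    public void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID) {
        LOG.info("New leader at {} with session id {}.", leaderAddress, leaderSessionID);
    }

    @Override
    public void handleError(Exception exception) {
        LOG.error("Leader retrieval failed.", exception);
    }
};
// start() declares throws Exception; error handling is omitted in this sketch
retrievalService.start(loggingListener);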
The GatewayRetriever interface is used to obtain an RpcGateway. The abstract class LeaderGatewayRetriever extends both LeaderRetriever and GatewayRetriever, so it 1) learns the leader address once leader election completes and 2) can obtain the leader's RpcGateway.
RpcGatewayRetriever is the concrete implementation of LeaderGatewayRetriever; given the leader's address, it obtains the corresponding RpcGateway through RpcService.connect():
class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
@Override
protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
return FutureUtils.retryWithDelay(
() ->
leaderFuture.thenCompose(
(Tuple2<String, UUID> addressLeaderTuple) ->
rpcService.connect(
addressLeaderTuple.f0,
fencingTokenMapper.apply(addressLeaderTuple.f1),
gatewayType)),
retries,
retryDelay,
rpcService.getScheduledExecutor());
}
}
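For reference, such a retriever is typically constructed with the RPC service, the gateway class, and a mapping from the leader session id to the fencing token, and is then registered with a leader retrieval service. The following sketch is modeled on how MiniCluster wires up its Dispatcher gateway retriever; the retry count and delay are illustrative values and the exact arguments may differ between versions.
// Sketch: build a retriever for the DispatcherGateway and register it with the
// Dispatcher leader retrieval service, so the cached gateway is refreshed whenever
// a new leader is elected.
RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever =
    new RpcGatewayRetriever<>(
        rpcService,              // RpcService used to connect to the leader
        DispatcherGateway.class, // gateway type to obtain
        DispatcherId::fromUuid,  // maps the leader session id to a fencing token
        20,                      // number of retries (illustrative)
        Time.milliseconds(20L)); // delay between retries (illustrative)

haServices.getDispatcherLeaderRetriever().start(dispatcherGatewayRetriever);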
Leader election is carried out jointly by a LeaderElectionService (the election service) and a LeaderContender (the candidate taking part in the election). Each successful election yields a unique leaderSessionID, which can be used as the fencing token for RpcGateway communication. When a LeaderContender wins the election, it is notified through LeaderContender#grantLeadership.
public interface LeaderElectionService {
void start(LeaderContender contender) throws Exception;
void stop() throws Exception;
void confirmLeaderSessionID(UUID leaderSessionID);
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}
public interface LeaderContender {
void grantLeadership(UUID leaderSessionID);
void revokeLeadership();
String getAddress();
void handleError(Exception exception);
}
LeaderElectionService has several implementations, such as StandaloneLeaderElectionService, which needs no actual election process, and ZooKeeperLeaderElectionService, which is built on ZooKeeper and the Curator framework; see the corresponding source code for the implementation details.
The HighAvailabilityServices interface provides access to all HA-related services (sketched below, after the list), including:
- the leader election service and leader retrieval for the ResourceManager
- the leader election service and leader retrieval for the Dispatcher
- the registry for job status
- services for checkpoint recovery, the blob store, and so on
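A trimmed sketch of the HighAvailabilityServices interface follows; this is a subset of its methods, per my reading of the Flink 1.9 source, and slightly abridged.
// Abridged sketch of HighAvailabilityServices; shutdown-related methods are omitted.
public interface HighAvailabilityServices {
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getDispatcherLeaderElectionService();
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
    LeaderElectionService getWebMonitorLeaderElectionService();
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;       // registry of job scheduling status
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();            // checkpoint recovery
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; // job graph persistence
    BlobStore createBlobStore() throws IOException;                      // blob store
}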
Startup Flow of the MiniCluster
We start with the simplest setup, the MiniCluster, to analyze Flink's startup flow and the interaction between the internal components. The MiniCluster can be seen as an embedded Flink runtime in which all components run in separate local threads of the same process. The startup entry point of the MiniCluster is in LocalStreamEnvironment.
In MiniCluster#start, the startup flow roughly falls into three phases:
- create auxiliary services such as the RpcService, HighAvailabilityServices, and BlobServer
- start the TaskManagers
- start the Dispatcher, ResourceManager, and related components
Creating the HighAvailabilityServices
class MiniCluster {
public void start() {
//......
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("mini-cluster-io"));
haServices = createHighAvailabilityServices(configuration, ioExecutor);
//......
}
protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
LOG.info("Starting high-availability services");
return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
configuration,
executor);
}
}
HighAvailabilityServicesUtils is a utility class for creating the HighAvailabilityServices. When HA is not configured, it creates an EmbeddedHaServices. EmbeddedHaServices does not provide high availability and is intended for the case where all components (ResourceManager, TaskManager, JobManager, and so on) run in the same process. The election service EmbeddedHaServices creates for each component is EmbeddedLeaderElectionService: as soon as a LeaderContender joins the election, that contender is chosen as the leader. A simplified illustration of this behavior follows.
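To make this concrete, the following intentionally simplified election service has the same observable behavior; it is not the actual EmbeddedLeaderElectionService, which additionally handles revocation, concurrency, and multiple contenders.
// Illustrative only: grant leadership to the first contender that starts,
// which is effectively what happens in the single-process MiniCluster setting.
public class FirstContenderWinsElectionService implements LeaderElectionService {

    private volatile UUID currentLeaderSessionId;

    @Override
    public void start(LeaderContender contender) {
        // immediately elect the first (and only) contender
        currentLeaderSessionId = UUID.randomUUID();
        contender.grantLeadership(currentLeaderSessionId);
    }

    @Override
    public void stop() {
        currentLeaderSessionId = null;
    }

    @Override
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        // the real service publishes the confirmed leader address to listeners here
    }

    @Override
    public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
        return leaderSessionId.equals(currentLeaderSessionId);
    }
}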
Starting the TaskManagers
class MiniCluster {
public void start() {
//......
startTaskManagers();
//......
}
private void startTaskManagers() throws Exception {
final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
for (int i = 0; i < numTaskManagers; i++) {
startTaskExecutor();
}
}
@VisibleForTesting
void startTaskExecutor() throws Exception {
synchronized (lock) {
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServiceFactory.createRpcService(),
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
useLocalCommunication(),
taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
taskExecutor.start();
taskManagers.add(taskExecutor);
}
}
}
After the HighAvailabilityServices have been created, the TaskManagers can be started one after another. TaskManagerRunner#startTaskManager creates a TaskExecutor, which implements the RpcEndpoint interface. The TaskExecutor needs to communicate with the ResourceManager and other components, and it obtains their service addresses through the HighAvailabilityServices.
In the callback invoked when the TaskExecutor starts, a series of services are started:
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public void onStart() throws Exception {
try {
//start the services
startTaskExecutorServices();
} catch (Exception e) {
final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
//a registration timeout is handed over to the FatalErrorHandler
startRegistrationTimeout();
}
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
/**
* The listener for leader changes of the resource manager.
*/
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
//obtain the ResourceManager address and establish a connection to it
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
public void handleError(Exception exception) {
onFatalError(exception);
}
}
private final class JobLeaderListenerImpl implements JobLeaderListener {
@Override
public void jobManagerGainedLeadership(
final JobID jobId,
final JobMasterGateway jobManagerGateway,
final JMTMRegistrationSuccess registrationMessage) {
//establish a connection to the JobManager
runAsync(
() ->
establishJobManagerConnection(
jobId,
jobManagerGateway,
registrationMessage));
}
@Override
public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
runAsync(() ->
closeJobManagerConnection(
jobId,
new Exception("Job leader for job id " + jobId + " lost leadership.")));
}
@Override
public void handleError(Throwable throwable) {
onFatalError(throwable);
}
}
}
When ResourceManagerLeaderListener is notified, the TaskExecutor tries to establish a connection to the ResourceManager; the connection is encapsulated as a TaskExecutorToResourceManagerConnection. Once the ResourceManager leader is determined, its RpcGateway can be obtained, and the TaskExecutor then initiates the registration flow through the RPC call ResourceManager#registerTaskExecutor. After successful registration, the TaskExecutor reports its resources (mainly slots) to the ResourceManager.
class TaskExecutor {
private void establishResourceManagerConnection(
ResourceManagerGateway resourceManagerGateway,
ResourceID resourceManagerResourceId,
InstanceID taskExecutorRegistrationId,
ClusterInformation clusterInformation) {
//send the SlotReport
final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
//......
//the connection is established
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
resourceManagerResourceId,
taskExecutorRegistrationId);
stopRegistrationTimeout();
}
}
Starting the DispatcherResourceManagerComponent
class MiniCluster {
public void start() {
//......
dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
configuration,
dispatcherResourceManagreComponentRpcServiceFactory,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
metricQueryServiceRetriever,
new ShutDownFatalErrorHandler()
));
//......
}
protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
Configuration configuration,
RpcServiceFactory rpcServiceFactory,
HighAvailabilityServices haServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
//Session dispatcher, standalone resource manager
SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
return Collections.singleton(
dispatcherResourceManagerComponentFactory.create(
configuration,
rpcServiceFactory.createRpcService(),
haServices,
blobServer,
heartbeatServices,
metricRegistry,
new MemoryArchivedExecutionGraphStore(),
metricQueryServiceRetriever,
fatalErrorHandler));
}
@Nonnull
private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
}
}
In MiniCluster mode, a SessionDispatcherResourceManagerComponent object is created. SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent and is used to start the Dispatcher, ResourceManager, and WebMonitorEndpoint, which all run in the same process. In MiniCluster mode the components started are a StandaloneDispatcher and a StandaloneResourceManager.
When the factory class creates the DispatcherResourceManagerComponent, it starts the Dispatcher, ResourceManager, and the other components:
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
@Override
public DispatcherResourceManagerComponent<T> create(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
//.......
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
jobManagerMetricGroup);
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
dispatcher = dispatcherFactory.createDispatcher(
configuration,
rpcService,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
archivedExecutionGraphStore,
fatalErrorHandler,
historyServerArchivist);
log.debug("Starting ResourceManager.");
resourceManager.start();
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
log.debug("Starting Dispatcher.");
dispatcher.start();
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return createDispatcherResourceManagerComponent(
dispatcher,
resourceManager,
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
jobManagerMetricGroup);
}
}
In the callback invoked when the ResourceManager starts, it obtains the leader election service from the HighAvailabilityServices and joins the election. It also starts the JobLeaderIdService, which manages the leader ids of the jobs registered with the current ResourceManager.
abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway, LeaderContender {
@Override
public void onStart() throws Exception {
try {
startResourceManagerServices();
} catch (Exception e) {
final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
}
private void startResourceManagerServices() throws Exception {
try {
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
initialize();
//join the leader election
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerSlotAndTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
}
In the callback invoked when the Dispatcher starts, the Dispatcher likewise joins the election through the LeaderElectionService.
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements
DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
//gateway retriever for the resource manager, used to communicate with it
private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
@Override
public void onStart() throws Exception {
try {
startDispatcherServices();
} catch (Exception e) {
final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
}
private void startDispatcherServices() throws Exception {
try {
submittedJobGraphStore.start(this);
leaderElectionService.start(this);
registerDispatcherMetrics(jobManagerMetricGroup);
} catch (Exception e) {
handleStartDispatcherServicesException(e);
}
}
}
Submitting a JobGraph
MiniCluster#executeJobBlocking submits the JobGraph and waits for it to finish. The logic for submitting the JobGraph and requesting the job result is shown below; both are implemented as RPC calls:
class MiniCluster {
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
//obtain the DispatcherGateway through the Dispatcher's gateway retriever
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
// we have to allow queued scheduling in Flip-6 mode because we need to request slots
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
//submit the JobGraph to the Dispatcher via an RPC call
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
}
}
After the Dispatcher receives the JobGraph submission request, it saves the submitted JobGraph in the SubmittedJobGraphStore (used for failure recovery) and starts a JobManager for the submitted JobGraph:
class Dispatcher {
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();
//create the JobManagerRunner
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)),
rpcService.getExecutor());
return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
// check if we are still the active JobManagerRunner by checking the identity
//noinspection ObjectEquality
if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
if (strippedThrowable instanceof JobNotFinishedException) {
jobNotFinished(jobId);
} else {
jobMasterFailed(jobId, strippedThrowable);
}
}
} else {
log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
}
}, getMainThreadExecutor());
//start the JobManager
jobManagerRunner.start();
return jobManagerRunner;
}
}
The started JobManagerRunner competes for leadership; once it is elected leader, it starts a JobMaster.
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
public void start() throws Exception {
try {
//compete for leadership
leaderElectionService.start(this);
} catch (Exception e) {
log.error("Could not start the JobManager because the leader election service did not start.", e);
throw new Exception("Could not start the leader election service.", e);
}
}
//elected as leader
@Override
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.info("JobManagerRunner already shutdown.");
return;
}
leadershipOperation = leadershipOperation.thenCompose(
(ignored) -> {
synchronized (lock) {
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
}
});
handleException(leadershipOperation, "Could not start the job manager.");
}
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
return startJobMaster(leaderSessionId);
}
});
}
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
try {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
return FutureUtils.completedExceptionally(
new FlinkException(
String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
e));
}
final CompletableFuture<Acknowledge> startFuture;
try {
//start the JobMaster with the given JobMasterId
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
}
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
executor);
}
}
After the JobMaster starts, it establishes a connection to the ResourceManager, encapsulated as a ResourceManagerConnection. Once the connection is established, the JobMaster can communicate with the ResourceManager via RPC:
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
private void startJobMasterServices() throws Exception {
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
scheduler.start(getMainThreadExecutor());
//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
// try to reconnect to previously known leader
reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
}
From this point on, the job enters the scheduling and execution flow.
Startup Flow in Standalone Cluster Mode
In standalone mode, the TaskManager, ResourceManager, and the other components each run in their own process. A standalone cluster can be started in two ways, standalonesession mode and standalonejob mode; they differ in how the Dispatcher is implemented.
Starting the JobManager
Note that the JobManager discussed here refers to the single process that contains the Dispatcher, ResourceManager, and related components, not the JobManagerRunner that the Dispatcher starts in order to execute a JobGraph. In the FLIP-6 implementation, the scheduling and execution of each JobGraph is actually handled by a dedicated JobMaster.
The entry class of a JobManager started in standalonesession mode is StandaloneSessionClusterEntrypoint, which extends SessionClusterEntrypoint; correspondingly, the entry class of a JobManager started in standalonejob mode is StandaloneJobClusterEntryPoint, which extends JobClusterEntrypoint. Both derive from the common parent class ClusterEntrypoint and differ only in the DispatcherResourceManagerComponent they create. A sketch of the standalonesession entry point is shown below.
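The standalonesession entry point looks roughly as follows. This is a sketch based on my reading of the source; command-line parsing details, signal handlers, and error handling are omitted, and helper names such as loadConfiguration are abbreviated.
class StandaloneSessionClusterEntrypoint /* sketch */ {
    public static void main(String[] args) throws Exception {
        // parse --configDir and dynamic properties from the command line
        EntrypointClusterConfiguration entrypointConfiguration =
            new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()).parse(args);

        // load flink-conf.yaml and merge the dynamic properties
        Configuration configuration = loadConfiguration(entrypointConfiguration);

        StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

        // installs shutdown hooks and eventually invokes runCluster() shown below
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
}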
Let us first look at the startup process, which is in fact similar to how the DispatcherResourceManagerComponent is started in MiniCluster mode:
abstract class ClusterEntrypoint {
private void runCluster(Configuration configuration) throws Exception {
synchronized (lock) {
//initialize services such as the RpcService and HighAvailabilityServices
initializeServices(configuration);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
//create the DispatcherResourceManagerComponentFactory; implemented by the concrete subclass
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
//create the DispatcherResourceManagerComponent and start the ResourceManager and Dispatcher
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
//once DispatcherResourceManagerComponent#getShutDownFuture completes, shut the services down
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
}
The HighAvailabilityServices created here differ slightly from the MiniCluster case: since the components no longer run in the same process, the settings have to be loaded from the configuration. 1) If ZooKeeper-based HA is configured, a ZooKeeperHaServices is created and leader addresses are obtained via ZooKeeper. 2) If HA is not configured, a StandaloneHaServices is created and the RPC addresses of the components are read from the configuration. A sketch of this dispatch follows.
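The dispatch on the configured HA mode looks roughly like this. It is a simplified sketch of HighAvailabilityServicesUtils#createHighAvailabilityServices; the real method also handles a custom factory class and address resolution, and the address variables below are assumed to have been derived from the configuration beforehand.
// Simplified sketch: pick the HighAvailabilityServices implementation based on the
// configured high-availability mode. Paraphrased, not a verbatim copy of the source.
switch (HighAvailabilityMode.fromConfig(configuration)) {
    case NONE:
        // RPC addresses of ResourceManager, Dispatcher and JobManager come from the configuration
        return new StandaloneHaServices(
            resourceManagerRpcUrl,
            dispatcherRpcUrl,
            jobManagerRpcUrl,
            webMonitorAddress);
    case ZOOKEEPER:
        // leader election and retrieval backed by ZooKeeper (via the Curator framework)
        BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
        return new ZooKeeperHaServices(
            ZooKeeperUtils.startCuratorFramework(configuration),
            executor,
            configuration,
            blobStoreService);
    default:
        throw new Exception("Unsupported high-availability mode.");
}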
In StandaloneSessionClusterEntrypoint, the factory class that creates the DispatcherResourceManagerComponent is SessionDispatcherResourceManagerComponentFactory. It builds a SessionDispatcherResourceManagerComponent: a StandaloneDispatcher is created by the SessionDispatcherFactory and a StandaloneResourceManager by the StandaloneResourceManagerFactory.
In StandaloneJobClusterEntrypoint, the factory class that creates the DispatcherResourceManagerComponent is JobDispatcherResourceManagerComponentFactory. It builds a JobDispatcherResourceManagerComponent: a StandaloneResourceManager is created by the StandaloneResourceManagerFactory and a MiniDispatcher by the JobDispatcherFactory. A MiniDispatcher is bound to a single JobGraph; once that JobGraph finishes, the MiniDispatcher shuts down and with it the JobManager process.
The internal startup flow of the Dispatcher and ResourceManager services is the same as in the MiniCluster and is not repeated here.
Starting the TaskManager
The TaskManager startup entry point is TaskManagerRunner, and its startup flow is largely the same as in MiniCluster mode, with two differences: 1) it runs in its own process, and 2) the HighAvailabilityServices have to be created from the configuration. TaskManagerRunner creates a TaskExecutor, which obtains the ResourceManager address through the HighAvailabilityServices and establishes a connection to it. A sketch of the entry point is shown below.
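The following sketch is based on my reading of TaskManagerRunner in this version; security context setup, environment logging, and error handling are omitted, and the exact helper names may differ slightly.
class TaskManagerRunner /* sketch */ {
    public static void main(String[] args) throws Exception {
        // load flink-conf.yaml plus dynamic properties passed on the command line
        final Configuration configuration = loadConfiguration(args);

        // a fresh ResourceID identifies this TaskManager process
        final ResourceID resourceId = ResourceID.generate();

        // builds the HighAvailabilityServices from the configuration, creates the
        // TaskExecutor and starts its RPC endpoint
        runTaskManager(configuration, resourceId);
    }
}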
Startup Flow of a Yarn Cluster
The startup entry point of a Yarn cluster is FlinkYarnSessionCli: it first creates a YarnClusterDescriptor from the command-line arguments and then calls YarnClusterDescriptor#deploySessionCluster to trigger the deployment of the cluster.
The actual startup logic lives in AbstractYarnClusterDescriptor#deployInternal, which essentially submits the application to the Yarn cluster through a YarnClient and starts the ApplicationMaster:
abstract class AbstractYarnClusterDescriptor {
protected ClusterClient<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
//.......
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
//.....
return createYarnClusterClient(
this,
validClusterSpecification.getNumberTaskManagers(),
validClusterSpecification.getSlotsPerTaskManager(),
report,
flinkConfiguration,
true);
}
}
Corresponding to the two startup modes, sessioncluster and jobcluster, the ApplicationMaster entry classes submitted to Yarn are YarnSessionClusterEntrypoint and YarnJobClusterEntrypoint respectively; the difference is that the Dispatcher is a StandaloneDispatcher in the former and a MiniDispatcher in the latter. The concrete ResourceManager implementation is YarnResourceManager.
Unlike the standalone cluster described above, a Flink cluster started in Yarn cluster mode obtains its TaskManagers dynamically: the YarnResourceManager requests them from Yarn's ResourceManager according to the JobMaster's demands. When the JobMaster requests resources and there are not enough to allocate, the YarnResourceManager asks the Yarn ResourceManager for new containers and starts TaskManagers in them:
class YarnResourceManager {
//request a container
private void requestYarnContainer() {
resourceManagerClient.addContainerRequest(getContainerRequest());
// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
numPendingContainerRequests++;
log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
resource,
numPendingContainerRequests);
}
//callback invoked when containers have been allocated
@Override
public void onContainersAllocated(List<Container> containers) {
runAsync(() -> {
final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();
for (Container container : containers) {
log.info(
"Received new container: {} - Remaining pending container requests: {}",
container.getId(),
numPendingContainerRequests);
if (numPendingContainerRequests > 0) {
removeContainerRequest(pendingRequestsIterator.next());
final String containerIdStr = container.getId().toString();
final ResourceID resourceId = new ResourceID(containerIdStr);
workerNodeMap.put(resourceId, new YarnWorkerNode(container));
try {
// Context information used to start a TaskExecutor Java process
ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
container.getResource(),
containerIdStr,
container.getNodeId().getHost());
//start the TaskManager
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start TaskManager in container {}.", container.getId(), t);
// release the failed container
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(container.getId());
// and ask for a new one
requestYarnContainerIfRequired();
}
} else {
// return the excessive containers
log.info("Returning excess container {}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
}
}
// if we are waiting for no further containers, we can go to the
// regular heartbeat interval
if (numPendingContainerRequests <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
});
}
//create the launch context for the TaskManager
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
throws Exception {
// init the ContainerLaunchContext
final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
final ContaineredTaskManagerParameters taskManagerParameters =
ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
"JVM direct memory limit {} MB",
containerId,
taskManagerParameters.taskManagerTotalMemoryMB(),
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
log.debug("TaskManager configuration: {}", taskManagerConfig);
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
flinkConfig,
yarnConfig,
env,
taskManagerParameters,
taskManagerConfig,
currDir,
YarnTaskExecutorRunner.class, //entry class
log);
// set a special environment variable to uniquely identify this container
taskExecutorLaunchContext.getEnvironment()
.put(ENV_FLINK_CONTAINER_ID, containerId);
taskExecutorLaunchContext.getEnvironment()
.put(ENV_FLINK_NODE_ID, host);
return taskExecutorLaunchContext;
}
}
Summary
This article briefly analyzed the startup flow of a Flink cluster and the communication among its components: the ResourceManager, TaskExecutor, Dispatcher, and JobMaster.