Flink 1.10 对内部事件处理的线程模型做了一个大的改进,采用了类似 Actor 的信箱模型。这篇文章我们将深入 Flink 内部 Mailbox 线程模型的设计即实现。
背景
在之前的线程模型中,StreamTask
中可能存在多个潜在的线程会修改内部的状态,因此需要通过加锁的方式来确保线程安全的状态,这个全局的锁就是著名的 checkpointLock
。通过 checkpointLock
控制线程间的并发会让程序代码变得很复杂,并且锁对象还通过一些 API 暴露给了用户(例如 SourceFunction#getCheckpointLock()
),如果没有正确加锁很容易引发线程安全问题。
为了解决这个问题,社区提出了基于 Mailbox 的线程模型,见 FLINK-12477。Mailbox 机制借鉴了 Actor 模型,通过单个 Mailbox 线程配合阻塞队列的方式,将内部状态的修改交由单个线程完成,从而避免多线程的问题。相比于使用 checkpointLock
,Mailbox 模型另一个好处是方便控制事件处理的优先级,通过锁竞争很难达到类似的效果。
在原始的线程模型中,checkpointLock
主要用在三个地方:
- 事件处理:包括 events, watermarks, barriers, latency markers 的处理和发送
- checkpoint 触发:通过 RPC 调用触发 checkpoint(在 Source 中)、通知 checkpoint 的完成情况,(注:对下游来说,checkpoint 触发和取消是通过 barrier 触发的,归为第一种情况)
- Processing Time Timers: 处理时间定时器是通过
ScheduledExecutor
异步执行的(事件事件定时器触发是通过 watermark 触发的,归为第一种情况)
在新的改进方案中,对锁的替换不仅仅要做到排他的效果,对于事件处理还需要保证原子性。
改进方案
Mailbox 模型的核心思想其实比较简单,其底层就是 FIFO 的队列 + 一个单线程的循环事件处理。所有需要处理的事件都封装成一个 Mail 投递到 Mailbox 中,然后按先后顺序由单线程加以处理,从而简化了并发访问问题。
在使用 Mailbox 以前,StreamTask
的核心逻辑是在 StreamTask#run()
中,内部是一个循环的事件处理。除此以外,checkpoint trigger 和 processing time timer 在其它线程中运行。
在改进方案中,StreamTask
的基础逻辑大致如下(伪代码,来自设计文档):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
BlockingQueue<Runnable> mailbox = ...
void runMailboxProcessing() {
//TODO: can become a cancel-event through mailbox eventually
Runnable letter;
while (isRunning()) {
while ((letter = mailbox.poll()) != null) {
letter.run();
}
defaultAction();
}
}
void defaultAction() {
// e.g. event-processing from an input
}
|
上面只是核心代码的大致逻辑,具体的实现还有一些优化,比如队列的公平性。之前的抢锁操作是完全没有任何公平性而言的。
在这个模型下,事件处理的循环被移到了 Mailbox 处理线程中,因此以往在 StreamTask#run()
中的循环逻辑就不再需要了。但这里会有个问题,因为历史原因,Flink Source Function 的核心逻辑是一个循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。在 FLIP-27 提出的新的 Source 接口中,已经可以比较好地和 Mailbox 模型进行兼容了。
对于 checkpoint trigger 和 processing time timer,只需要将对应的操作封装为 Mail 投递到 Mailbox 中,等待 Mailbox 线程进行处理即可。
具体实现
整体设计
下面这张图展示了 Mailbox 线程模型中的核心抽象。
Mail
中封装了需要处理的消息和相应的动作,checkpoint trigger 和 processing time timer 就是通过 Mail
触发的;TaskMailbox
用于存储 Mail
(需要处理的消息);MailboxProcessor
负责从 TaskMailbox
中取出信件并处理;其它的调用方通过 MailboxExecutor
向 TaskMailbox
中投递信件。
MailboxDefaultAction
则是 MailboxProcessor
的默认动作,如前所述,MailboxDefaultAction
主要负责处理基础的 stream event、barrier、watermark 等。在 Mailbox 主线程的循环中,处理完新的 Mail
后就会执行该动作。MailboxDefaultAction
通过一个 MailboxController
和 Mailbox 进行交互,可以借此获悉所有的事件都处理完毕,或者临时暂停 MailboxDefaultAction
。
Mailbox
TaskMailbox
的内部使用了一个普通的 Deque
存储写入的 Mail
,对 Deque
读写通过一个 ReentrantLock
来加以保护。Mailbox
的一个主要特性是可以做优先级控制,每一个 Mail
都有其优先级,从 TaskMailbox
获取 Mail
时可以指定优先级,实际实现时就是通过遍历队列元素比较优先级。
为了减少读取队列时的同步开销,TaskMailbox
支持创建一个 batch 后续消费,相当于把队列中的元素存入一个额外的队列,后续消费时就避免了加锁的操作。
MailboxProcessor
MailboxProcessot 核心就是前面提过的事件循环,在这个事件循环中,除了处理 TaskMailbox
中的事件外,还有一个 MailboxDefaultAction
用做默认的行为。
MailboxDefaultAction
和 TaskMailbox
内部的 Mail
的区别在于,Mail
通常用于一些控制类的消息处理,例如 checkpoint 触发,而 MailboxDefaultAction
则用于数据流上的普通消息处理(如正常的数据记录,barrier)等。数据流上的消息数据量比较大,通过邮箱内部队列进行处理显然开销比较大。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class MailboxProcessor implements Closeable {
//邮箱
protected final TaskMailbox mailbox;
// 默认行为,用于普通的数据流上的消息数据处理
protected final MailboxDefaultAction mailboxDefaultAction;
public void runMailboxLoop() throws Exception {
final TaskMailbox localMailbox = mailbox;
//确保当前调用必须发生在 Mailbox 的事件处理线程中
checkState(localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController defaultActionContext = new MailboxController(this);
while (isMailboxLoopRunning()) { //事件循环
// The blocking `processMail` call will not return until default action is available.
// 处理事件,这是一个阻塞方法,如果默认行为不可用,方法不会返回
processMail(localMailbox, false);
if (isMailboxLoopRunning()) { // 再做一次检查,因为上面的 mail 处理可能会改变运行状态
//执行默认行为
mailboxDefaultAction.runDefaultAction(defaultActionContext);
}
}
}
}
|
MailboxExecutor
MailboxExecutor
的主要作用是向 TaskMailbox
中投递 Mail
,这个接口被设计为类似 java.util.concurrent.Executor
接口。提交 Mail
的行为可以在任意线程中进行,因为 TaskMailbox
内部有基于锁的同步控制。
除了提交 Mail
外,MailboxExecutor
还有一个比较重要的作用体现在 MailboxExecutor#yield
方法中。yield
这个词在程序设计语言中非常常见,但其含义往往又让人摸不着头脑。从字面解释来看,yield
有“让出”,“屈服”之意,在一些场景下也有“生成”的意思。这里我们不纠结这个,还是来看看这个方法设计的意图的什么。
Mailbox 模型中所有的事件都是在单个事件处理线程中处理的,排除掉优先级的因素,所有的事件按照 FIFO 的顺序加以处理。正常情况下,这种处理顺序是没有问题的。但是考虑到一种特殊的情况,如果要完成对事件A的处理需要等待一个条件,只有在处理完事件B之后这个条件才能满足,但是事件B在队列里的顺序是在事件A之后的,这样某种程度上来说就造成了一种 “死锁”。
yield
方法就是为了解决上面的问题,yield
会从队列中取出下一个事件进行处理,看上去像是暂时“让出”了对当前事件的处理。
说起来有点抽象,看一个示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
MailboxExecutor mailboxExecutor = ....
mailboxExecutor.executr(() -> {
// ...
// 当前事件处理的逻辑,要完成,需要依赖后面某个事件的处理
while (resource not available) {
// 取出下一个事件处理
mailboxExecutor.yield();
}
// 继续处理当前事件
// ...
})
|
注意,为了不破坏 Mailbox
模型单线程执行的特性,这个方法必须在 Mailbox
事件处理线程中调用。这是一个阻塞方法,因此可能会阻塞事件处理线程。有些场景下可能还需要依赖事件处理线程来提交新的事件,因此也提供了非阻塞的 tryYield
方法。
StreamTask 如何应用 Mailbox 模型
StreamTask 的核心是处理消息流中的 StreamRecord
,这个处理逻辑是 MailboxProcessor
的默认行为,即:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
class StreamTask {
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
InputStatus status = inputProcessor.processInput(); //处理输入
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
// 没有后续的输入了,告知 MailboxDefaultAction.Controller
controller.allActionsCompleted();
return;
}
// 暂时没有输入的情况
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
TimerGauge timer;
CompletableFuture<?> resumeFuture;
if (!recordWriter.isAvailable()) {
timer = ioMetrics.getBackPressuredTimePerSecond();
resumeFuture = recordWriter.getAvailableFuture();
} else {
timer = ioMetrics.getIdleTimeMsPerSecond();
resumeFuture = inputProcessor.getAvailableFuture();
}
// 一旦有输入了,就告知 controller 要恢复 MailboxDefaultAction 的处理
assertNoException(
resumeFuture.thenRun(
// 首先会暂停 MailboxDefaultAction 的处理
new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
}
private static class ResumeWrapper implements Runnable {
private final Suspension suspendedDefaultAction;
private final TimerGauge timer;
public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) {
this.suspendedDefaultAction = suspendedDefaultAction;
timer.markStart();
this.timer = timer;
}
@Override
public void run() {
timer.markEnd();
suspendedDefaultAction.resume();
}
}
}
|
对于 checkpoint 的触发,是通过 MailboxExecutor
提交一个 Mail 来实现的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class StreamTask {
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
// 提交到 mailbox 中运行
mainMailboxExecutor.execute(
() -> {
......
try {
result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime));
} catch (Exception ex) {
// Report the failure both via the Future result but also to the mailbox
result.completeExceptionally(ex);
throw ex;
}
},
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
return result;
}
}
|
checkpoint 完成或者放弃的通知也是提交到 Mailbox 中运行的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
class StreamTask {
// checkpoint 完成或者失败的回调通知操作
private Future<Void> notifyCheckpointOperation(
RunnableWithException runnable, String description) {
CompletableFuture<Void> result = new CompletableFuture<>();
mailboxProcessor
.getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
.execute(
() -> {
try {
runnable.run();
} catch (Exception ex) {
result.completeExceptionally(ex);
throw ex;
}
result.complete(null);
},
description);
return result;
}
}
|
对于 processing time timer 的触发也是类似的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class streamTask {
public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
return mailboxExecutor ->
new ProcessingTimeServiceImpl(
timerService,
callback -> deferCallbackToMailbox(mailboxExecutor, callback));
}
ProcessingTimeCallback deferCallbackToMailbox(
MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {
return timestamp -> {
// 提交到 mailbox 中运行
mailboxExecutor.execute(
() -> invokeProcessingTimeCallback(callback, timestamp),
"Timer callback for %s @ %d",
callback,
timestamp);
};
}
}
|
Legacy Source 的兼容处理
前面提到,因为历史遗留的问题,SourceFunction
被设计成一个无限的循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。
SourceStreamTask
被设计为 StreamTask
的子类,会启动另外一个独立的线程 LegacySourceFunctionThread
运行 SourceFunction
中的循环。这样相当于有两个线程在同时运行,一个是 SourceFunction
中生成数据流中的数据,另一个是 Mailbox
中的事件处理线程。为了防止这两个线程发生冲突,在 SourceStreamTask
中保留了 checkpoint lock,用于在这两个线程间进行并发控制。
为了达到这样的效果,Flink 提供了一个 StreamTaskActionExecutor
的封装,用来运行 Runnable
。正常情况下,StreamTaskActionExecutor 的实现就是直接去运行 Runnable
;同时也提供了一个 SynchronizedStreamTaskActionExecutor
的实现,在运行 Runnable
的时候会进行加锁控制,这样就把获取锁的操作引入到 Mailbox 处理线程中了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {
private final Object mutex;
public SynchronizedStreamTaskActionExecutor(Object mutex) {
this.mutex = mutex;
}
@Override
public void run(RunnableWithException runnable) throws Exception {
synchronized (mutex) {
runnable.run();
}
}
}
|
小结
Mailbox 模型是常见的用来控制并发的一种设计,通过引入 Mailbox 的线程模型,Flink 简化了 StreamTask
的代码逻辑,规避了多线程竞争带来的并发问题。
通过对 Mailbox
、 MailboxProcessor
、MailboxExecutor
这几个接口的设计进行分析,可以看出 Flink 的 Mailbox 模型设计还是比较优雅的,在使用方面也比较简单,很值得我们在开发其它项目的时候参考。
参考