在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。
Async I/O 的使用方式
在 Flink 中使用 Async I/O 的需要有一个支持异步请求的客户端。以官方文档给出的说明为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
/** The database specific client that can issue concurrent requests with callbacks */
lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
// issue the asynchronous request, receive a future for the result
// 发起异步请求,返回结果是一个 Future
val resultFutureRequested: Future[String] = client.query(str)
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// 请求完成时的回调,将结果交给 ResultFuture
resultFutureRequested.onSuccess {
case result: String => resultFuture.complete(Iterable((str, result)))
}
}
}
// create the original stream
val stream: DataStream[String] = ...
// 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
|
AsyncDataStream
提供了两种调用方法,分别是 orderedWait
和 unorderedWait
,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。
由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。
Async I/O 的实现
AsyncDataStream
在运行时被转换为 AsyncWaitOperator
算子,它是 AbstractUdfStreamOperator
的子类。下面我们来看看 AsyncWaitOperator
的实现原理。
基本原理
AsyncWaitOperator
算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator
内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue
提供了一种队列的抽象,一个“消费者”线程 Emitter
从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue;
/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
private transient ExecutorService executor;
/** Emitter for the completed stream element queue entries. */
private transient Emitter<OUT> emitter;
/** Thread running the emitter. */
private transient Thread emitterThread;
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
//注册一个定时器,在超时时调用 timeout 方法
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}
//加入队列
addAsyncBufferEntry(streamRecordBufferEntry);
//发送异步请求
userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
//尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
pendingStreamElementQueueEntry = streamElementQueueEntry;
while (!queue.tryPut(streamElementQueueEntry)) {
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
}
pendingStreamElementQueueEntry = null;
}
}
public class Emitter<OUT> implements Runnable {
@Override
public void run() {
try {
while (running) {
//从队列阻塞地获取元素
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
output(streamElementEntry);
}
}
}
}
|
AsyncWaitOperator
可以工作在两种模式下,即 ORDERED
和 UNORDERED
。Flink 通过 StreamElementQueue
的不同实现实现了这两种模式。
“有序”模式
在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue
的具体是实现是 OrderedStreamElementQueue
。OrderedStreamElementQueue
的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
public class OrderedStreamElementQueue implements StreamElementQueue {
/** Capacity of this queue. */
private final int capacity;
/** Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque<StreamElementQueueEntry<?>> queue;
@Override
public AsyncResult peekBlockingly() throws InterruptedException {
lock.lockInterruptibly();
try {
//只有队列头部的请求完成后才解除阻塞状态
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
return queue.peek();
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
notFull.signalAll();
return queue.poll();
} finally {
lock.unlock();
}
}
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (queue.size() < capacity) { //未达容量上限
addEntry(streamElementQueueEntry);
return true;
} else {
return false;
}
} finally {
lock.unlock();
}
}
}
|
“无序”模式
在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 UnorderedStreamElementQueue
中巧妙地实现了这两种情况。
从上图中可以看出,在 UnorderedStreamElementQueue
内部使用了两个队列,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue
中保存未完成的异步请求计算结果,而 completedQueue
中保存已完成的异步请求计算结果。注意,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue
这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, watermarkSet
作为一种特殊的集合,其内部只有一个元素,即 Watermark
,充当了不同散列集合之间的分界。这样就保证了在一个 Watermark
之后的异步请求的计算结果不会先于该 Watermark
之前进行提交。firstSet
中完成异步请求的计算结果会被转移到 completedQueue
队列中,firstSet
内部的所有异步请求的计算结果都是可以乱序提交的。
如果不使用“事件时间”,那么没有 Watermark
产生,所有的异步请求都会进入 firstSet
中,因而所有的结果都是乱序提交的。
具体代码实现逻辑如下,结合上面的示意图应该不难理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
public class UnorderedStreamElementQueue implements StreamElementQueue {
/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;
// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
private Set<StreamElementQueueEntry<?>> lastSet;
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (numberEntries < capacity) {
addEntry(streamElementQueueEntry);
return true;
} else {
return false;
}
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
//等待 completedQueue 中的元素
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}
numberEntries--;
notFull.signalAll();
return completedQueue.poll();
} finally {
lock.unlock();
}
}
//异步请求完成的回调
public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
//如果完成的异步请求在 firstSet 中,那么就将 firstSet 中已完成的异步请求转移到 completedQueue 中
if (firstSet.remove(streamElementQueueEntry)) {
completedQueue.offer(streamElementQueueEntry);
while (firstSet.isEmpty() && firstSet != lastSet) {
//如果firset中所有的异步请求都完成了,那么就从 uncompletedQueue 获取下一个集合作为 firstSet
firstSet = uncompletedQueue.poll();
Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
while (it.hasNext()) {
StreamElementQueueEntry<?> bufferEntry = it.next();
if (bufferEntry.isDone()) {
completedQueue.offer(bufferEntry);
it.remove();
}
}
}
hasCompletedEntries.signalAll();
}
} finally {
lock.unlock();
}
}
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());
if (streamElementQueueEntry.isWatermark()) {
//如果是watermark,就要构造一个只包含这个 watermark 的 set 加入到 uncompletedQueue 队列中
lastSet = new HashSet<>(capacity);
if (firstSet.isEmpty()) {
firstSet.add(streamElementQueueEntry);
} else {
Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
watermarkSet.add(streamElementQueueEntry);
uncompletedQueue.offer(watermarkSet);
}
uncompletedQueue.offer(lastSet);
} else {
//正常记录,加入lastSet中
lastSet.add(streamElementQueueEntry);
}
//设置异步请求完成后的回调
streamElementQueueEntry.onComplete(
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
} catch (Throwable t) {
operatorActions.failOperator(new Exception("Could not complete the " +
"stream element queue entry: " + value + '.', t));
}
},
executor);
numberEntries++;
}
}
|
容错
在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Recovered input stream elements. */
private transient ListState<StreamElement> recoveredStreamElements;
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
@Override
public void open() throws Exception {
super.open();
//......
// 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException("Unknown record type " + element.getClass() +
" encountered while opening the operator.");
}
}
recoveredStreamElements = null;
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
//先清除状态
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
//将所有未完成处理请求对应的消息加入状态中
Collection<StreamElementQueueEntry<?>> values = queue.values();
try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}
// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " +
"backend of operator " + getOperatorName() + '.', e);
}
}
}
|
小结
在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。
参考
-EOF-