在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。

Async I/O 的使用方式

在 Flink 中使用 Async I/O 的需要有一个支持异步请求的客户端。以官方文档给出的说明为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        // 发起异步请求,返回结果是一个 Future
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        // 请求完成时的回调,将结果交给 ResultFuture
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

AsyncDataStream 提供了两种调用方法,分别是 orderedWaitunorderedWait,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。

由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。

Async I/O 的实现

AsyncDataStream 在运行时被转换为 AsyncWaitOperator 算子,它是 AbstractUdfStreamOperator 的子类。下面我们来看看 AsyncWaitOperator 的实现原理。

基本原理

AsyncWaitOperator 算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator 内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue 提供了一种队列的抽象,一个“消费者”线程 Emitter 从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示。

async-io

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class AsyncWaitOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
	/** Queue to store the currently in-flight stream elements into. */
	private transient StreamElementQueue queue;

	/** Pending stream element which could not yet added to the queue. */
	private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

	private transient ExecutorService executor;

	/** Emitter for the completed stream element queue entries. */
	private transient Emitter<OUT> emitter;

	/** Thread running the emitter. */
	private transient Thread emitterThread;

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
		//注册一个定时器,在超时时调用 timeout 方法
		if (timeout > 0L) {
			// register a timeout for this AsyncStreamRecordBufferEntry
			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
			final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
				timeoutTimestamp,
				new ProcessingTimeCallback() {
					@Override
					public void onProcessingTime(long timestamp) throws Exception {
						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
					}
				});
			// Cancel the timer once we've completed the stream record buffer entry. This will remove
			// the register trigger task
			streamRecordBufferEntry.onComplete(
				(StreamElementQueueEntry<Collection<OUT>> value) -> {
					timerFuture.cancel(true);
				},
				executor);
		}
		//加入队列
		addAsyncBufferEntry(streamRecordBufferEntry);
		//发送异步请求
		userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
	}

	//尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
	private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		assert(Thread.holdsLock(checkpointingLock));
		pendingStreamElementQueueEntry = streamElementQueueEntry;
		while (!queue.tryPut(streamElementQueueEntry)) {
			// we wait for the emitter to notify us if the queue has space left again
			checkpointingLock.wait();
		}
		pendingStreamElementQueueEntry = null;
	}

}

public class Emitter<OUT> implements Runnable {
	@Override
	public void run() {
		try {
			while (running) {
				//从队列阻塞地获取元素
				AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
				output(streamElementEntry);
			}
		}
	}
}

AsyncWaitOperator 可以工作在两种模式下,即 ORDEREDUNORDERED。Flink 通过 StreamElementQueue 的不同实现实现了这两种模式。

“有序”模式

在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue 的具体是实现是 OrderedStreamElementQueueOrderedStreamElementQueue 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。

OrderedStreamElementQueue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class OrderedStreamElementQueue implements StreamElementQueue {
	/** Capacity of this queue. */
	private final int capacity;

	/** Queue for the inserted StreamElementQueueEntries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> queue;

	@Override
	public AsyncResult peekBlockingly() throws InterruptedException {
		lock.lockInterruptibly();
		try {
			//只有队列头部的请求完成后才解除阻塞状态
			while (queue.isEmpty() || !queue.peek().isDone()) {
				headIsCompleted.await();
			}
			return queue.peek();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();

		try {
			while (queue.isEmpty() || !queue.peek().isDone()) { 
				headIsCompleted.await();
			}
			notFull.signalAll();
			return queue.poll();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();

		try {
			if (queue.size() < capacity) { //未达容量上限
				addEntry(streamElementQueueEntry);
				return true;
			} else {
				return false;
			}
		} finally {
			lock.unlock();
		}
	}
}

“无序”模式

在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 UnorderedStreamElementQueue 中巧妙地实现了这两种情况。

UnorderedStreamElementQueue

从上图中可以看出,在 UnorderedStreamElementQueue 内部使用了两个队列,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 中保存未完成的异步请求计算结果,而 completedQueue 中保存已完成的异步请求计算结果。注意,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, watermarkSet 作为一种特殊的集合,其内部只有一个元素,即 Watermark,充当了不同散列集合之间的分界。这样就保证了在一个 Watermark 之后的异步请求的计算结果不会先于该 Watermark 之前进行提交。firstSet 中完成异步请求的计算结果会被转移到 completedQueue 队列中,firstSet 内部的所有异步请求的计算结果都是可以乱序提交的。

如果不使用“事件时间”,那么没有 Watermark 产生,所有的异步请求都会进入 firstSet 中,因而所有的结果都是乱序提交的。

具体代码实现逻辑如下,结合上面的示意图应该不难理解。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
public class UnorderedStreamElementQueue implements StreamElementQueue {
	/** Queue of uncompleted stream element queue entries segmented by watermarks. */
	private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

	/** Queue of completed stream element queue entries. */
	private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

	/** First (chronologically oldest) uncompleted set of stream element queue entries. */
	private Set<StreamElementQueueEntry<?>> firstSet;

	// Last (chronologically youngest) uncompleted set of stream element queue entries. New
	// stream element queue entries are inserted into this set.
	private Set<StreamElementQueueEntry<?>> lastSet;

	@Override
	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();
		try {
			if (numberEntries < capacity) {
				addEntry(streamElementQueueEntry);
				return true;
			} else {
				return false;
			}
		} finally {
			lock.unlock();
		}
	}

	@Override
	public AsyncResult poll() throws InterruptedException {
		lock.lockInterruptibly();
		try {
			//等待 completedQueue 中的元素
			while (completedQueue.isEmpty()) {
				hasCompletedEntries.await();
			}
			numberEntries--;
			notFull.signalAll();
			return completedQueue.poll();
		} finally {
			lock.unlock();
		}
	}

	//异步请求完成的回调
	public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
		lock.lockInterruptibly();
		try {
			//如果完成的异步请求在 firstSet 中,那么就将 firstSet 中已完成的异步请求转移到 completedQueue 中
			if (firstSet.remove(streamElementQueueEntry)) {
				completedQueue.offer(streamElementQueueEntry);
				while (firstSet.isEmpty() && firstSet != lastSet) {
					//如果firset中所有的异步请求都完成了,那么就从 uncompletedQueue 获取下一个集合作为 firstSet
					firstSet = uncompletedQueue.poll();
					Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
					while (it.hasNext()) {
						StreamElementQueueEntry<?> bufferEntry = it.next();

						if (bufferEntry.isDone()) {
							completedQueue.offer(bufferEntry);
							it.remove();
						}
					}
				}
				hasCompletedEntries.signalAll();
			}
		} finally {
			lock.unlock();
		}
	}

	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
		assert(lock.isHeldByCurrentThread());

		if (streamElementQueueEntry.isWatermark()) {
			//如果是watermark,就要构造一个只包含这个 watermark 的 set 加入到 uncompletedQueue 队列中
			lastSet = new HashSet<>(capacity);
			if (firstSet.isEmpty()) {
				firstSet.add(streamElementQueueEntry);
			} else {
				Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
				watermarkSet.add(streamElementQueueEntry);
				uncompletedQueue.offer(watermarkSet);
			}
			uncompletedQueue.offer(lastSet);
		} else {
			//正常记录,加入lastSet中
			lastSet.add(streamElementQueueEntry);
		}

		//设置异步请求完成后的回调
		streamElementQueueEntry.onComplete(
			(StreamElementQueueEntry<T> value) -> {
				try {
					onCompleteHandler(value);
				} catch (InterruptedException e) {
				} catch (Throwable t) {
					operatorActions.failOperator(new Exception("Could not complete the " +
						"stream element queue entry: " + value + '.', t));
				}
			},
			executor);

		numberEntries++;
	}
}

容错

在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class AsyncWaitOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
	/** Recovered input stream elements. */
	private transient ListState<StreamElement> recoveredStreamElements;

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		recoveredStreamElements = context
			.getOperatorStateStore()
			.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

	}

	@Override
	public void open() throws Exception {
		super.open();
		
		//......

		// 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍
		if (recoveredStreamElements != null) {
			for (StreamElement element : recoveredStreamElements.get()) {
				if (element.isRecord()) {
					processElement(element.<IN>asRecord());
				}
				else if (element.isWatermark()) {
					processWatermark(element.asWatermark());
				}
				else if (element.isLatencyMarker()) {
					processLatencyMarker(element.asLatencyMarker());
				}
				else {
					throw new IllegalStateException("Unknown record type " + element.getClass() +
						" encountered while opening the operator.");
				}
			}
			recoveredStreamElements = null;
		}
	}


	@Override
	public void snapshotState(StateSnapshotContext context) throws Exception {
		super.snapshotState(context);

		//先清除状态
		ListState<StreamElement> partitionableState =
			getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
		partitionableState.clear();

		//将所有未完成处理请求对应的消息加入状态中
		Collection<StreamElementQueueEntry<?>> values = queue.values();
		try {
			for (StreamElementQueueEntry<?> value : values) {
				partitionableState.add(value.getStreamElement());
			}

			// add the pending stream element queue entry if the stream element queue is currently full
			if (pendingStreamElementQueueEntry != null) {
				partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
			}
		} catch (Exception e) {
			partitionableState.clear();
			throw new Exception("Could not add stream element queue entries to operator state " +
				"backend of operator " + getOperatorName() + '.', e);
		}
	}

}

小结

在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。

参考

-EOF-