在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势。尽管 SQL 简化了使用的门槛,但是如何将 SQL 等价转换到现有的数据处理引擎中却并非易事,尤其是在流式数据处理框架中。

最近,Flink 发布了备受瞩目的 1.9 版本,由于阿里开源了其内部的 Blink 分支,Flink SQL 的功能得到了进一步的改进和增强。尽管在这个版本中,Blink 仍然只是作为一个预览的版本发布,但是 Blink 后续将会成为 Flink 社区的主要开发方向。接下来,我们将主要基于 Blink 来介绍 Flink SQL 的整体执行流程。

背景知识

SQL 的执行流程一般分为四个主要的阶段,Flink 主要依赖于 Calicte 来完成这一流程:

sql-process

Parse:语法解析,把 SQL 语句转换成为一个抽象语法树(AST),在 Calcite 中用 SqlNode 来表示;

Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是 SqlNode 构成的语法树;

Optimize:查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;

Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。

Flink SQL 的处理也大体遵循上述处理流程。Calcite 自身的概念较为庞杂,尤其是其内部使用的 HepPlanner 和 VolcanoPlanner 优化器更是非常复杂,但好在 Calcite 的可扩展性很强,优化器的优化规则也可以很容易地进行扩展,因此如果只是了解 Flink SQL 的基本原理和扩展方法,也无需对 Calcite 的代码了解的非常透彻。关于 Calcite 的基本介绍可以参考这个Slide

可插拔的 SQL Runner

通过对 table 模块进行拆分和重构,Flink SQL 抽象出了 Planner 接口和 Executor 接口,可以支持多个不同的 Runner,用户可以自行选择希望使用的 Runner。不同的 Runner 只需要正确地实现这两个接口即可。在 1.9 版本中,Flink 提供了两套 SQL Runner,分别对应之前社区已有的 Flink SQL Runner 和新的 Blink Runner,Blink Runner 目前只是一个预览版本,在后续的迭代中会取代现有的 Flink Runner。

Planner 接口

1
2
3
4
public interface Planner {
	List<Operation> parse(String statement);
	List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);
}

Planner 接口完成 SQL 的解析和逻辑计划的转换,最终得到构建 Flink 计算逻辑的转换算子。

Executor 接口

1
2
3
4
public interface Executor {
	void apply(List<Transformation<?>> transformations);
	JobExecutionResult execute(String jobName) throws Exception;
}

Executor 接口将 Planner 接口翻译出的算子应用到底层运行时环境中,并提供启动程序的入口。

SQL 解析

SQL 的解析在 PlannerBase.parse() 中实现:1)首先使用 Calcite 的解析出抽象语法树 SqlNode,2)然后结合元数据对 SQL 语句进行验证,3)将 SqlNode 转换为 RelNode,4)并最终封装为 Flink 内部对查询操作的抽象 QueryOperation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
abstract class PlannerBase(
    executor: Executor,
    config: TableConfig,
    val functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean)
  extends Planner {
  override def parse(stmt: String): util.List[Operation] = {
    val planner = createFlinkPlanner
    // 1)这一步解析得到 SqlNode
    val parsed = planner.parse(stmt)
    // 使用 SqlToOperationConverter 将 SqlNode 转化为 Operation
    parsed match {
      case insert: RichSqlInsert =>
        List(SqlToOperationConverter.convert(planner, insert))
      case query if query.getKind.belongsTo(SqlKind.QUERY) =>
        //查询语句
        List(SqlToOperationConverter.convert(planner, query))
      case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
        List(SqlToOperationConverter.convert(planner, ddl))
      case _ =>
        throw new TableException(s"Unsupported query: $stmt")
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SqlToOperationConverter {
	public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
		// 2) 结合元数据验证 Sql 的合法性
		final SqlNode validated = flinkPlanner.validate(sqlNode);
		// 将 SqlNode 转化为 Operation
		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
		if (validated instanceof SqlCreateTable) {
			return converter.convertCreateTable((SqlCreateTable) validated);
		} if (validated instanceof SqlDropTable) {
			return converter.convertDropTable((SqlDropTable) validated);
		} else if (validated instanceof RichSqlInsert) {
			return converter.convertSqlInsert((RichSqlInsert) validated);
		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
			return converter.convertSqlQuery(validated);
		} else {
			throw new TableException("Unsupported node type "
				+ validated.getClass().getSimpleName());
		}
	}

	/** Fallback method for sql query. */
	private Operation convertSqlQuery(SqlNode node) {
		return toQueryOperation(flinkPlanner, node);
	}

  private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
		// 3) 使用 SqlToRelConverter 将 SqlNode 转换成 RelNode
		RelRoot relational = planner.rel(validated);
		// 4) 将 RelNode 封装成 PlannerQueryOperation
		return new PlannerQueryOperation(relational.project());
	}
}

我们知道,Flink 借助于 Calcite 完成 SQl 的解析和优化,而后续的优化部分其实都是直接基于 RelNode 来完成的,那么这里为什么又多出了一个 QueryOperation 的概念呢?这主要是因为,Flink SQL 是支持 SQL 语句和 Table Api 接口混合使用的,在 Table Api 接口中,主要的操作都是基于 Operation 接口来完成的。

SQL 转换及优化

在将 SQL 语句解析成 Operation 后,为了得到 Flink 运行时的具体操作算子,需要进一步将 ModifyOperation 转换为 Transformation。在 Blink 之前的 SQL Planner 中,都是基于 DataStreamDataSet API 完成运行时逻辑的构建;而 Blink 则使用更底层的 Transformation 算子。

注意,Planner.translate(List<ModifyOperation> modifyOperations) 方法接收的参数是 ModifyOperationModifyOperation 对应的是一个 DML 的操作,在将查询结果插入到一张结果表或者转换为 DataStream 时,就会得到 ModifyOperation

转换的流程主要分为四个部分,即 1)将 Operation 转换为 RelNode,2)优化 RelNode,3)转换成 ExecNode,4)转换为底层的 Transformation 算子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
abstract class PlannerBase(
    executor: Executor,
    config: TableConfig,
    val functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager,
    isStreamingMode: Boolean)
  extends Planner {
  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    mergeParameters()
    // 1)将 Operation 转换为 RelNode
    val relNodes = modifyOperations.map(translateToRel)
    // 2)优化 RelNode
    val optimizedRelNodes = optimize(relNodes)
    // 3)转换成 ExecNode
    val execNodes = translateToExecNodePlan(optimizedRelNodes)
    // 4)转换为底层的 Transformation 算子
    translateToPlan(execNodes)
  }
}

首先需要进行的操作是将 Operation 转换为 RelNode,这个转换操作借助 QueryOperationConverter 完成。Operation 其实类似于 SQL 语法树,也构成一个树形结构,并实现了访问者模式,支持使用 QueryOperationVisitor 遍历整棵树,QueryOperationConverter 实现了 QueryOperationVisitor 接口。对于 PlannerQueryOperation,其内部封装的就是已经构建好的 RelNode,直接取出即可;对于其它类型的 Operation,则按需转换为对应的 RelNode

在得到 RelNode 后,就进入 Calcite 对 RelNode 的优化流程。在 Blink 中有一点特殊的地方在于,由于多个 RelNode 构成的树可能存在共同的“子树”(例如将相同的查询结果输出到不同的结果表中,那么两个 LogicalSink 的子树就可能是共用的),Blink 使用了一种 CommonSubGraphBasedOptimizer 优化器,将拥有共同子树的 RelNode 看作一个 DAG 结构,并将 DAG 划分成 RelNodeBlock,然后在RelNodeBlock 的基础上进行优化工作。每一个 RelNodeBlock 可以看作一个 RelNode 树进行优化,这和正常的 Calcite 处理流程还是保持一致的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
abstract class CommonSubGraphBasedOptimizer extends Optimizer {
  override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {
    //以RelNodeBlock为单位进行优化,在子类中实现,StreamCommonSubGraphBasedOptimizer,BatchCommonSubGraphBasedOptimizer
    val sinkBlocks = doOptimize(roots)
    //获得优化后的逻辑计划
    val optimizedPlan = sinkBlocks.map { block =>
      val plan = block.getOptimizedPlan
      require(plan != null)
      plan
    }
    //将 RelNodeBlock 使用的中间表展开
    expandIntermediateTableScan(optimizedPlan)
  }
}

Caclite 对逻辑计划的优化是一套基于规则的框架,用户可以通过添加规则进行扩展,Flink 就是基于自定义规则来实现整个的优化过程。Flink 构造了一个链式的优化程序,可以按顺序使用多套规则集合完成 RelNode 的优化过程。在 FlinkStreamProgramFlinkBatchProgram 中定义了一系列扩展规则,用于构造逻辑计划的优化器。与此同时,Flink 扩展了 RelNode,增加了 FlinkLogicRelFlinkPhysicRel 这两类 RelNode,对应的 Convention 分别为 FlinkConventions.LOGICALFlinkConventions.STREAM_PHYSICAL (或FlinkConventions.BATCH_PHYSICAL)。在优化器的处理过程中,RelNode 会从 Calcite 内部定义的节点转换为 FlinkLogicRel 节点(FlinkConventions.LOGICAL),并最终被转换为 FlinkPhysicRel 节点(FlinkConventions.STREAM_PHYSICAL)。这两类转换规则分别对应 FlinkStreamRuleSets.LOGICAL_OPT_RULESFlinkStreamRuleSets.PHYSICAL_OPT_RULES。在不考虑其它更复杂的性能优化的情况下,如果要扩展 Flink SQL 的语法规则,可以参考这两类规则来增加节点和转换规则。

Join 操作为例,首先经过 Calcite 解析后得到的节点为 LogicalJoinLogicalJoin 会匹配转换规则中的 FlinkLogicalJoinConverter 规则,在该规则中被转换为 FlinkLogicalJoin

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private class FlinkLogicalJoinConverter
  extends ConverterRule(
    classOf[LogicalJoin],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalJoinConverter") {

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    //转换左子树
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    //转换右子树
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
    //创建 FlinkLogicalJoin 节点,Convention 被替换为 FlinkConventions.LOGICAL
    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getJoinType)
  }
}

object FlinkLogicalJoin {
  val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter

  def create(
      left: RelNode,
      right: RelNode,
      conditionExpr: RexNode,
      joinType: JoinRelType): FlinkLogicalJoin = {
    val cluster = left.getCluster
    val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
    new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
  }
}

FlinkLogicalJoin 会匹配 StreamExecJoinRule 规则,经过这一步转换会得到 StreamExecJoin 节点:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class StreamExecJoinRule
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalRel], any())),
    "StreamExecJoinRule") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val join: FlinkLogicalJoin = call.rel(0)
    ......
    // 判断节点是否匹配规则
  }

  override def onMatch(call: RelOptRuleCall): Unit = {
    val join: FlinkLogicalJoin = call.rel(0)
    val left = join.getLeft
    val right = join.getRight

    .......

    val providedTraitSet = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)

    val newLeft: RelNode = RelOptRule.convert(left, leftRequiredTrait)
    val newRight: RelNode = RelOptRule.convert(right, rightRequiredTrait)

    //生成 StreamExecJoin 节点
    val newJoin = new StreamExecJoin(
      join.getCluster,
      providedTraitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
    call.transformTo(newJoin)
  }
}

flink-sql-relnode-convert

经过优化器处理后,得到的逻辑树中的所有节点都应该是 FlinkPhysicRel,这之后就可以用于生成物理执行计划了。首先要将 FlinkPhysicalRel 构成的 DAG 转换成 ExecNode 构成的 DAG,因为可能存在共用子树的情况,这里还会尝试共用相同的子逻辑计划。由于通常 FlinkPhysicalRel 的具体实现类通常也实现了 ExecNode 接口,所以这一步转换较为简单。

在得到由 ExecNode 构成的 DAG 后,就可以尝试生成物理执行计划了,也就是将 ExecNode 节点转换为 Flink 内部的 Transformation 算子。不同的 ExecNode 按照各自的需求生成不同的 Transformation,基于这些 Transformation 构建 Flink 的 DAG。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
trait ExecNode[E <: Planner, T] {
  /**
    * Translates this node into a Flink operator.
    */
  def translateToPlan(planner: E): Transformation[T] = {
    if (transformation == null) {
      transformation = translateToPlanInternal(planner)
    }
    transformation
  }
}

SQL 执行

在得到 Transformation 后,利用 Transformation 生成 StreamGraph 后就可以提交 Flink 任务了。根据 Transformation 列表生成 StreamGraph 比较简单,依次将算子添加到 StreamExecutionEnvironment 即可。这里值得一提的一点在于,在 Blink Runner 中,Stream SQL 和 Batch SQL 都基于同样的 Streaming Runtime 架构,统一使用 StreamExecutionEnvironmentTransformation DAG。后续社区会统一 Stream 和 Batch 这两种模式的运行时环境、算子和 API 接口,Blink SQL Runner 算是率先的一个尝试。当然,在 Stream 和 Batch 模式下,算子内部的处理逻辑肯定是要单独进行优化的。Batch 任务运行在 StreamExecutionEnvironment 中需要进行一些特殊的设置,如调度模式,Shuffle 模式等等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class BatchExecutor extends ExecutorBase {
	/**
	 * Sets batch configs.
	 */
	private void setBatchProperties(StreamExecutionEnvironment execEnv) {
		ExecutionConfig executionConfig = execEnv.getConfig();
		executionConfig.enableObjectReuse();
		executionConfig.setLatencyTrackingInterval(-1);
		execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		execEnv.setBufferTimeout(-1);
		if (isShuffleModeAllBatch()) {
			executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
		}
	}

	@Override
	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
		setBatchProperties(execEnv);
		transformations.forEach(execEnv::addOperator);
		StreamGraph streamGraph;
		streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
		// All transformations should set managed memory size.
		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
		streamGraph.getStreamNodes().forEach(sn -> {
			if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
				sn.setResources(managedResourceSpec, managedResourceSpec);
			}
		});
		streamGraph.setChaining(true);
		//设置调度模式为 LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
		streamGraph.setStateBackend(null);
		if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
			throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
		}
		if (isShuffleModeAllBatch()) {
			streamGraph.setBlockingConnectionsBetweenChains(true);
		}
		return streamGraph;
	}

	private boolean isShuffleModeAllBatch() {
		//不允许 ShuffleMode 模式为 PIPELINED
		String value = tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
		if (value.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
			return true;
		} else if (!value.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
			throw new IllegalArgumentException(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.key() +
					" can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
		}
		return false;
	}
}

Table API

除了使用纯 SQL 语句的方式外, Flink 还支持 Table API 编程,对 Table API 的支持主要借助 Table, OperationExpression 等接口。

OperationExpression 是对操作和表达式的抽象,OperationExpression 都有一套各自的类继承层次,可以等同于 Calcite 中的 RelNodeRexNode。通过 Table API 接口,可以构建出语法树,这颗 Operation 树最终被转换为 RelNode 树,之后就是进入前面提到的转换和优化逻辑了。

Table 是对于表的抽象,除了 schema 等信息外,其底层对应的是一个 QueryOperaion,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface Table {
	/**
	 * Returns the schema of this table.
	 */
	TableSchema getSchema();

	/**
	 * Returns underlying logical representation of this table.
	 */
	QueryOperation getQueryOperation();
}

通过 Table API 构建出由 OperationExpression 组成的查询树,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class TableImpl {
  @Override
	public Table select(String fields) {
		//先解析获取 Expression
		return select(ExpressionParser.parseExpressionList(fields).toArray(new Expression[0]));
	}

	@Override
	public Table select(Expression... fields) {
		List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields)
			.map(f -> f.accept(lookupResolver))
			.collect(Collectors.toList());
		CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties(
			expressionsWithResolvedCalls
		);

		if (!extracted.getWindowProperties().isEmpty()) {
			throw new ValidationException("Window properties can only be used on windowed tables.");
		}
		//构建 QueryOperation 并以此为基础创建 Table
		if (!extracted.getAggregations().isEmpty()) {
			QueryOperation aggregate = operationTreeBuilder.aggregate(
				Collections.emptyList(),
				extracted.getAggregations(),
				operationTree
			);
			return createTable(operationTreeBuilder.project(extracted.getProjections(), aggregate, false));
		} else {
			return createTable(operationTreeBuilder.project(expressionsWithResolvedCalls, operationTree, false));
		}
	}
}

而我们在使用 Scala Api 时经常使用如下更方便的形式:

1
table.select('key, 'value.avg + " The average" as 'average)

这是由于在 flink-table-api-scala 中提供了很多方法和隐式转换,可以直接用于生成 Expression:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
trait ImplicitExpressionOperations {
  /**
    * Returns left plus right.
    */
  def + (other: Expression): Expression = unresolvedCall(PLUS, expr, other)

  /**
    * Specifies a name for an expression i.e. a field.
    *
    * @param name name for one field
    * @param extraNames additional names if the expression expands to multiple fields
    * @return field with an alias
    */
  def as(name: Symbol, extraNames: Symbol*): Expression =
    unresolvedCall(
      AS,
      expr +: valueLiteral(name.name) +: extraNames.map(name => valueLiteral(name.name)): _*)

  //Symbol 转换成 UnresolvedReferenceExpression
  implicit def symbol2FieldExpression(sym: Symbol): Expression =
    unresolvedRef(sym.name)
}

OperationExpression 实现了访问者模式,通过 QueryOperationConverterRexNodeConverter 转换为 RelNodeRexNode

flink-sql-operation-and-relnode

更正 :在最新的代码中,RexNodeConverter 已更名为 ExpressionConverter

小结

在这篇文章中,我们主要对 Flink 1.9 版本中 SQL 的整体执行框架进行了介绍。在这个版本中,Flink 提供了两套 SQL Runner,其中 Blink 将会在后续版本中取代已有的 SQL Runner 成为新的标准,因此我们主要是以 Blink Runner 为例介绍 SQL 的解析和优化流程。在后续的文章中,我们将更详细了解元数据管理、 SQL 算子的实现原理、代码生成等具体的实现细节。

参考

-EOF-