在流计算中,时间属性承担了一个极其重要的作用,所有基于时间的操作,例如窗口操作,都需要正确获取时间信息。我们曾经在Flink 源码阅读笔记(12)- 时间、定时器和窗口这篇文章中分析过 Flink 内部时间属性、水位线等机制的具体实现。在这篇文章中,我们将介绍在 SQL 和 Table API 中时间属性相关的一些细节。
时间属性概览
在 Flink SQL 中,表可以提供逻辑上的时间属性用于获取时间信息,时间属性可以是处理时间也可以是事件时间。在声明一张表的时候,时间属性可以在表的 schema 中定义。有些特定的操作,如窗口关联和窗口聚合操作必须基于时间属性字段,因而时间属性可以被看作一种特殊的字段类型;但是时间属性可以当作常规的时间戳字段来使用,一旦需要在计算中使用到时间属性,就需要“物化”(materialized)时间属性,时间属性字段就会被转换成一个常规的时间戳类型。被物化后的时间属性不再与 Flink 的时间系统和水位线相关联,因而也就不可以再应用在基于时间的操作中。
在 Flink SQL 的类型系统中,时间属性和常规的时间戳类型共用同样的逻辑类型 TimestampType,但是通过 TimestampKind 进行区分:
| 1
2
3
4
5
 | public enum TimestampKind {
	REGULAR, //常规的时间戳类型
	ROWTIME, //事件时间
	PROCTIME //处理时间
}
 | 
 
由于 Flink SQL 使用 Calcite 完成查询计划的优化,Flink 的所有逻辑类型在 Calcite 中都有对应的 RelDataType,并且为时间属性单独创建了一种新的 RelDataType,即 TimeIndicatorRelDataType:
| 1
2
3
4
5
6
7
8
9
 | class TimeIndicatorRelDataType(
    typeSystem: RelDataTypeSystem,
    originalType: BasicSqlType,
    val isEventTime: Boolean) //通过 isEventTime 区分是事件时间还是处理时间
  extends BasicSqlType(
    typeSystem,
    originalType.getSqlTypeName,
    originalType.getPrecision) {
}
 | 
 
定义表的时间属性
有两种方式来定义一张表的时间属性,一种方式是在将 DataStream 转换成 Table 的过程中,另一种方式是直接在 TableSource 的具体实现中定义。
DataStream 转换为 Table
在将 DataStream 转换成一个 Table 的过程中,可以用特殊的表达式来声明时间属性对应的列:
| 1
2
3
4
5
 | //处理时间
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
//事件时间
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
 | 
 
其中 'field.proctime 或这 'field.rowtime 即声明时间属性,表达式会被转换为 PROCTIME 和 ROWTIME 这两个内置函数的调用 UnresolvedCallExpression:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 |   // Time definition
  /**
    * Declares a field as the rowtime attribute for indicating, accessing, and working in
    * Flink's event time.
    */
  def rowtime: Expression = unresolvedCall(ROWTIME, expr)
  /**
    * Declares a field as the proctime attribute for indicating, accessing, and working in
    * Flink's processing time.
    */
  def proctime: Expression = unresolvedCall(PROCTIME, expr)
 | 
 
非 Scala 环境下则通过 ExpressionParser 完成表达式的解析。
使用 TableSource
如果要在 TableSource 中定义时间属性,则需要 TableSource 实现 DefinedProctimeAttribute 或者 DefinedRowAttribute 接口,并且引用的时间属性必须出现在 TableSchema 中,类型为 timestamp 类型。如果要同时使用处理时间和事件时间,对应的 TableSource 需要同时实现这两个接口:
| 1
2
3
4
5
6
7
8
 | public interface DefinedProctimeAttribute {
	@Nullable
	String getProctimeAttribute();
}
public interface DefinedRowtimeAttributes {
	List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
 | 
 
其中 RowtimeAttributeDescriptor 是对事件时间的描述,包括如何提取事件时间,以及 watermark 的生成策略等:
| 1
2
3
4
5
 | public final class RowtimeAttributeDescriptor {
	private final String attributeName; //时间属性名称
	private final TimestampExtractor timestampExtractor; //如何提取事件时间
	private final WatermarkStrategy watermarkStrategy; //如何生成 watermark
}
 | 
 
尽管返回值是 List<RowtimeAttributeDescriptor>,但目前 Flink SQL 只支持单个事件时间属性。
SQL 引擎中时间属性的转换
更新 TableSchema
在将 DataStream 转换成一个 Table 的过程中,首先需要生成表结构。Table 的底层对应的是一个 QueryOperation,在这里就是 ScalaDataStreamQueryOperation (或者 JavaDataStreamQueryOperation,对应 Java API)。QueryOperation 提供了 TableSchema 和字段映射关系:
| 1
2
3
4
5
 | public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
	private final DataStream<E> dataStream;
	private final int[] fieldIndices; //字段索引映射关系
	private final TableSchema tableSchema; //表结构
}
 | 
 
获得 TableSchema 的逻辑主要被封装在 FieldInfoUtils.getFieldsInfo 方法中,主要是通过解析 Expression 获得表结构中每一列对应的字段在 DataStream 中元素的索引,并得到对应字段的类型:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
 | /**
 * Utility methods for extracting names and indices of fields from different {@link TypeInformation}s.
 */
public class FieldInfoUtils {
	private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<FieldInfo> {
		@Override
		public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
			return createFieldInfo(unresolvedReference, null);
		}
		@Override
		public FieldInfo visit(UnresolvedCallExpression unresolvedCall) {
			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
				return visitAlias(unresolvedCall);
			} else if (isRowTimeExpression(unresolvedCall)) {
				return createRowtimeFieldInfo(unresolvedCall, null);
			} else if (isProcTimeExpression(unresolvedCall)) {
				return createProctimeFieldInfo(unresolvedCall, null);
			}
			return defaultMethod(unresolvedCall);
		}
	}
	private static boolean isRowTimeExpression(Expression origExpr) {
		return origExpr instanceof UnresolvedCallExpression &&
			((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.ROWTIME;
	}
	private static boolean isProcTimeExpression(Expression origExpr) {
		return origExpr instanceof UnresolvedCallExpression &&
			((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.PROCTIME;
	}
	private static FieldInfo createTimeAttributeField(
			UnresolvedReferenceExpression reference,
			TimestampKind kind, //这里的Kind是TimestampKind.PROCTIME或TimestampKind.ROWTIME
			@Nullable String alias) {
		final int idx;
		//对于时间属性,没有对应的索引,用特殊的标识
		if (kind == TimestampKind.PROCTIME) {
			idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
		} else {
			idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
		}
		String originalName = reference.getName();
		return new FieldInfo(
			alias != null ? alias : originalName,
			idx,
			createTimeIndicatorType(kind));
	}
}
 | 
 
由于时间属性对应的表达式是内置函数的调用,因而可以通过判断对应的函数定义识别出来。
时间属性并不对应 DataStream 中元素真实的字段,因此会用特殊的标识来作为索引:
| 1
2
3
4
 | public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> {
	public static final int ROWTIME_STREAM_MARKER = -1;
	public static final int PROCTIME_STREAM_MARKER = -2;
}
 | 
 
如果通过 TableSource 注册一张表,首先会通过 TableSourceValidation.validateTableSource() 验证表结构、时间属性等信息,然后会被封装为 ConnectorCatalogTable,在这里会完成 TableSchema 的更新:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 | public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
	public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) {
		//更新 TableSchema
		final TableSchema tableSchema = calculateSourceSchema(source, isBatch);
		return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch);
	}
	private static <T1> TableSchema calculateSourceSchema(TableSource<T1> source, boolean isBatch) {
		TableSchema tableSchema = source.getTableSchema();
		if (isBatch) {
			return tableSchema;
		}
		DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
		String[] fieldNames = tableSchema.getFieldNames();
		//检查是否实现了 DefinedRowtimeAttributes 接口
		if (source instanceof DefinedRowtimeAttributes) {
			updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types);
		}
		//检查是否实现了 DefinedProctimeAttribute 接口
		if (source instanceof DefinedProctimeAttribute) {
			updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types);
		}
		return TableSchema.builder().fields(fieldNames, types).build();
	}
	private static void updateRowtimeIndicators(
			DefinedRowtimeAttributes source,
			String[] fieldNames,
			DataType[] types) {
		List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors()
			.stream()
			.map(RowtimeAttributeDescriptor::getAttributeName)
			.collect(Collectors.toList());
		for (int i = 0; i < fieldNames.length; i++) {
			if (rowtimeAttributes.contains(fieldNames[i])) {
				// bridged to timestamp for compatible flink-planner
				types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
						.bridgedTo(java.sql.Timestamp.class);
			}
		}
	}
}
 | 
 
经过更新后的 TableSchema,时间属性列的 LogicalType 就是用特殊 TimestampKind 表征的 TimestampType。
转换到 Calcite
在 DatabaseCalciteSchema 中, Flink SQL 中注册的表被转换成 Calcite 中使用的表:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
 | class DatabaseCalciteSchema extends FlinkSchema {
	@Override
	public Table getTable(String tableName) {
		ObjectPath tablePath = new ObjectPath(databaseName, tableName);
		try {
			if (!catalog.tableExists(tablePath)) {
				return null;
			}
			CatalogBaseTable table = catalog.getTable(tablePath);
			//将 Flink Catalog 中注册的表转换为 Calcite 中的 Table
			if (table instanceof QueryOperationCatalogView) {
				QueryOperationCatalogView view = (QueryOperationCatalogView) table;
				QueryOperation operation = view.getQueryOperation();
				if (operation instanceof DataStreamQueryOperation) {
					List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName);
					((DataStreamQueryOperation) operation).setQualifiedName(qualifiedName);
				} else if (operation instanceof RichTableSourceQueryOperation) {
					List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName);
					((RichTableSourceQueryOperation) operation).setQualifiedName(qualifiedName);
				}
				return QueryOperationCatalogViewTable.createCalciteTable(view);
			} else if (table instanceof ConnectorCatalogTable) {
				return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, tablePath);
			} else if (table instanceof CatalogTable) {
				return convertCatalogTable(tablePath, (CatalogTable) table);
			} else {
				throw new TableException("Unsupported table type: " + table);
			}
		} catch (TableNotExistException | CatalogException e) {
			// TableNotExistException should never happen, because we are checking it exists
			// via catalog.tableExists
			throw new TableException(format(
					"A failure occurred when accessing table. Table path [%s, %s, %s]",
					catalogName,
					databaseName,
					tableName), e);
		}
	}
}
 | 
 
在 Flink SQL 中定义的表结构 TableSchema 也会经过 FlinkTypeFactory 转换,LogicalType也会被转换成 Calcite 内部使用的 RelDataType:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 | /**
  * Flink specific type factory that represents the interface between Flink's [[LogicalType]]
  * and Calcite's [[RelDataType]].
  */
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
  def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
    ......
    val relType = t.getTypeRoot match {
      case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
        val timestampType = t.asInstanceOf[TimestampType]
        timestampType.getKind match {
          //时间戳类型识别出常规的时间戳和时间属性
          case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
          case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
          case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
        }
      case _ =>
        seenTypes.get(t) match {
          case Some(retType: RelDataType) => retType
          case None =>
            val refType = newRelDataType()
            seenTypes.put(t, refType)
            refType
        }
    }
    .....
  }
}
 | 
 
经过这一步转换,时间属性被转换成 TimeIndicatorRelDataType 类型。
查询计划转换
在 Calcite 优化查询计划是,会识别特殊语句中的时间属性,并转换成对应的 RelNode。例如,在 FlinkLogicalJoin 中,如果关联条件中包含了时间窗口,就会被转换为 StreamExecWindowJoin;而不包含时间窗口的 FlinkLogicalJoin 则会被转换为 StreamExecJoin。其区别就在于对时间属性的识别:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 | /**
  * Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition
  * to [[StreamExecWindowJoin]].
  */
class StreamExecWindowJoinRule
  extends ConverterRule(
    classOf[FlinkLogicalJoin],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamExecWindowJoinRule") {
  override def matches(call: RelOptRuleCall): Boolean = {
    ......
    val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join)
    //提取时间窗口的边界
    val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
      join.getCondition,
      join.getLeft.getRowType.getFieldCount,
      joinRowType,
      join.getCluster.getRexBuilder,
      tableConfig)
    if (windowBounds.isDefined) {
      //如果识别到时间窗口,该规则匹配
      if (windowBounds.get.isEventTime) {
        true
      } else {
        // Check that no event-time attributes are in the input because the processing time window
        // join does not correctly hold back watermarks.
        // We rely on projection pushdown to remove unused attributes before the join.
        !joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
      }
    } else {
      // the given join does not have valid window bounds. We cannot translate it.
      false
    }
  }
}
 | 
 
窗口聚合也是同理,只是规则匹配中是对 HOP, TUMBLE 等窗口函数的识别。
物化时间属性
时间属性是一个逻辑上的列,因为它并不真实对应底层 DataStream 元素中具体的字段。因此,如果在计算操作中要用到时间属性列的值,就需要“物化”(materialized)时间属性。这一部分的逻辑主要是在 RelTimeIndicatorConverter 中。
对于 PROCTIME 时间属性,就是将其转换为对内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE 的调用;而对于 ROWTIME 时间属性,则是对其进行一次强制的类型转换,转换为常规的时间戳。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
 | /**
  * Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]].
  */
class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {
  def materialize(expr: RexNode): RexNode = {
    if (isTimeIndicatorType(expr.getType)) {
      if (isRowtimeIndicatorType(expr.getType)) {
        // cast rowtime indicator to regular timestamp
        rexBuilder.makeAbstractCast(timestamp(expr.getType.isNullable), expr)
      } else {
        // generate proctime access
        rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
      }
    } else {
      expr
    }
  }
}
 | 
 
生成物理执行计划
SQL 的查询计划最终需要被转换为 Flink 的算子,即生成物理执行计划。对于一个查询计划来说,首先需要进行转换的就是 Scan 操作,在里将底层的 POJO、Tuple 等对象映射为 Table 中使用的 Row。在这里就需要考虑到时间属性的映射。
首先,我们来看下在 StreamExecDataStreamScan 中是如何完成字段映射的转换的:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 | class StreamExecDataStreamScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    table: RelOptTable,
    outputRowType: RelDataType)
  extends TableScan(cluster, traitSet, table) {
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = {
    val config = planner.getTableConfig
    val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
    val transform = inputDataStream.getTransformation
    val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder)
    // when there is row time extraction expression, we need internal conversion
    // when the physical type of the input date stream is not BaseRow, we need internal conversion.
    if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) {
      // extract time if the index is -1 or -2.
      val (extractElement, resetElement) =
        if (ScanUtil.hasTimeAttributeField(dataStreamTable.fieldIndexes)) {
          (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
        } else {
          ("", "")
        }
      val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
        classOf[AbstractProcessStreamOperator[BaseRow]])
      //生成字段映射的代码
      ScanUtil.convertToInternalRow(
        ctx,
        transform,
        dataStreamTable.fieldIndexes, //字段索引
        dataStreamTable.dataType,
        getRowType,
        getTable.getQualifiedName,
        config,
        rowtimeExpr, //RowTime 表达式
        beforeConvert = extractElement,
        afterConvert = resetElement)
    } else {
      transform.asInstanceOf[Transformation[BaseRow]]
    }
  }
  //获取 RowTime 时间属性的表达式
  private def getRowtimeExpression(relBuilder: FlinkRelBuilder): Option[RexNode] = {
    val fieldIdxs = dataStreamTable.fieldIndexes
    if (!fieldIdxs.contains(ROWTIME_STREAM_MARKER)) {
      None
    } else { 
      //根据字段映射中的特殊marker来查找 rowtime 字段
      val rowtimeField = dataStreamTable.fieldNames(fieldIdxs.indexOf(ROWTIME_STREAM_MARKER))
      // get expression to extract timestamp
      fromDataTypeToLogicalType(dataStreamTable.dataType) match {
        //如果这个datastream本身已经完成了 rowtime 时间属性的提取
        case dataType: RowType
          if dataType.getFieldNames.contains(rowtimeField) &&
              TypeCheckUtils.isRowTime(dataType.getTypeAt(dataType.getFieldIndex(rowtimeField))) =>
          // if rowtimeField already existed in the data stream, use the default rowtime
          None
        case _ =>
          // 用内置函数 StreamRecordTimestampSqlFunction 来提取 rowtime 时间属性
          // extract timestamp from StreamRecord
          Some(
            relBuilder.cast(
              relBuilder.call(new StreamRecordTimestampSqlFunction),
              relBuilder.getTypeFactory.createFieldTypeFromLogicalType(
                new TimestampType(true, TimestampKind.ROWTIME, 3)).getSqlTypeName))
      }
    }
  }
}
 | 
 
在进行字段映射的时候,一个关键的处理就是时间属性要怎么映射。ROWTIME 时间属性被转换为 StreamRecordTimestampSqlFunction 的调用。在前面物化时间属性的阶段,我们已经看到,PROCTIME 时间属性已经被转换为内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE 的调用,而对于 ROWTIME 时间属性,只是进行了一次强制的类型转换。这主要是因为,PROCTIME 在流处理中,在不同的算子中,每一次调用都应该获取当前的系统时间;而对于 ROWTIME 而言,它的取值是固定的,因此只需要在最开始完成一次转换即可。RPOCTIME 和 ROWTIME 的取值最终都被转换成对内置函数的调用。
这个函数调用的代码生成就比较简单了,RPOCTIME 转换成 context.timerService().currentProcessingTime(),ROWTIME 转换成 context.timestamp(),具体的代码生成可以参考 GenerateUtils。
由于 PROCTIME 并不需要在 Scan 节点进行物化,因此在这里直接用 null 值替代,在后续需要的时候重新进行计算。
而对于从 TableSource 注册而来的表,由于它不像 DataStream 那样,在定义 ROWTIME 时间属性之前已经完成了 timestamp 和 watermark 的指定。因此在转换 TableSource 的过程中,如果定义了 ROWTIME 时间属性,除了需要提取 ROWTIME 时间属性的值以外,还需要指定 watermark。
RowtimeAttributeDescriptor 是对 ROWTIME 时间属性的描述,包括如何提取事件时间的 TimestampExtractor:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 | /**
 * The {@link FieldComputer} interface returns an expression to compute the field of the table
 * schema of a {@link TableSource} from one or more fields of the {@link TableSource}'s return type.
 *
 * @param <T> The result type of the provided expression.
 */
public interface FieldComputer<T> {
	String[] getArgumentFields();
	TypeInformation<T> getReturnType();
	void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes);
	/**
	 * Returns the {@link Expression} that computes the value of the field.
	 *
	 * @param fieldAccesses Field access expressions for the argument fields.
	 * @return The expression to extract the timestamp from the {@link TableSource} return type.
	 */
	Expression getExpression(ResolvedFieldReference[] fieldAccesses);
}
//Provides an expression to extract the timestamp for a rowtime attribute.
public abstract class TimestampExtractor implements FieldComputer<Long>, Serializable, Descriptor {
	@Override
	public TypeInformation<Long> getReturnType() {
		return Types.LONG; //返回类型是 Long
	}
}
 | 
 
通常提取 ROWTIME 的方式是根据已有的字段来生成,即:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 | public final class ExistingField extends TimestampExtractor {
	private String field;
	
	/**
	 * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
	 * timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
	 * into a rowtime attribute.
	 */
	@Override
	public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
		ResolvedFieldReference fieldAccess = fieldAccesses[0];
		DataType type = fromLegacyInfoToDataType(fieldAccess.resultType());
		//字段引用的表达式
		FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression(
				fieldAccess.name(),
				type,
				0,
				fieldAccess.fieldIndex());
		//支持的输入字段类型,包括 BIGINT、TIMESTAMP_WITHOUT_TIME_ZONE 和 VARCHAR
		switch (type.getLogicalType().getTypeRoot()) {
			case BIGINT:
			case TIMESTAMP_WITHOUT_TIME_ZONE:
				//直接引用相应的字段即可
				return fieldReferenceExpr;
			case VARCHAR:
				//进行一次类型转换
				DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
				return new CallExpression(
						CAST,
						Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
						outputType);
			default:
				throw new RuntimeException("Unsupport type: " + type);
		}
	}
}
 | 
 
TimestampExtractor 提供的提取时间属性的 Expression, 要先被转换为 RexNode 之后才可以被用在代码生成中,通过 TableSourceUtil.getRowtimeExtractionExpression 完成。如同前面提到对 StreamRecordTimestampSqlFunction 调用,这两种获得时间属性的的方式本质上是一样的,只是调用的函数不一致。
在完成了字段映射和时间属性的提取后,还需要指定 watermark,这正是 RowtimeAttributeDescriptor 中 WatermarkStrategy 的用处:
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 | class StreamExecTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    relOptTable: FlinkRelOptTable)
  extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) {
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = {
    val config = planner.getTableConfig
    val inputTransform = getSourceTransformation(planner.getExecEnv)
    //1. 字段映射和时间提取
    ......
    //2. 指定watermark
    val withWatermarks = if (rowtimeDesc.isDefined) {
      val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
      watermarkStrategy match {
        case p: PeriodicWatermarkAssigner =>
          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
        case p: PunctuatedWatermarkAssigner =>
          val watermarkGenerator =
            new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
        case _: PreserveWatermarks =>
          // The watermarks have already been provided by the underlying DataStream.
          ingestedTable
      }
    } else {
      // No need to generate watermarks if no rowtime attribute is specified.
      ingestedTable
    }
    withWatermarks.getTransformation
  }
}
 | 
 
到这里,就完成了 ROWTIME 时间属性的提取的 watermark 的生成。 PROCTIME 在使用时按需要进行物化。
小结
时间属性是广泛应用在窗口操作中,是流式 SQL 处理中非常重要的概念。本文对 Flink SQL 中时间属性的使用方法和具体实现进行了介绍。
参考
-EOF-