在流计算中,时间属性承担了一个极其重要的作用,所有基于时间的操作,例如窗口操作,都需要正确获取时间信息。我们曾经在Flink 源码阅读笔记(12)- 时间、定时器和窗口这篇文章中分析过 Flink 内部时间属性、水位线等机制的具体实现。在这篇文章中,我们将介绍在 SQL 和 Table API 中时间属性相关的一些细节。
时间属性概览
在 Flink SQL 中,表可以提供逻辑上的时间属性用于获取时间信息,时间属性可以是处理时间也可以是事件时间。在声明一张表的时候,时间属性可以在表的 schema 中定义。有些特定的操作,如窗口关联和窗口聚合操作必须基于时间属性字段,因而时间属性可以被看作一种特殊的字段类型;但是时间属性可以当作常规的时间戳字段来使用,一旦需要在计算中使用到时间属性,就需要“物化”(materialized)时间属性,时间属性字段就会被转换成一个常规的时间戳类型。被物化后的时间属性不再与 Flink 的时间系统和水位线相关联,因而也就不可以再应用在基于时间的操作中。
在 Flink SQL 的类型系统中,时间属性和常规的时间戳类型共用同样的逻辑类型 TimestampType
,但是通过 TimestampKind
进行区分:
1
2
3
4
5
|
public enum TimestampKind {
REGULAR, //常规的时间戳类型
ROWTIME, //事件时间
PROCTIME //处理时间
}
|
由于 Flink SQL 使用 Calcite 完成查询计划的优化,Flink 的所有逻辑类型在 Calcite 中都有对应的 RelDataType
,并且为时间属性单独创建了一种新的 RelDataType
,即 TimeIndicatorRelDataType
:
1
2
3
4
5
6
7
8
9
|
class TimeIndicatorRelDataType(
typeSystem: RelDataTypeSystem,
originalType: BasicSqlType,
val isEventTime: Boolean) //通过 isEventTime 区分是事件时间还是处理时间
extends BasicSqlType(
typeSystem,
originalType.getSqlTypeName,
originalType.getPrecision) {
}
|
定义表的时间属性
有两种方式来定义一张表的时间属性,一种方式是在将 DataStream
转换成 Table
的过程中,另一种方式是直接在 TableSource
的具体实现中定义。
DataStream 转换为 Table
在将 DataStream
转换成一个 Table 的过程中,可以用特殊的表达式来声明时间属性对应的列:
1
2
3
4
5
|
//处理时间
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
//事件时间
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
|
其中 'field.proctime
或这 'field.rowtime
即声明时间属性,表达式会被转换为 PROCTIME
和 ROWTIME
这两个内置函数的调用 UnresolvedCallExpression
:
1
2
3
4
5
6
7
8
9
10
11
12
|
// Time definition
/**
* Declares a field as the rowtime attribute for indicating, accessing, and working in
* Flink's event time.
*/
def rowtime: Expression = unresolvedCall(ROWTIME, expr)
/**
* Declares a field as the proctime attribute for indicating, accessing, and working in
* Flink's processing time.
*/
def proctime: Expression = unresolvedCall(PROCTIME, expr)
|
非 Scala 环境下则通过 ExpressionParser
完成表达式的解析。
使用 TableSource
如果要在 TableSource
中定义时间属性,则需要 TableSource
实现 DefinedProctimeAttribute
或者 DefinedRowAttribute
接口,并且引用的时间属性必须出现在 TableSchema
中,类型为 timestamp 类型。如果要同时使用处理时间和事件时间,对应的 TableSource
需要同时实现这两个接口:
1
2
3
4
5
6
7
8
|
public interface DefinedProctimeAttribute {
@Nullable
String getProctimeAttribute();
}
public interface DefinedRowtimeAttributes {
List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
|
其中 RowtimeAttributeDescriptor
是对事件时间的描述,包括如何提取事件时间,以及 watermark 的生成策略等:
1
2
3
4
5
|
public final class RowtimeAttributeDescriptor {
private final String attributeName; //时间属性名称
private final TimestampExtractor timestampExtractor; //如何提取事件时间
private final WatermarkStrategy watermarkStrategy; //如何生成 watermark
}
|
尽管返回值是 List<RowtimeAttributeDescriptor>
,但目前 Flink SQL 只支持单个事件时间属性。
SQL 引擎中时间属性的转换
更新 TableSchema
在将 DataStream
转换成一个 Table
的过程中,首先需要生成表结构。Table
的底层对应的是一个 QueryOperation
,在这里就是 ScalaDataStreamQueryOperation
(或者 JavaDataStreamQueryOperation
,对应 Java API)。QueryOperation
提供了 TableSchema
和字段映射关系:
1
2
3
4
5
|
public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
private final DataStream<E> dataStream;
private final int[] fieldIndices; //字段索引映射关系
private final TableSchema tableSchema; //表结构
}
|
获得 TableSchema
的逻辑主要被封装在 FieldInfoUtils.getFieldsInfo
方法中,主要是通过解析 Expression
获得表结构中每一列对应的字段在 DataStream
中元素的索引,并得到对应字段的类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
/**
* Utility methods for extracting names and indices of fields from different {@link TypeInformation}s.
*/
public class FieldInfoUtils {
private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<FieldInfo> {
@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
return createFieldInfo(unresolvedReference, null);
}
@Override
public FieldInfo visit(UnresolvedCallExpression unresolvedCall) {
if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
return visitAlias(unresolvedCall);
} else if (isRowTimeExpression(unresolvedCall)) {
return createRowtimeFieldInfo(unresolvedCall, null);
} else if (isProcTimeExpression(unresolvedCall)) {
return createProctimeFieldInfo(unresolvedCall, null);
}
return defaultMethod(unresolvedCall);
}
}
private static boolean isRowTimeExpression(Expression origExpr) {
return origExpr instanceof UnresolvedCallExpression &&
((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.ROWTIME;
}
private static boolean isProcTimeExpression(Expression origExpr) {
return origExpr instanceof UnresolvedCallExpression &&
((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.PROCTIME;
}
private static FieldInfo createTimeAttributeField(
UnresolvedReferenceExpression reference,
TimestampKind kind, //这里的Kind是TimestampKind.PROCTIME或TimestampKind.ROWTIME
@Nullable String alias) {
final int idx;
//对于时间属性,没有对应的索引,用特殊的标识
if (kind == TimestampKind.PROCTIME) {
idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
} else {
idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
}
String originalName = reference.getName();
return new FieldInfo(
alias != null ? alias : originalName,
idx,
createTimeIndicatorType(kind));
}
}
|
由于时间属性对应的表达式是内置函数的调用,因而可以通过判断对应的函数定义识别出来。
时间属性并不对应 DataStream
中元素真实的字段,因此会用特殊的标识来作为索引:
1
2
3
4
|
public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> {
public static final int ROWTIME_STREAM_MARKER = -1;
public static final int PROCTIME_STREAM_MARKER = -2;
}
|
如果通过 TableSource
注册一张表,首先会通过 TableSourceValidation.validateTableSource()
验证表结构、时间属性等信息,然后会被封装为 ConnectorCatalogTable
,在这里会完成 TableSchema
的更新:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) {
//更新 TableSchema
final TableSchema tableSchema = calculateSourceSchema(source, isBatch);
return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch);
}
private static <T1> TableSchema calculateSourceSchema(TableSource<T1> source, boolean isBatch) {
TableSchema tableSchema = source.getTableSchema();
if (isBatch) {
return tableSchema;
}
DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
String[] fieldNames = tableSchema.getFieldNames();
//检查是否实现了 DefinedRowtimeAttributes 接口
if (source instanceof DefinedRowtimeAttributes) {
updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types);
}
//检查是否实现了 DefinedProctimeAttribute 接口
if (source instanceof DefinedProctimeAttribute) {
updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types);
}
return TableSchema.builder().fields(fieldNames, types).build();
}
private static void updateRowtimeIndicators(
DefinedRowtimeAttributes source,
String[] fieldNames,
DataType[] types) {
List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors()
.stream()
.map(RowtimeAttributeDescriptor::getAttributeName)
.collect(Collectors.toList());
for (int i = 0; i < fieldNames.length; i++) {
if (rowtimeAttributes.contains(fieldNames[i])) {
// bridged to timestamp for compatible flink-planner
types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
.bridgedTo(java.sql.Timestamp.class);
}
}
}
}
|
经过更新后的 TableSchema
,时间属性列的 LogicalType
就是用特殊 TimestampKind
表征的 TimestampType
。
转换到 Calcite
在 DatabaseCalciteSchema
中, Flink SQL 中注册的表被转换成 Calcite
中使用的表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
class DatabaseCalciteSchema extends FlinkSchema {
@Override
public Table getTable(String tableName) {
ObjectPath tablePath = new ObjectPath(databaseName, tableName);
try {
if (!catalog.tableExists(tablePath)) {
return null;
}
CatalogBaseTable table = catalog.getTable(tablePath);
//将 Flink Catalog 中注册的表转换为 Calcite 中的 Table
if (table instanceof QueryOperationCatalogView) {
QueryOperationCatalogView view = (QueryOperationCatalogView) table;
QueryOperation operation = view.getQueryOperation();
if (operation instanceof DataStreamQueryOperation) {
List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName);
((DataStreamQueryOperation) operation).setQualifiedName(qualifiedName);
} else if (operation instanceof RichTableSourceQueryOperation) {
List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName);
((RichTableSourceQueryOperation) operation).setQualifiedName(qualifiedName);
}
return QueryOperationCatalogViewTable.createCalciteTable(view);
} else if (table instanceof ConnectorCatalogTable) {
return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, tablePath);
} else if (table instanceof CatalogTable) {
return convertCatalogTable(tablePath, (CatalogTable) table);
} else {
throw new TableException("Unsupported table type: " + table);
}
} catch (TableNotExistException | CatalogException e) {
// TableNotExistException should never happen, because we are checking it exists
// via catalog.tableExists
throw new TableException(format(
"A failure occurred when accessing table. Table path [%s, %s, %s]",
catalogName,
databaseName,
tableName), e);
}
}
}
|
在 Flink SQL 中定义的表结构 TableSchema
也会经过 FlinkTypeFactory
转换,LogicalType
也会被转换成 Calcite 内部使用的 RelDataType
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/**
* Flink specific type factory that represents the interface between Flink's [[LogicalType]]
* and Calcite's [[RelDataType]].
*/
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
......
val relType = t.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
val timestampType = t.asInstanceOf[TimestampType]
timestampType.getKind match {
//时间戳类型识别出常规的时间戳和时间属性
case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
}
case _ =>
seenTypes.get(t) match {
case Some(retType: RelDataType) => retType
case None =>
val refType = newRelDataType()
seenTypes.put(t, refType)
refType
}
}
.....
}
}
|
经过这一步转换,时间属性被转换成 TimeIndicatorRelDataType
类型。
查询计划转换
在 Calcite 优化查询计划是,会识别特殊语句中的时间属性,并转换成对应的 RelNode。例如,在 FlinkLogicalJoin
中,如果关联条件中包含了时间窗口,就会被转换为 StreamExecWindowJoin
;而不包含时间窗口的 FlinkLogicalJoin
则会被转换为 StreamExecJoin
。其区别就在于对时间属性的识别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
/**
* Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition
* to [[StreamExecWindowJoin]].
*/
class StreamExecWindowJoinRule
extends ConverterRule(
classOf[FlinkLogicalJoin],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
"StreamExecWindowJoinRule") {
override def matches(call: RelOptRuleCall): Boolean = {
......
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join)
//提取时间窗口的边界
val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate(
join.getCondition,
join.getLeft.getRowType.getFieldCount,
joinRowType,
join.getCluster.getRexBuilder,
tableConfig)
if (windowBounds.isDefined) {
//如果识别到时间窗口,该规则匹配
if (windowBounds.get.isEventTime) {
true
} else {
// Check that no event-time attributes are in the input because the processing time window
// join does not correctly hold back watermarks.
// We rely on projection pushdown to remove unused attributes before the join.
!joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
}
} else {
// the given join does not have valid window bounds. We cannot translate it.
false
}
}
}
|
窗口聚合也是同理,只是规则匹配中是对 HOP
, TUMBLE
等窗口函数的识别。
物化时间属性
时间属性是一个逻辑上的列,因为它并不真实对应底层 DataStream
元素中具体的字段。因此,如果在计算操作中要用到时间属性列的值,就需要“物化”(materialized)时间属性。这一部分的逻辑主要是在 RelTimeIndicatorConverter
中。
对于 PROCTIME 时间属性,就是将其转换为对内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE
的调用;而对于 ROWTIME 时间属性,则是对其进行一次强制的类型转换,转换为常规的时间戳。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
* Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]].
*/
class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {
def materialize(expr: RexNode): RexNode = {
if (isTimeIndicatorType(expr.getType)) {
if (isRowtimeIndicatorType(expr.getType)) {
// cast rowtime indicator to regular timestamp
rexBuilder.makeAbstractCast(timestamp(expr.getType.isNullable), expr)
} else {
// generate proctime access
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
}
} else {
expr
}
}
}
|
生成物理执行计划
SQL 的查询计划最终需要被转换为 Flink 的算子,即生成物理执行计划。对于一个查询计划来说,首先需要进行转换的就是 Scan 操作,在里将底层的 POJO、Tuple 等对象映射为 Table 中使用的 Row。在这里就需要考虑到时间属性的映射。
首先,我们来看下在 StreamExecDataStreamScan
中是如何完成字段映射的转换的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
class StreamExecDataStreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
outputRowType: RelDataType)
extends TableScan(cluster, traitSet, table) {
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
val transform = inputDataStream.getTransformation
val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder)
// when there is row time extraction expression, we need internal conversion
// when the physical type of the input date stream is not BaseRow, we need internal conversion.
if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) {
// extract time if the index is -1 or -2.
val (extractElement, resetElement) =
if (ScanUtil.hasTimeAttributeField(dataStreamTable.fieldIndexes)) {
(s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
} else {
("", "")
}
val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
classOf[AbstractProcessStreamOperator[BaseRow]])
//生成字段映射的代码
ScanUtil.convertToInternalRow(
ctx,
transform,
dataStreamTable.fieldIndexes, //字段索引
dataStreamTable.dataType,
getRowType,
getTable.getQualifiedName,
config,
rowtimeExpr, //RowTime 表达式
beforeConvert = extractElement,
afterConvert = resetElement)
} else {
transform.asInstanceOf[Transformation[BaseRow]]
}
}
//获取 RowTime 时间属性的表达式
private def getRowtimeExpression(relBuilder: FlinkRelBuilder): Option[RexNode] = {
val fieldIdxs = dataStreamTable.fieldIndexes
if (!fieldIdxs.contains(ROWTIME_STREAM_MARKER)) {
None
} else {
//根据字段映射中的特殊marker来查找 rowtime 字段
val rowtimeField = dataStreamTable.fieldNames(fieldIdxs.indexOf(ROWTIME_STREAM_MARKER))
// get expression to extract timestamp
fromDataTypeToLogicalType(dataStreamTable.dataType) match {
//如果这个datastream本身已经完成了 rowtime 时间属性的提取
case dataType: RowType
if dataType.getFieldNames.contains(rowtimeField) &&
TypeCheckUtils.isRowTime(dataType.getTypeAt(dataType.getFieldIndex(rowtimeField))) =>
// if rowtimeField already existed in the data stream, use the default rowtime
None
case _ =>
// 用内置函数 StreamRecordTimestampSqlFunction 来提取 rowtime 时间属性
// extract timestamp from StreamRecord
Some(
relBuilder.cast(
relBuilder.call(new StreamRecordTimestampSqlFunction),
relBuilder.getTypeFactory.createFieldTypeFromLogicalType(
new TimestampType(true, TimestampKind.ROWTIME, 3)).getSqlTypeName))
}
}
}
}
|
在进行字段映射的时候,一个关键的处理就是时间属性要怎么映射。ROWTIME 时间属性被转换为 StreamRecordTimestampSqlFunction
的调用。在前面物化时间属性的阶段,我们已经看到,PROCTIME 时间属性已经被转换为内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE
的调用,而对于 ROWTIME 时间属性,只是进行了一次强制的类型转换。这主要是因为,PROCTIME 在流处理中,在不同的算子中,每一次调用都应该获取当前的系统时间;而对于 ROWTIME 而言,它的取值是固定的,因此只需要在最开始完成一次转换即可。RPOCTIME 和 ROWTIME 的取值最终都被转换成对内置函数的调用。
这个函数调用的代码生成就比较简单了,RPOCTIME 转换成 context.timerService().currentProcessingTime()
,ROWTIME 转换成 context.timestamp()
,具体的代码生成可以参考 GenerateUtils
。
由于 PROCTIME 并不需要在 Scan 节点进行物化,因此在这里直接用 null 值替代,在后续需要的时候重新进行计算。
而对于从 TableSource
注册而来的表,由于它不像 DataStream
那样,在定义 ROWTIME 时间属性之前已经完成了 timestamp 和 watermark 的指定。因此在转换 TableSource
的过程中,如果定义了 ROWTIME 时间属性,除了需要提取 ROWTIME 时间属性的值以外,还需要指定 watermark。
RowtimeAttributeDescriptor
是对 ROWTIME 时间属性的描述,包括如何提取事件时间的 TimestampExtractor
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/**
* The {@link FieldComputer} interface returns an expression to compute the field of the table
* schema of a {@link TableSource} from one or more fields of the {@link TableSource}'s return type.
*
* @param <T> The result type of the provided expression.
*/
public interface FieldComputer<T> {
String[] getArgumentFields();
TypeInformation<T> getReturnType();
void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes);
/**
* Returns the {@link Expression} that computes the value of the field.
*
* @param fieldAccesses Field access expressions for the argument fields.
* @return The expression to extract the timestamp from the {@link TableSource} return type.
*/
Expression getExpression(ResolvedFieldReference[] fieldAccesses);
}
//Provides an expression to extract the timestamp for a rowtime attribute.
public abstract class TimestampExtractor implements FieldComputer<Long>, Serializable, Descriptor {
@Override
public TypeInformation<Long> getReturnType() {
return Types.LONG; //返回类型是 Long
}
}
|
通常提取 ROWTIME 的方式是根据已有的字段来生成,即:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
public final class ExistingField extends TimestampExtractor {
private String field;
/**
* Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
* timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
* into a rowtime attribute.
*/
@Override
public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
ResolvedFieldReference fieldAccess = fieldAccesses[0];
DataType type = fromLegacyInfoToDataType(fieldAccess.resultType());
//字段引用的表达式
FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression(
fieldAccess.name(),
type,
0,
fieldAccess.fieldIndex());
//支持的输入字段类型,包括 BIGINT、TIMESTAMP_WITHOUT_TIME_ZONE 和 VARCHAR
switch (type.getLogicalType().getTypeRoot()) {
case BIGINT:
case TIMESTAMP_WITHOUT_TIME_ZONE:
//直接引用相应的字段即可
return fieldReferenceExpr;
case VARCHAR:
//进行一次类型转换
DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
return new CallExpression(
CAST,
Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
outputType);
default:
throw new RuntimeException("Unsupport type: " + type);
}
}
}
|
TimestampExtractor
提供的提取时间属性的 Expression
, 要先被转换为 RexNode
之后才可以被用在代码生成中,通过 TableSourceUtil.getRowtimeExtractionExpression
完成。如同前面提到对 StreamRecordTimestampSqlFunction
调用,这两种获得时间属性的的方式本质上是一样的,只是调用的函数不一致。
在完成了字段映射和时间属性的提取后,还需要指定 watermark,这正是 RowtimeAttributeDescriptor
中 WatermarkStrategy
的用处:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
class StreamExecTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
relOptTable: FlinkRelOptTable)
extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) {
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
val inputTransform = getSourceTransformation(planner.getExecEnv)
//1. 字段映射和时间提取
......
//2. 指定watermark
val withWatermarks = if (rowtimeDesc.isDefined) {
val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName)
val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
watermarkStrategy match {
case p: PeriodicWatermarkAssigner =>
val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case p: PunctuatedWatermarkAssigner =>
val watermarkGenerator =
new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case _: PreserveWatermarks =>
// The watermarks have already been provided by the underlying DataStream.
ingestedTable
}
} else {
// No need to generate watermarks if no rowtime attribute is specified.
ingestedTable
}
withWatermarks.getTransformation
}
}
|
到这里,就完成了 ROWTIME 时间属性的提取的 watermark 的生成。 PROCTIME 在使用时按需要进行物化。
小结
时间属性是广泛应用在窗口操作中,是流式 SQL 处理中非常重要的概念。本文对 Flink SQL 中时间属性的使用方法和具体实现进行了介绍。
参考
-EOF-