四时宝库

程序员的知识宝库

FlinkTable时间属性(flink tablesink)

像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

一、处理时间

1. DataStream 到 Table 转换时定义

处理时间属性可以在schema定义的时候用.proctime后缀来定义。(处理)时间属性一定不能定义在一个已有字段上,所以它只能定义在schema定义的最后。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStreamSource<WaterSensor> waterSensorStream =

env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60));

// 1. 创建表的执行环境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 声明一个额外的字段来作为处理时间字段

Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());

sensorTable.execute().print();

2. 在创建表的 DDL 中定义

package com.bigdata.flink.java.chapter_11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.TableResult;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink06_TableApi_ProcessTime {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 1. 创建表的执行环境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 创建表, 声明一个额外的列作为处理时间

tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("

+ "'connector' = 'filesystem',"

+ "'path' = 'input/sensor.txt',"

+ "'format' = 'csv'"

+ ")");

TableResult result = tableEnv.executeSql("select * from sensor");

result.print();

}

}

二、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。

为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

1. DataStream 到 Table 转换时定义

事件时间属性可以用.rowtime后缀在定义DataStream schema 的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了。

在从DataStream到Table转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

在 schema 的结尾追加一个新的字段。

替换一个已经存在的字段。

不管在哪种情况下,事件时间字段都表示DataStream中定义的事件的时间戳。

package com.bigdata.flink.java.chapter_11;

import com.bigdata.flink.java.chapter_5.WaterSensor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class Flink07_TableApi_EventTime {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator<WaterSensor> waterSensorStream = env

.fromElements(new WaterSensor("sensor_1", 1000L, 10),

new WaterSensor("sensor_1", 2000L, 20),

new WaterSensor("sensor_2", 3000L, 30),

new WaterSensor("sensor_1", 4000L, 40),

new WaterSensor("sensor_1", 5000L, 50),

new WaterSensor("sensor_2", 6000L, 60))

.assignTimestampsAndWatermarks(

WatermarkStrategy

.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))

.withTimestampAssigner((element, recordTimestamp) -> element.getTs())

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table table = tableEnv

// 用一个额外的字段作为事件时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("et").rowtime());

table.execute().print();

env.execute();

}

}

// 使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));;

2. 在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

package com.bigdata.flink.java.chapter_11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink07_TableApi_EventTime_2 {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 作为事件时间的字段必须是 timestamp(3) 类型, 所以根据 long 类型的 ts 计算出来一个 t

tEnv.executeSql("create table sensor(" +

"id string," +

"ts bigint," +

"vc int, " +

"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +

"watermark for t as t - interval '5' second)" +

"with("

+ "'connector' = 'filesystem',"

+ "'path' = 'input/sensor.txt',"

+ "'format' = 'csv'"

+ ")");

tEnv.sqlQuery("select * from sensor").execute().print();

}

}

说明:

1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

2.严格递增时间戳:WATERMARK FOR rowtime_column AS rowtime_column。

3.递增时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。

4.乱序时间戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。

5.当发现时区所导致的时间问题时,可设置本地使用的时区:

Configuration configuration = tableEnv.getConfig().getConfiguration();

configuration.setString("table.local-time-zone", "GMT");

总结

Flink处理时间属性可以在schema定义的时候用.proctime后缀来定义。(处理)时间属性一定不能定义在一个已有字段上,所以它只能定义在schema定义的最后。Flink事件时间属性可以用.rowtime后缀在定义DataStream schema 的时候来定义。时间戳和watermark在这之前一定是在DataStream上已经定义好了。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接