diff --git a/docs/content.zh/docs/dev/table/concepts/time_attributes.md b/docs/content.zh/docs/dev/table/concepts/time_attributes.md index e527420d2220b..aabd41f1ffc30 100644 --- a/docs/content.zh/docs/dev/table/concepts/time_attributes.md +++ b/docs/content.zh/docs/dev/table/concepts/time_attributes.md @@ -28,7 +28,7 @@ under the License. Flink 可以基于几种不同的 *时间* 概念来处理数据。 -- *处理时间* 指的是执行具体操作时的机器时间(也称作"挂钟时间") +- *处理时间* 指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的 `System.currentTimeMillis()`) ) - *事件时间* 指的是数据本身携带的时间。这个时间是在事件产生时的时间。 - *摄入时间* 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。 @@ -94,7 +94,7 @@ env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default ### 在创建表的 DDL 中定义 -处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 `PROCTIME()` 就可以定义处理时间。关于计算列,更多信息可以参考:[CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) +处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 `PROCTIME()` 就可以定义处理时间,函数 `PROCTIME()` 的返回类型是 TIMESTAMP_LTZ 。关于计算列,更多信息可以参考:[CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) ```sql @@ -233,6 +233,10 @@ val windowedTable = tEnv 事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。更多信息可以参考:[CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) +Flink supports defining event time attribute on TIMESTAMP column and TIMESTAMP_LTZ column. +If the data source contains timestamp literal, it's recommended to defining event time attribute on TIMESTAMP column: + +Flink 支持和在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间。如果源数据中的时间戳数据表示为年-月-日-时-分-秒,则通常为不带时区信息的字符串值,例如 `2020-04-15 20:13:40.564`,建议将事件时间属性定义在 `TIMESTAMP` 列上: ```sql CREATE TABLE user_actions ( @@ -251,10 +255,30 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); ``` +源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,例如 `1618989564564`,建议将事件时间属性定义在 `TIMESTAMP_LTZ` 列上: + ```sql + +CREATE TABLE user_actions ( + user_name STRING, + data STRING, + ts BIGINT, + time_ltz AS TO_TIMESTAMP_LTZ(time_ltz, 3), + -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy + WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND +) WITH ( + ... +); + +SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) +FROM user_actions +GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE); + +``` ### 在 DataStream 到 Table 转换时定义 -事件时间属性可以用 `.rowtime` 后缀在定义 `DataStream` schema 的时候来定义。[时间戳和 watermark]({{< ref "docs/concepts/time" >}}) 在这之前一定是在 `DataStream` 上已经定义好了。 +事件时间属性可以用 `.rowtime` 后缀在定义 `DataStream` schema 的时候来定义。[时间戳和 watermark]({{< ref "docs/concepts/time" >}}) 在这之前一定是在 `DataStream` 上已经定义好了。 +在从 DataStream 转换到 Table 时,由于 `DataStream` 没有时区概念,因此 Flink 总是将 `rowtime` 属性解析成 `TIMESTAMP WITHOUT TIME ZONE` 类型,并且将所有事件时间的值都视为 UTC 时区的值。 在从 `DataStream` 到 `Table` 转换时定义事件时间属性有两种方式。取决于用 `.rowtime` 后缀修饰的字段名字是否是已有字段,事件时间字段可以是: diff --git a/docs/content.zh/docs/dev/table/concepts/timezone.md b/docs/content.zh/docs/dev/table/concepts/timezone.md new file mode 100644 index 0000000000000..f2b19941a7574 --- /dev/null +++ b/docs/content.zh/docs/dev/table/concepts/timezone.md @@ -0,0 +1,568 @@ +--- +title: "时间类型和时区支持" +weight: 4 +type: docs +aliases: + - /zh/dev/table/streaming/versioned_tables.html + - /zh/dev/table/streaming/temporal_tables.html +--- + + +## Overview + +Flink provides rich data types for Date and Time, including `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ`, `INTERVAL YEAR TO MONTH`, `INTERVAL DAY TO SECOND` (please see [Date and Time]({{< ref "docs/dev/table/types" >}}#date-and-time) for detailed information). +Flink supports setting time zone in session level (please see [table.local-time-zone]({{< ref "docs/dev/table/config">}}#table-local-time-zone) for detailed information). +These timestamp data types and time zone support of Flink make it easy to process business data across time zones. + +## TIMESTAMP vs. TIMESTAMP_LTZ + +### TIMESTAMP type + - `TIMESTAMP(p)` is an abbreviation for `TIMESTAMP(p) WITHOUT TIME ZONE`, the precision `p` supports range is from 0 to 9, 6 by default. + - `TIMESTAMP` describes a timestamp represents year, month, day, hour, minute, second and fractional seconds. + - `TIMESTAMP` can be specified from a string literal, e.g. + ```sql +Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001'; ++-------------------------+ +| 1970-01-01 00:00:04.001 | ++-------------------------+ +``` + +### TIMESTAMP_LTZ type + - `TIMESTAMP_LTZ(p)` is an abbreviation for `TIMESTAMP(p) WITH LOCAL TIME ZONE`, the precision `p` supports range is from 0 to 9, 6 by default. + - `TIMESTAMP_LTZ` describes an absolute time point on the time-line, it stores a long value representing epoch-milliseconds and an int representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of `1970-01-01T00:00:00Z`. Every datum of `TIMESTAMP_LTZ` type is interpreted in the local time zone configured in the current session for computation and visualization. + - `TIMESTAMP_LTZ` has no literal representation and thus can not specify from literal, it can derives from a long epoch time(e.g. The long time produced by Java `System.currentTimeMillis()`) + + ```sql +Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3); +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM T1; ++---------------------------+ +| TO_TIMESTAMP_LTZ(4001, 3) | ++---------------------------+ +| 1970-01-01 00:00:04.001 | ++---------------------------+ + +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM T1; ++---------------------------+ +| TO_TIMESTAMP_LTZ(4001, 3) | ++---------------------------+ +| 1970-01-01 08:00:04.001 | ++---------------------------+ +``` + +- `TIMESTAMP_LTZ` can be used in cross time zones business because the absolute time point (e.g. above `4001` milliseconds) describes a same instantaneous point in different time zones. +Giving a background that at a same time point, the `System.currentTimeMillis()` of all machines in the world returns same value (e.g. the `4001` milliseconds in above example), this is absolute time point meaning. + +## Time Zone Usage +The local time zone defines current session time zone id. You can config the time zone in Sql Client or Applications. + +{{< tabs "SQL snippets" >}} +{{< tab "SQL Client" >}} +```sql +# set to UTC time zone +Flink SQL> SET table.local-time-zone=UTC; + +# set to Shanghai time zone +Flink SQL> SET table.local-time-zone=Asia/Shanghai; + +# set to Los_Angeles time zone +Flink SQL> SET table.local-time-zone=America/Los_Angeles; +``` +{{< /tab >}} +{{< tab "Java" >}} +```java + EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build(); + TableEnvironment tEnv = TableEnvironment.create(envSetting); + + // set to UTC time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC")); + +// set to Shanghai time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + +// set to Los_Angeles time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +val envSetting = EnvironmentSettings.newInstance.build +val tEnv = TableEnvironment.create(envSetting) + +// set to UTC time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC")) + +// set to Shanghai time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) + +// set to Los_Angeles time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles")) +``` +{{< /tab >}} +{{< /tabs >}} + +The session time zone is useful in Flink SQL, the main usages are: +### Decide time functions return value +The following time functions is influenced by the configured time zone. +* LOCALTIME +* LOCALTIMESTAMP +* CURRENT_DATE +* CURRENT_TIME +* CURRENT_TIMESTAMP +* CURRENT_ROW_TIMESTAMP() +* NOW() +* PROCTIME() + + +```sql +Flink SQL> SET sql-client.execution.result-mode=tableau; +Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME(); +Flink SQL> DESC MyView1; +``` + +``` ++------------------------+-----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++------------------------+-----------------------------+-------+-----+--------+-----------+ +| LOCALTIME | TIME(0) | false | | | | +| LOCALTIMESTAMP | TIMESTAMP(3) | false | | | | +| CURRENT_DATE | DATE | false | | | | +| CURRENT_TIME | TIME(0) | false | | | | +| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | | +|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | | +| NOW() | TIMESTAMP_LTZ(3) | false | | | | +| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | | ++------------------------+-----------------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView1; +``` + +``` ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView1; +``` + +``` ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +``` + +### String representation of `TIMESTAMP_LTZ` data value +The session timezone is used when represents a `TIMESTAMP_LTZ` value to string format, i.e print the value, cast the value to `STRING` type, cast the value to `TIMESTAMP`, cast a `TIMESTAMP` value to `TIMESTAMP_LTZ`: +```sql +Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz; +Flink SQL> DESC MyView2; +``` + +``` ++------+------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++------+------------------+-------+-----+--------+-----------+ +| ltz | TIMESTAMP_LTZ(3) | true | | | | +| ntz | TIMESTAMP(3) | false | | | | ++------+------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView2; +``` + +``` ++-------------------------+-------------------------+ +| ltz | ntz | ++-------------------------+-------------------------+ +| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 | ++-------------------------+-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView2; +``` + +``` ++-------------------------+-------------------------+ +| ltz | ntz | ++-------------------------+-------------------------+ +| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | ++-------------------------+-------------------------+ +``` + +```sql +Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2; +``` + +``` +Flink SQL> DESC MyView3; ++-------------------------------+------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++-------------------------------+------------------+-------+-----+--------+-----------+ +| ltz | TIMESTAMP_LTZ(3) | true | | | | +| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | | | +| CAST(ltz AS STRING) | STRING | true | | | | +| ntz | TIMESTAMP(3) | false | | | | +| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | | | ++-------------------------------+------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SELECT * FROM MyView3; +``` + +``` ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) | ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 | ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +``` + +## Time Attribute and Time Zone + +Please see [时间属性]({{< ref "docs/dev/table/concepts/time_attributes">}}#时间属性) for more information about time attribute. + +### Processing Time Attribute and Time Zone +Flink SQL defines process time attribute by function `PROCTIME()`, the function return type is `TIMESTAMP_LTZ`. + +{{< hint info >}} +Before Flink 1.13, the function return type of `PROCTIME()` is `TIMESTAMP`, and the return value is the `TIMESTAMP` in UTC time zone, +e.g. the wall-clock shows `2021-03-01 12:00:00` at Shanghai, however the `PROCTIME()` displays `2021-03-01 04:00:00` which is wrong. +Flin 1.13 fixes this issue and uses `TIMESTAMP_LTZ` type as return type of `PROCTIME()`, users don't need to deal time zone problems anymore. +{{< /hint >}} + +The PROCTIME() always represents your local timestamp value, using TIMESTAMP_LTZ type can also support DayLight Saving Time well. + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT PROCTIME(); +``` +``` ++-------------------------+ +| PROCTIME() | ++-------------------------+ +| 2021-04-15 14:48:31.387 | ++-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT PROCTIME(); +``` +``` ++-------------------------+ +| PROCTIME() | ++-------------------------+ +| 2021-04-15 22:48:31.387 | ++-------------------------+ +``` + +```sql +Flink SQL> CREATE TABLE MyTable1 ( + item STRING, + price DOUBLE, + proctime as PROCTIME() + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView3 AS + SELECT + TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime, + item, + MAX(price) as max_price + FROM MyTable1 + GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView3; +``` + +``` ++-----------------+-----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++-----------------+-----------------------------+-------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | false | | | | +| window_end | TIMESTAMP(3) | false | | | | +| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++-----------------+-----------------------------+-------+-----+--------+-----------+ +``` + +The input data of MyTable1 is: +``` +A,1.1 +B,1.2 +A,1.8 +B,2.5 +C,3.8 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView3; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_procime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView3; +``` + +Returns the different window start, window end and window proctime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_procime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A | 1.8 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B | 2.5 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +### Event Time Attribute and Time Zone +Flink supports defining event time attribute on TIMESTAMP column and TIMESTAMP_LTZ column. + +#### Event Time Attribute on TIMESTAMP +If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the event time attribute as a `TIMESTAMP` column: +```sql +Flink SQL> CREATE TABLE MyTable2 ( + item STRING, + price DOUBLE, + ts TIMESTAMP(3), -- TIMESTAMP data type + WATERMARK FOR ts AS ts - INTERVAL '10' SECOND + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView4 AS + SELECT + TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime, + item, + MAX(price) as max_price + FROM MyTable2 + GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView4; +``` + +``` ++----------------+------------------------+------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+------------------------+------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | true | | | | +| window_end | TIMESTAMP(3) | true | | | | +| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++----------------+------------------------+------+-----+--------+-----------+ +``` + +The input data of MyTable2 is: +``` +A,1.1,2021-04-15 14:01:00 +B,1.2,2021-04-15 14:02:00 +A,1.8,2021-04-15 14:03:00 +B,2.5,2021-04-15 14:04:00 +C,3.8,2021-04-15 14:05:00 +C,3.8,2021-04-15 14:11:00 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView4; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView4; +``` + +Returns the same window start, window end and window rowtime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +#### Event Time Attribute on TIMESTAMP_LTZ +If the timestamp data in the source is represented as a epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define event time attribute as a `TIMESTAMP_LTZ` column. +```sql +Flink SQL> CREATE TABLE MyTable3 ( + item STRING, + price DOUBLE, + ts BIGINT, -- long time value in epoch milliseconds + ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), + WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView5 AS + SELECT + TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime, + item, + MAX(price) as max_price + FROM MyTable3 + GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView5; +``` + +``` ++----------------+----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+----------------------------+-------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | false | | | | +| window_end | TIMESTAMP(3) | false | | | | +| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++----------------+----------------------------+-------+-----+--------+-----------+ +``` + +The input data of MyTable3 is: +``` +A,1.1,1618495260000 # The corresponding utc timestamp is 2021-04-15 14:01:00 +B,1.2,1618495320000 # The corresponding utc timestamp is 2021-04-15 14:02:00 +A,1.8,1618495380000 # The corresponding utc timestamp is 2021-04-15 14:03:00 +B,2.5,1618495440000 # The corresponding utc timestamp is 2021-04-15 14:04:00 +C,3.8,1618495500000 # The corresponding utc timestamp is 2021-04-15 14:05:00 +C,3.8,1618495860000 # The corresponding utc timestamp is 2021-04-15 14:11:00 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView5; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView5; +``` + +Returns the different window start, window end and window rowtime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | A | 1.8 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | B | 2.5 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +## Daylight Saving Time Support +Flink SQL supports defining time attributes on TIMESTAMP_LTZ column, base on this, Flink SQL gracefully uses TIMESTAMP and TIMESTAMP_LTZ type in window processing to support the Daylight Saving Time. + + +Flink use timestamp literal to split the window and assigns window to data according to the epoch time of the each row. It means Flink uses `TIMESTAMP` type for window start and window end (e.g. `TUMBLE_START` and `TUMBLE_END`), uses `TIMESTAMP_LTZ` for window time attribute (e.g. `TUMBLE_PROCTIME`, `TUMBLE_ROWTIME`). +Given a example of tumble window, the DaylightTime in Los_Angele starts at time 2021-03-14 02:00:00: +``` +long epoch1 = 1615708800000L; // 2021-03-14 00:00:00 +long epoch2 = 1615712400000L; // 2021-03-14 01:00:00 +long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00) +long epoch4 = 1615719600000L; // 2021-03-14 04:00:00 +``` +The tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] will collect 3 hours' data in Los_angele time zone, but it collect 4 hours' data in other non-DST time zones, what user to do is only define time attribute on TIMESTAMP_LTZ column. + +All windows in Flink like Hop window, Session window, Cumulative window follow this way, and all operations in Flink SQL support TIMESTAMP_LTZ well, thus Flink gracefully supports the Daylight Saving Time zone.   + + +## Difference between Batch and Streaming Mode +The following time functions: +* LOCALTIME +* LOCALTIMESTAMP +* CURRENT_DATE +* CURRENT_TIME +* CURRENT_TIMESTAMP +* NOW() + +Flink evaluates their values according to execution mode. They are evaluated for each record in streaming mode. But in batch mode, they are evaluated once as the query starts and uses the same result for every row. + +The following time functions are evaluated for each record no matter in batch or streaming mode: + +* CURRENT_ROW_TIMESTAMP() +* PROCTIME() + +{{< top >}} diff --git a/docs/content/docs/dev/table/concepts/time_attributes.md b/docs/content/docs/dev/table/concepts/time_attributes.md index c657a788349f9..55e303acefc57 100644 --- a/docs/content/docs/dev/table/concepts/time_attributes.md +++ b/docs/content/docs/dev/table/concepts/time_attributes.md @@ -28,7 +28,7 @@ under the License. Flink can process data based on different notions of time. -- *Processing time* refers to the machine's system time (also known as "wall-clock time") that is executing the respective operation. +- *Processing time* refers to the machine's system time (also known as epoch time, e.g. Java's `System.currentTimeMillis()`) that is executing the respective operation. - *Event time* refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened. For more information about time handling in Flink, see the introduction about [event time and watermarks]({{< ref "docs/concepts/time" >}}). @@ -59,6 +59,8 @@ Event time attributes can be defined in `CREATE` table DDL or during DataStream- The event time attribute is defined using a `WATERMARK` statement in `CREATE` table DDL. A watermark statement defines a watermark generation expression on an existing event time field, which marks the event time field as the event time attribute. Please see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more information about watermark statement and watermark strategies. +Flink supports defining event time attribute on TIMESTAMP column and TIMESTAMP_LTZ column. +If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the event time attribute as a `TIMESTAMP` column:: ```sql CREATE TABLE user_actions ( @@ -77,10 +79,29 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); ``` +If the timestamp data in the source is represented as a epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define event time attribute as a `TIMESTAMP_LTZ` column: +```sql + +CREATE TABLE user_actions ( + user_name STRING, + data STRING, + ts BIGINT, + time_ltz AS TO_TIMESTAMP_LTZ(time_ltz, 3), + -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy + WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND +) WITH ( + ... +); + +SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) +FROM user_actions +GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE); + +``` ### During DataStream-to-Table Conversion -When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{< ref "docs/concepts/time" >}}) must have already been assigned in the `DataStream` being converted. +When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{< ref "docs/concepts/time" >}}) must have already been assigned in the `DataStream` being converted. During the conversion, Flink always derives rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because DataStream doesn't have time zone notion, and treats all event time values as in UTC. There are two ways of defining the time attribute when converting a `DataStream` into a `Table`. Depending on whether the specified `.rowtime` field name exists in the schema of the `DataStream`, the timestamp is either (1) appended as a new column, or it (2) replaces an existing column. @@ -155,7 +176,7 @@ There are three ways to define a processing time attribute. ### Defining in DDL -The processing time attribute is defined as a computed column in `CREATE` table DDL using the system `PROCTIME()` function. Please see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more information about computed column. +The processing time attribute is defined as a computed column in `CREATE` table DDL using the system `PROCTIME()` function, the function return type is TIMESTAMP_LTZ. Please see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more information about computed column. ```sql diff --git a/docs/content/docs/dev/table/concepts/timezone.md b/docs/content/docs/dev/table/concepts/timezone.md new file mode 100644 index 0000000000000..c2fe238633746 --- /dev/null +++ b/docs/content/docs/dev/table/concepts/timezone.md @@ -0,0 +1,570 @@ +--- +title: "Time Zone" +weight: 4 +type: docs +aliases: + - /dev/table/streaming/versioned_tables.html + - /dev/table/streaming/temporal_tables.html +--- + + +## Overview + +Flink provides rich data types for Date and Time, including `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ`, `INTERVAL YEAR TO MONTH`, `INTERVAL DAY TO SECOND` (please see [Date and Time]({{< ref "docs/dev/table/types" >}}#date-and-time) for detailed information). +Flink supports setting time zone in session level (please see [table.local-time-zone]({{< ref "docs/dev/table/config">}}#table-local-time-zone) for detailed information). +These timestamp data types and time zone support of Flink make it easy to process business data across time zones. + +## TIMESTAMP vs. TIMESTAMP_LTZ + +### TIMESTAMP type + - `TIMESTAMP(p)` is an abbreviation for `TIMESTAMP(p) WITHOUT TIME ZONE`, the precision `p` supports range is from 0 to 9, 6 by default. + - `TIMESTAMP` describes a timestamp represents year, month, day, hour, minute, second and fractional seconds. + - `TIMESTAMP` can be specified from a string literal, e.g. + ```sql +Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001'; ++-------------------------+ +| 1970-01-01 00:00:04.001 | ++-------------------------+ +``` + +### TIMESTAMP_LTZ type + - `TIMESTAMP_LTZ(p)` is an abbreviation for `TIMESTAMP(p) WITH LOCAL TIME ZONE`, the precision `p` supports range is from 0 to 9, 6 by default. + - `TIMESTAMP_LTZ` describes an absolute time point on the time-line, it stores a long value representing epoch-milliseconds and an int representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of `1970-01-01T00:00:00Z`. Every datum of `TIMESTAMP_LTZ` type is interpreted in the local time zone configured in the current session for computation and visualization. + - `TIMESTAMP_LTZ` has no literal representation and thus can not specify from literal, it can derives from a long epoch time(e.g. The long time produced by Java `System.currentTimeMillis()`) + + ```sql +Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3); +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM T1; ++---------------------------+ +| TO_TIMESTAMP_LTZ(4001, 3) | ++---------------------------+ +| 1970-01-01 00:00:04.001 | ++---------------------------+ + +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM T1; ++---------------------------+ +| TO_TIMESTAMP_LTZ(4001, 3) | ++---------------------------+ +| 1970-01-01 08:00:04.001 | ++---------------------------+ +``` + +- `TIMESTAMP_LTZ` can be used in cross time zones business because the absolute time point (e.g. above `4001` milliseconds) describes a same instantaneous point in different time zones. +Giving a background that at a same time point, the `System.currentTimeMillis()` of all machines in the world returns same value (e.g. the `4001` milliseconds in above example), this is absolute time point meaning. + +## Time Zone Usage +The local time zone defines current session time zone id. You can config the time zone in Sql Client or Applications. + +{{< tabs "SQL snippets" >}} +{{< tab "SQL Client" >}} +```sql +# set to UTC time zone +Flink SQL> SET table.local-time-zone=UTC; + +# set to Shanghai time zone +Flink SQL> SET table.local-time-zone=Asia/Shanghai; + +# set to Los_Angeles time zone +Flink SQL> SET table.local-time-zone=America/Los_Angeles; +``` +{{< /tab >}} +{{< tab "Java" >}} +```java + EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build(); + TableEnvironment tEnv = TableEnvironment.create(envSetting); + + // set to UTC time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC")); + +// set to Shanghai time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); + +// set to Los_Angeles time zone + tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles")); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +val envSetting = EnvironmentSettings.newInstance.build +val tEnv = TableEnvironment.create(envSetting) + +// set to UTC time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC")) + +// set to Shanghai time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) + +// set to Los_Angeles time zone +tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles")) +``` +{{< /tab >}} +{{< /tabs >}} + +The session time zone is useful in Flink SQL, the main usages are: + +### Decide time functions return value +The following time functions is influenced by the configured time zone. +* LOCALTIME +* LOCALTIMESTAMP +* CURRENT_DATE +* CURRENT_TIME +* CURRENT_TIMESTAMP +* CURRENT_ROW_TIMESTAMP() +* NOW() +* PROCTIME() + + +```sql +Flink SQL> SET sql-client.execution.result-mode=tableau; +Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME(); +Flink SQL> DESC MyView1; +``` + +``` ++------------------------+-----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++------------------------+-----------------------------+-------+-----+--------+-----------+ +| LOCALTIME | TIME(0) | false | | | | +| LOCALTIMESTAMP | TIMESTAMP(3) | false | | | | +| CURRENT_DATE | DATE | false | | | | +| CURRENT_TIME | TIME(0) | false | | | | +| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | | | | +|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | | | | +| NOW() | TIMESTAMP_LTZ(3) | false | | | | +| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | | ++------------------------+-----------------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView1; +``` + +``` ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView1; +``` + +``` ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() | PROCTIME() | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | ++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+ +``` + +### String representation of `TIMESTAMP_LTZ` data value +The session timezone is used when represents a `TIMESTAMP_LTZ` value to string format, i.e print the value, cast the value to `STRING` type, cast the value to `TIMESTAMP`, cast a `TIMESTAMP` value to `TIMESTAMP_LTZ`: +```sql +Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz; +Flink SQL> DESC MyView2; +``` + +``` ++------+------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++------+------------------+-------+-----+--------+-----------+ +| ltz | TIMESTAMP_LTZ(3) | true | | | | +| ntz | TIMESTAMP(3) | false | | | | ++------+------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView2; +``` + +``` ++-------------------------+-------------------------+ +| ltz | ntz | ++-------------------------+-------------------------+ +| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 | ++-------------------------+-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView2; +``` + +``` ++-------------------------+-------------------------+ +| ltz | ntz | ++-------------------------+-------------------------+ +| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | ++-------------------------+-------------------------+ +``` + +```sql +Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2; +``` + +``` +Flink SQL> DESC MyView3; ++-------------------------------+------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++-------------------------------+------------------+-------+-----+--------+-----------+ +| ltz | TIMESTAMP_LTZ(3) | true | | | | +| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | | | +| CAST(ltz AS STRING) | STRING | true | | | | +| ntz | TIMESTAMP(3) | false | | | | +| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | | | ++-------------------------------+------------------+-------+-----+--------+-----------+ +``` + +```sql +Flink SQL> SELECT * FROM MyView3; +``` + +``` ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) | ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 | ++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+ +``` + +## Time Attribute and Time Zone + +Please see [Time Attribute]({{< ref "docs/dev/table/concepts/time_attributes">}}#time-attributes) for more information about time attribute. + +### Processing Time Attribute and Time Zone +Flink SQL defines process time attribute by function `PROCTIME()`, the function return type is `TIMESTAMP_LTZ`. + +{{< hint info >}} +Before Flink 1.13, the function return type of `PROCTIME()` is `TIMESTAMP`, and the return value is the `TIMESTAMP` in UTC time zone, +e.g. the wall-clock shows `2021-03-01 12:00:00` at Shanghai, however the `PROCTIME()` displays `2021-03-01 04:00:00` which is wrong. +Flin 1.13 fixes this issue and uses `TIMESTAMP_LTZ` type as return type of `PROCTIME()`, users don't need to deal time zone problems anymore. +{{< /hint >}} + +The PROCTIME() always represents your local timestamp value, using TIMESTAMP_LTZ type can also support DayLight Saving Time well. + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT PROCTIME(); +``` +``` ++-------------------------+ +| PROCTIME() | ++-------------------------+ +| 2021-04-15 14:48:31.387 | ++-------------------------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT PROCTIME(); +``` +``` ++-------------------------+ +| PROCTIME() | ++-------------------------+ +| 2021-04-15 22:48:31.387 | ++-------------------------+ +``` + +```sql +Flink SQL> CREATE TABLE MyTable1 ( + item STRING, + price DOUBLE, + proctime as PROCTIME() + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView3 AS + SELECT + TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime, + item, + MAX(price) as max_price + FROM MyTable1 + GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView3; +``` + +``` ++-----------------+-----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++-----------------+-----------------------------+-------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | false | | | | +| window_end | TIMESTAMP(3) | false | | | | +| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++-----------------+-----------------------------+-------+-----+--------+-----------+ +``` + +The input data of MyTable1 is: +``` +A,1.1 +B,1.2 +A,1.8 +B,2.5 +C,3.8 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView3; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_procime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView3; +``` + +Returns the different window start, window end and window proctime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_procime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 | A | 1.8 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | B | 2.5 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +### Event Time Attribute and Time Zone +Flink supports defining event time attribute on TIMESTAMP column and TIMESTAMP_LTZ column. + +#### Event Time Attribute on TIMESTAMP +If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the event time attribute as a `TIMESTAMP` column: +```sql +Flink SQL> CREATE TABLE MyTable2 ( + item STRING, + price DOUBLE, + ts TIMESTAMP(3), -- TIMESTAMP data type + WATERMARK FOR ts AS ts - INTERVAL '10' SECOND + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView4 AS + SELECT + TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime, + item, + MAX(price) as max_price + FROM MyTable2 + GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView4; +``` + +``` ++----------------+------------------------+------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+------------------------+------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | true | | | | +| window_end | TIMESTAMP(3) | true | | | | +| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++----------------+------------------------+------+-----+--------+-----------+ +``` + +The input data of MyTable2 is: +``` +A,1.1,2021-04-15 14:01:00 +B,1.2,2021-04-15 14:02:00 +A,1.8,2021-04-15 14:03:00 +B,2.5,2021-04-15 14:04:00 +C,3.8,2021-04-15 14:05:00 +C,3.8,2021-04-15 14:11:00 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView4; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView4; +``` + +Returns the same window start, window end and window rowtime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +#### Event Time Attribute on TIMESTAMP_LTZ +If the timestamp data in the source is represented as a epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define event time attribute as a `TIMESTAMP_LTZ` column. +```sql +Flink SQL> CREATE TABLE MyTable3 ( + item STRING, + price DOUBLE, + ts BIGINT, -- long time value in epoch milliseconds + ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), + WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND + ) WITH ( + 'connector' = 'socket', + 'hostname' = '127.0.0.1', + 'port' = '9999', + 'format' = 'csv' + ); + +Flink SQL> CREATE VIEW MyView5 AS + SELECT + TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start, + TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end, + TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime, + item, + MAX(price) as max_price + FROM MyTable3 + GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item; + +Flink SQL> DESC MyView5; +``` + +``` ++----------------+----------------------------+-------+-----+--------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+----------------------------+-------+-----+--------+-----------+ +| window_start | TIMESTAMP(3) | false | | | | +| window_end | TIMESTAMP(3) | false | | | | +| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | | | +| item | STRING | true | | | | +| max_price | DOUBLE | true | | | | ++----------------+----------------------------+-------+-----+--------+-----------+ +``` + +The input data of MyTable3 is: +``` +A,1.1,1618495260000 # The corresponding utc timestamp is 2021-04-15 14:01:00 +B,1.2,1618495320000 # The corresponding utc timestamp is 2021-04-15 14:02:00 +A,1.8,1618495380000 # The corresponding utc timestamp is 2021-04-15 14:03:00 +B,2.5,1618495440000 # The corresponding utc timestamp is 2021-04-15 14:04:00 +C,3.8,1618495500000 # The corresponding utc timestamp is 2021-04-15 14:05:00 +C,3.8,1618495860000 # The corresponding utc timestamp is 2021-04-15 14:11:00 +``` + +```sql +Flink SQL> SET table.local-time-zone=UTC; +Flink SQL> SELECT * FROM MyView5; +``` + +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | A | 1.8 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | B | 2.5 | +| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +```sql +Flink SQL> SET table.local-time-zone=Asia/Shanghai; +Flink SQL> SELECT * FROM MyView5; +``` + +Returns the different window start, window end and window rowtime compared to calculation in UTC timezone. +``` ++-------------------------+-------------------------+-------------------------+------+-----------+ +| window_start | window_end | window_rowtime | item | max_price | ++-------------------------+-------------------------+-------------------------+------+-----------+ +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | A | 1.8 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | B | 2.5 | +| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 | C | 3.8 | ++-------------------------+-------------------------+-------------------------+------+-----------+ +``` + +## Daylight Saving Time Support +Flink SQL supports defining time attributes on TIMESTAMP_LTZ column, base on this, Flink SQL gracefully uses TIMESTAMP and TIMESTAMP_LTZ type in window processing to support the Daylight Saving Time. + + +Flink use timestamp literal to split the window and assigns window to data according to the epoch time of the each row. It means Flink uses `TIMESTAMP` type for window start and window end (e.g. `TUMBLE_START` and `TUMBLE_END`), uses `TIMESTAMP_LTZ` for window time attribute (e.g. `TUMBLE_PROCTIME`, `TUMBLE_ROWTIME`). +Given a example of tumble window, the DaylightTime in Los_Angele starts at time 2021-03-14 02:00:00: +``` +long epoch1 = 1615708800000L; // 2021-03-14 00:00:00 +long epoch2 = 1615712400000L; // 2021-03-14 01:00:00 +long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00) +long epoch4 = 1615719600000L; // 2021-03-14 04:00:00 +``` +The tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] will collect 3 hours' data in Los_angele time zone, but it collect 4 hours' data in other non-DST time zones, what user to do is only define time attribute on TIMESTAMP_LTZ column. + +All windows in Flink like Hop window, Session window, Cumulative window follow this way, and all operations in Flink SQL support TIMESTAMP_LTZ well, thus Flink gracefully supports the Daylight Saving Time zone.   + + +## Difference between Batch and Streaming Mode +The following time functions: +* LOCALTIME +* LOCALTIMESTAMP +* CURRENT_DATE +* CURRENT_TIME +* CURRENT_TIMESTAMP +* NOW() + +Flink evaluates their values according to execution mode. They are evaluated for each record in streaming mode. But in batch mode, they are evaluated once as the query starts and uses the same result for every row. + +The following time functions are evaluated for each record no matter in batch or streaming mode: + +* CURRENT_ROW_TIMESTAMP() +* PROCTIME() + +{{< top >}}