Skip to content

Commit d3978ed

Browse files
committed
[FLINK-22378][table] Support TIMESTAMP as StreamRecord timestamp in toDataStream
1 parent fb0c054 commit d3978ed

File tree

7 files changed

+165
-47
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import org.apache.flink.annotation.PublicEvolving;
2222
import org.apache.flink.api.common.typeinfo.TypeInformation;
23-
import org.apache.flink.table.api.TableSchema;
23+
import org.apache.flink.table.catalog.ResolvedSchema;
2424
import org.apache.flink.table.connector.ChangelogMode;
2525
import org.apache.flink.table.connector.ParallelismProvider;
2626
import org.apache.flink.table.connector.RuntimeConverter;
@@ -96,8 +96,7 @@ public interface DynamicTableSink {
9696
* interfaces might be located in other Flink modules.
9797
*
9898
* <p>Independent of the provider interface, the table runtime expects that a sink
99-
* implementation accepts internal data structures (see {@link
100-
* org.apache.flink.table.data.RowData} for more information).
99+
* implementation accepts internal data structures (see {@link RowData} for more information).
101100
*
102101
* <p>The given {@link Context} offers utilities by the planner for creating runtime
103102
* implementation with minimal dependencies to internal data structures.
@@ -146,7 +145,7 @@ interface Context {
146145
* Creates type information describing the internal data structures of the given {@link
147146
* DataType}.
148147
*
149-
* @see TableSchema#toPhysicalRowDataType()
148+
* @see ResolvedSchema#toPhysicalRowDataType()
150149
*/
151150
<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);
152151

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/TimestampData.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@
2323
import org.apache.flink.table.types.logical.TimestampType;
2424
import org.apache.flink.util.Preconditions;
2525

26+
import javax.annotation.Nullable;
27+
2628
import java.sql.Timestamp;
2729
import java.time.Instant;
2830
import java.time.LocalDate;
2931
import java.time.LocalDateTime;
3032
import java.time.LocalTime;
33+
import java.time.ZoneId;
34+
import java.time.ZonedDateTime;
3135

3236
/**
3337
* An internal data structure representing data of {@link TimestampType} and {@link
@@ -88,7 +92,7 @@ public LocalDateTime toLocalDateTime() {
8892
return LocalDateTime.of(localDate, localTime);
8993
}
9094

91-
/** Converts this {@link TimestampData} object to a {@link Instant}. */
95+
/** Converts this {@link TimestampData} object to an {@link Instant}. */
9296
public Instant toInstant() {
9397
long epochSecond = millisecond / 1000;
9498
int milliOfSecond = (int) (millisecond % 1000);
@@ -100,6 +104,29 @@ public Instant toInstant() {
100104
return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
101105
}
102106

107+
/**
108+
* Converts this {@link TimestampData} object to epoch milliseconds, adjusted by the given time
109+
* zone.
110+
*
111+
* @param zoneId zone for offset calculation or null if {@link TimestampData} is already in UTC
112+
* (i.e. for {@link LocalZonedTimestampType})
113+
*/
114+
public long toZonedMillisecond(@Nullable ZoneId zoneId) {
115+
if (zoneId == null) {
116+
return millisecond;
117+
}
118+
int date = (int) (millisecond / MILLIS_PER_DAY);
119+
int time = (int) (millisecond % MILLIS_PER_DAY);
120+
if (time < 0) {
121+
--date;
122+
time += MILLIS_PER_DAY;
123+
}
124+
long nanoOfDay = time * 1_000_000L + nanoOfMillisecond;
125+
LocalDate localDate = LocalDate.ofEpochDay(date);
126+
LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay);
127+
return ZonedDateTime.of(localDate, localTime, zoneId).toInstant().toEpochMilli();
128+
}
129+
103130
@Override
104131
public int compareTo(TimestampData that) {
105132
int cmp = Long.compare(this.millisecond, that.millisecond);

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161

6262
import javax.annotation.Nullable;
6363

64+
import java.time.ZoneId;
6465
import java.util.ArrayList;
6566
import java.util.Collections;
6667
import java.util.List;
@@ -133,13 +134,15 @@ public static RelNode convertExternalToRel(
133134
FlinkRelBuilder relBuilder,
134135
RelNode input,
135136
ExternalModifyOperation externalModifyOperation) {
137+
final ZoneId localTimeZone = unwrapContext(relBuilder).getTableConfig().getLocalTimeZone();
136138
final ResolvedSchema schema = externalModifyOperation.getResolvedSchema();
137139
final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
138140
final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
139141
final DynamicTableSink tableSink =
140142
new ExternalDynamicSink(
141143
externalModifyOperation.getChangelogMode().orElse(null),
142-
externalModifyOperation.getPhysicalDataType());
144+
externalModifyOperation.getPhysicalDataType(),
145+
localTimeZone);
143146
return convertSinkToRel(
144147
relBuilder,
145148
input,

flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@
3131
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
3232
import org.apache.flink.table.types.DataType;
3333
import org.apache.flink.table.types.logical.LogicalType;
34+
import org.apache.flink.table.types.logical.LogicalTypeRoot;
3435
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
3536

3637
import javax.annotation.Nullable;
3738

39+
import java.time.ZoneId;
3840
import java.util.Collections;
3941
import java.util.List;
4042
import java.util.Map;
4143

44+
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
45+
4246
/** Table sink for connecting to the external {@link DataStream} API. */
4347
@Internal
4448
final class ExternalDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
@@ -51,13 +55,17 @@ final class ExternalDynamicSink implements DynamicTableSink, SupportsWritingMeta
5155

5256
private final DataType physicalDataType;
5357

58+
private final ZoneId rowtimeShift;
59+
5460
// mutable attributes
5561

5662
private boolean consumeRowtimeMetadata;
5763

58-
ExternalDynamicSink(@Nullable ChangelogMode changelogMode, DataType physicalDataType) {
64+
ExternalDynamicSink(
65+
@Nullable ChangelogMode changelogMode, DataType physicalDataType, ZoneId rowtimeShift) {
5966
this.changelogMode = changelogMode;
6067
this.physicalDataType = physicalDataType;
68+
this.rowtimeShift = rowtimeShift;
6169
}
6270

6371
@Override
@@ -86,19 +94,33 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
8694
atomicFieldGetter = RowData.createFieldGetter(physicalType, 0);
8795
}
8896

97+
final int rowtimeIndex = transformationContext.getRowtimeIndex();
98+
8999
return new OneInputTransformation<>(
90100
input,
91101
generateOperatorName(),
92102
new OutputConversionOperator(
93103
atomicFieldGetter,
94104
physicalConverter,
95-
transformationContext.getRowtimeIndex(),
105+
rowtimeIndex,
106+
generateRowtimeShift(rowtimeIndex),
96107
consumeRowtimeMetadata),
97108
ExternalTypeInfo.of(physicalDataType),
98109
input.getParallelism());
99110
};
100111
}
101112

113+
private @Nullable ZoneId generateRowtimeShift(int rowtimeIndex) {
114+
if (rowtimeIndex != -1) {
115+
final LogicalType rowtimeType =
116+
physicalDataType.getChildren().get(rowtimeIndex).getLogicalType();
117+
if (hasRoot(rowtimeType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
118+
return rowtimeShift;
119+
}
120+
}
121+
return null;
122+
}
123+
102124
private String generateOperatorName() {
103125
return String.format(
104126
"TableToDataSteam(type=%s, rowtime=%s)",
@@ -107,7 +129,7 @@ private String generateOperatorName() {
107129

108130
@Override
109131
public DynamicTableSink copy() {
110-
return new ExternalDynamicSink(changelogMode, physicalDataType);
132+
return new ExternalDynamicSink(changelogMode, physicalDataType, rowtimeShift);
111133
}
112134

113135
@Override

flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31+
import org.apache.flink.streaming.api.functions.ProcessFunction;
3132
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
3233
import org.apache.flink.streaming.api.windowing.time.Time;
3334
import org.apache.flink.table.api.DataTypes;
@@ -48,6 +49,7 @@
4849
import org.apache.flink.types.RowKind;
4950
import org.apache.flink.util.CloseableIterator;
5051
import org.apache.flink.util.CollectionUtil;
52+
import org.apache.flink.util.Collector;
5153

5254
import org.junit.Before;
5355
import org.junit.Test;
@@ -57,6 +59,8 @@
5759
import org.junit.runners.Parameterized.Parameters;
5860

5961
import java.time.DayOfWeek;
62+
import java.time.LocalDateTime;
63+
import java.time.ZoneId;
6064
import java.time.ZoneOffset;
6165
import java.util.Arrays;
6266
import java.util.Collections;
@@ -433,6 +437,51 @@ public void testFromAndToChangelogStreamUpsert() throws Exception {
433437
getOutput(inputOrOutput));
434438
}
435439

440+
@Test
441+
public void testToDataStreamCustomEventTime() throws Exception {
442+
final ZoneId originalZone = tableEnv.getConfig().getLocalTimeZone();
443+
444+
final ZoneId customZone = ZoneId.of("Europe/Berlin");
445+
446+
tableEnv.getConfig().setLocalTimeZone(customZone);
447+
448+
final LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
449+
final LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
450+
451+
final DataStream<Tuple2<LocalDateTime, String>> dataStream =
452+
env.fromElements(
453+
new Tuple2<>(localDateTime1, "alice"), new Tuple2<>(localDateTime2, "bob"));
454+
455+
final Table table =
456+
tableEnv.fromDataStream(
457+
dataStream,
458+
Schema.newBuilder()
459+
.column("f0", DataTypes.TIMESTAMP(3))
460+
.column("f1", DataTypes.STRING())
461+
.watermark("f0", "f0 - INTERVAL '0.01' SECOND")
462+
.build());
463+
464+
final DataStream<Long> rowtimeStream =
465+
tableEnv.toDataStream(table)
466+
.process(
467+
new ProcessFunction<Row, Long>() {
468+
@Override
469+
public void processElement(
470+
Row value, Context ctx, Collector<Long> out) {
471+
out.collect(ctx.timestamp());
472+
}
473+
});
474+
475+
// TODO timestamp and watermark are off by the time zone at the moment see FLINK-22356
476+
477+
testResult(
478+
rowtimeStream,
479+
localDateTime1.atZone(customZone).toInstant().toEpochMilli(),
480+
localDateTime2.atZone(customZone).toInstant().toEpochMilli());
481+
482+
tableEnv.getConfig().setLocalTimeZone(originalZone);
483+
}
484+
436485
// --------------------------------------------------------------------------------------------
437486
// Helper methods
438487
// --------------------------------------------------------------------------------------------

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import javax.annotation.Nullable;
3131

32+
import java.time.ZoneId;
33+
3234
/** Operator that converts to external data structures and unwraps atomic records if necessary. */
3335
@Internal
3436
public class OutputConversionOperator extends TableStreamOperator<Object>
@@ -40,6 +42,8 @@ public class OutputConversionOperator extends TableStreamOperator<Object>
4042

4143
private final int rowtimeIndex;
4244

45+
private final @Nullable ZoneId rowtimeShift;
46+
4347
private final boolean consumeRowtimeMetadata;
4448

4549
private transient StreamRecord<Object> outRecord;
@@ -48,10 +52,12 @@ public OutputConversionOperator(
4852
@Nullable RowData.FieldGetter atomicFieldGetter,
4953
DataStructureConverter converter,
5054
int rowtimeIndex,
55+
@Nullable ZoneId rowtimeShift,
5156
boolean consumeRowtimeMetadata) {
5257
this.atomicFieldGetter = atomicFieldGetter;
5358
this.converter = converter;
5459
this.rowtimeIndex = rowtimeIndex;
60+
this.rowtimeShift = rowtimeShift;
5561
this.consumeRowtimeMetadata = consumeRowtimeMetadata;
5662
}
5763

@@ -70,10 +76,13 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
7076
final RowData rowData = element.getValue();
7177

7278
if (consumeRowtimeMetadata) {
79+
// timestamp is TIMESTAMP_LTZ
7380
final long rowtime = rowData.getTimestamp(rowData.getArity() - 1, 3).getMillisecond();
7481
outRecord.setTimestamp(rowtime);
7582
} else if (rowtimeIndex != -1) {
76-
final long rowtime = rowData.getTimestamp(rowtimeIndex, 3).getMillisecond();
83+
// timestamp might be TIMESTAMP or TIMESTAMP_LTZ
84+
final long rowtime =
85+
rowData.getTimestamp(rowtimeIndex, 3).toZonedMillisecond(rowtimeShift);
7786
outRecord.setTimestamp(rowtime);
7887
}
7988

0 commit comments

Comments (0)