From afb0dc4933921222b049050b0fe2464f6071884e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 22 Jan 2019 17:09:58 +0900
Subject: [PATCH 1/4] SPARK-26379 Fix issue on adding current_timestamp/current_date to streaming query

---
 .../streaming/MicroBatchExecution.scala       | 11 +++++-
 .../spark/sql/streaming/StreamSuite.scala     | 36 +++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 64270e1f44a2..d9be744952a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -508,12 +508,21 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    // Pre-resolve new attributes to ensure all attributes are resolved before
+    // accessing schema of logical plan. Note that it only leverages the information
+    // of attributes, so we don't need to concern about the value of literals.
+
+    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
+      case ct: CurrentBatchTimestamp => ct.toLiteral
+      case cd: CurrentBatchTimestamp => cd.toLiteral
+    }
+
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamingWriteSupportProvider =>
         val writer = s.createStreamingWriteSupport(
           s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 72321c418f9b..7f979c1ac561 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1079,6 +1079,42 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  Seq(true, false).foreach { useV2Sink =>
+    import org.apache.spark.sql.functions._
+
+    val newTestName = "SPARK-26379 Structured Streaming - Exception on adding column to Dataset" +
+      s" - use v2 sink - $useV2Sink"
+
+    test(newTestName) {
+      val input = MemoryStream[Int]
+      val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
+
+      def assertBatchOutputAndUpdateLastTimestamp(
+          rows: Seq[Row],
+          curTimestamp: Long,
+          expectedValue: Int): Long = {
+        assert(rows.size === 1)
+        val row = rows.head
+        assert(row.getInt(0) === expectedValue)
+        assert(row.getTimestamp(1).getTime > curTimestamp)
+        row.getTimestamp(1).getTime
+      }
+
+      var lastTimestamp = -1L
+      testStream(df, useV2Sink = useV2Sink) (
+        AddData(input, 1),
+        CheckLastBatch { rows: Seq[Row] =>
+          lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 1)
+        },
+        Execute { _ => Thread.sleep(3 * 1000) },
+        AddData(input, 2),
+        CheckLastBatch { rows: Seq[Row] =>
+          lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 2)
+        }
+      )
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {

From 8f30a758915895af79f9a1bca25b55b6cfba2cd6 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 22 Jan 2019 17:31:36 +0900
Subject: [PATCH 2/4] Fix build issue that local maven strangely didn't catch as an error

---
 .../spark/sql/execution/streaming/MicroBatchExecution.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index d9be744952a9..fde851a5c07d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -513,8 +513,7 @@ class MicroBatchExecution(
     // of attributes, so we don't need to concern about the value of literals.
 
     val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-      case ct: CurrentBatchTimestamp => ct.toLiteral
-      case cd: CurrentBatchTimestamp => cd.toLiteral
+      case cbt: CurrentBatchTimestamp => cbt.toLiteral
     }
 
     val triggerLogicalPlan = sink match {

From d09845bf52964656dbf46728ee9199a97a4d3465 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 22 Jan 2019 19:37:47 +0900
Subject: [PATCH 3/4] Address review comments from @gaborgsomogyi

---
 .../spark/sql/streaming/StreamSuite.scala | 72 +++++++++++--------
 1 file changed, 41 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 7f979c1ac561..c95f467a56c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
@@ -1080,40 +1081,49 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  Seq(true, false).foreach { useV2Sink =>
-    import org.apache.spark.sql.functions._
-
-    val newTestName = "SPARK-26379 Structured Streaming - Exception on adding column to Dataset" +
-      s" - use v2 sink - $useV2Sink"
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
 
-    test(newTestName) {
-      val input = MemoryStream[Int]
-      val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
-
-      def assertBatchOutputAndUpdateLastTimestamp(
-          rows: Seq[Row],
-          curTimestamp: Long,
-          expectedValue: Int): Long = {
-        assert(rows.size === 1)
-        val row = rows.head
-        assert(row.getInt(0) === expectedValue)
-        assert(row.getTimestamp(1).getTime > curTimestamp)
-        row.getTimestamp(1).getTime
-      }
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
 
-      var lastTimestamp = -1L
-      testStream(df, useV2Sink = useV2Sink) (
-        AddData(input, 1),
-        CheckLastBatch { rows: Seq[Row] =>
-          lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 1)
-        },
-        Execute { _ => Thread.sleep(3 * 1000) },
-        AddData(input, 2),
-        CheckLastBatch { rows: Seq[Row] =>
-          lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 2)
-        }
-      )
-    }
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(3 * 1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
   }
 }

From 687c3e40063eeda156fe666544722d08a385f9e3 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 22 Jan 2019 19:57:35 +0900
Subject: [PATCH 4/4] Reduce test time

---
 .../test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c95f467a56c6..0bbef47fd32e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1118,7 +1118,7 @@ class StreamSuite extends StreamTest {
       CheckLastBatch { rows: Seq[Row] =>
         lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
       },
-      Execute { _ => Thread.sleep(3 * 1000) },
+      Execute { _ => Thread.sleep(1000) },
       AddData(input, 2),
       CheckLastBatch { rows: Seq[Row] =>
         lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
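
Note: the sketch below is not part of the patch series above. It is a minimal, standalone illustration of the user-facing scenario the series fixes, assuming a Spark build that already contains these changes: before the fix, the v2 sink asked for the logical plan's schema while CurrentBatchTimestamp expressions were still unresolved, so queries that added current_timestamp()/current_date() columns could fail; the first patch pre-resolves those expressions to literals purely for schema derivation. The rate source, console sink, and object name are illustrative choices only, mirroring what the new StreamSuite test exercises with MemoryStream.

// Illustrative only: a streaming query that adds current_timestamp()/current_date()
// columns, the shape of query covered by the new StreamSuite test.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_date, current_timestamp, lit}

object CurrentTimestampStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-26379-example")
      .getOrCreate()

    // Simple built-in streaming source; any streaming Dataset would do.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .withColumn("cur_timestamp", lit(current_timestamp()))
      .withColumn("cur_date", lit(current_date()))

    // Write each micro-batch to the console; with the fix applied, planning the
    // batch no longer fails while the sink derives the output schema.
    val query = df.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination(10000L) // run for ~10 seconds, then shut down
    query.stop()
    spark.stop()
  }
}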