From cec965c87f4f4289dd2b61fa00832eaaef5c5cda Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 25 Jan 2019 14:58:03 -0800
Subject: [PATCH 1/2] [SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query

## What changes were proposed in this pull request?

This patch proposes to fix an issue with adding `current_timestamp` / `current_date` to a streaming query.

The root cause is that Spark transforms `CurrentTimestamp`/`CurrentDate` into `CurrentBatchTimestamp` in MicroBatchExecution, which leaves the transformed attributes unresolved; they are only resolved later by IncrementalExecution. (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date`, so this has not been a problem there.)

This is fine for a DataSource V1 sink, because it simply receives the transformed logical plan and nothing is evaluated until the attributes are resolved. For a DataSource V2 sink, however, Spark extracts the schema of the transformed logical plan prior to IncrementalExecution, and the unresolved attributes raise errors.

This patch fixes the issue by building a separate pre-resolved logical plan that is used only to pass the schema to StreamingWriteSupport safely.
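For context, a minimal repro sketch of the failure mode described above (not part of the patch; the object name and source settings are illustrative, and it assumes the console sink goes through the DataSource V2 / StreamWriteSupport write path, as it did when this change was made):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

object Spark26379Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-26379-repro")
      .getOrCreate()

    // Any streaming source works; the built-in rate source keeps the repro self-contained.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .withColumn("cur_timestamp", current_timestamp())

    // current_timestamp is rewritten to CurrentBatchTimestamp by MicroBatchExecution.
    // A DataSource V2 sink asks for the plan schema before IncrementalExecution has
    // resolved that expression, which is where the error described above surfaced.
    val query = df.writeStream
      .format("console")
      .start()

    query.awaitTermination(10000)
    query.stop()
    spark.stop()
  }
}
```

With this patch applied, the schema handed to the V2 writer comes from a pre-resolved copy of the plan, so a query like the one above starts normally.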
## How was this patch tested?

Added UT.

Closes #23609 from HeartSaVioR/SPARK-26379.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Dongjoon Hyun
---
 .../streaming/MicroBatchExecution.scala       | 10 +++-
 .../spark/sql/streaming/StreamSuite.scala     | 46 +++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6a264ad708da..6fed139440b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -437,12 +437,20 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    // Pre-resolve new attributes to ensure all attributes are resolved before
+    // accessing schema of logical plan. Note that it only leverages the information
+    // of attributes, so we don't need to be concerned about the values of literals.
+
+    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
+      case cbt: CurrentBatchTimestamp => cbt.toLiteral
+    }
+
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamWriteSupport =>
         val writer = s.createStreamWriter(
           s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         if (writer.isInstanceOf[SupportsWriteInternalRow]) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3dd75c..a2d4477a97eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
@@ -825,6 +826,51 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {

From 52ac627589db4ad63bf959406ad70b9d8790f40a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 27 Jan 2019 10:04:51 -0800
Subject: [PATCH 2/2] [SPARK-26379][SS][FOLLOWUP] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp

## What changes were proposed in this pull request?

Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`. However, `CurrentBatchTimestamp` is a `TimeZoneAwareExpression` while `CurrentTimestamp` isn't. Without a TimeZoneId, `CurrentBatchTimestamp` stays unresolved and raises `UnresolvedException`. Since `CurrentDate` is a `TimeZoneAwareExpression` (its `timeZoneId` is propagated when it is rewritten), there is no problem with `CurrentDate`.

This PR reverts the [previous patch](https://github.com/apache/spark/pull/23609) on `MicroBatchExecution` and fixes the root cause.
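To see why the `timeZoneId` matters, here is a small sketch (not part of the patch; it assumes `CurrentBatchTimestamp` is importable from `org.apache.spark.sql.execution.streaming`, e.g. in a test on the Spark class path, and relies on `TimeZoneAwareExpression.resolved` requiring a defined `timeZoneId`):

```scala
import org.apache.spark.sql.execution.streaming.CurrentBatchTimestamp
import org.apache.spark.sql.types.TimestampType

val batchTimestampMs = 1548633600000L // an arbitrary batch timestamp in milliseconds

// Without a time zone id, TimeZoneAwareExpression.resolved stays false, so code that
// expects only resolved expressions (e.g. schema extraction) hits UnresolvedException.
val withoutZone = CurrentBatchTimestamp(batchTimestampMs, TimestampType)
assert(!withoutZone.resolved)

// Any defined id makes the expression resolved. For the TimestampType branch the
// evaluated value does not depend on the zone, which is why a dummy string is safe.
val withDummyZone = CurrentBatchTimestamp(batchTimestampMs, TimestampType, Some("Dummy TimeZoneId"))
assert(withDummyZone.resolved)
```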
## How was this patch tested?

Pass the Jenkins with the updated test cases.

Closes #23660 from dongjoon-hyun/SPARK-26379.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../execution/streaming/MicroBatchExecution.scala | 15 +++++----------
 .../apache/spark/sql/streaming/StreamSuite.scala  | 10 +++-------
 2 files changed, 8 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6fed139440b3..dbbb7e7c23c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -430,27 +430,22 @@ class MicroBatchExecution(
     // Rewire the plan to use the new attributes that were returned by the source.
     val newAttributePlan = newBatchesPlan transformAllExpressions {
       case ct: CurrentTimestamp =>
+        // CurrentTimestamp is not a TimeZoneAwareExpression while CurrentBatchTimestamp is.
+        // Without a TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
+        // dummy string to prevent UnresolvedException and to prevent it from being used.
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
+          ct.dataType, Some("Dummy TimeZoneId"))
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType, cd.timeZoneId)
     }
 
-    // Pre-resolve new attributes to ensure all attributes are resolved before
-    // accessing schema of logical plan. Note that it only leverages the information
-    // of attributes, so we don't need to be concerned about the values of literals.
-
-    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-      case cbt: CurrentBatchTimestamp => cbt.toLiteral
-    }
-
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamWriteSupport =>
         val writer = s.createStreamWriter(
           s"$runId",
-          newAttrPlanPreResolvedForSchema.schema,
+          newAttributePlan.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         if (writer.isInstanceOf[SupportsWriteInternalRow]) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index a2d4477a97eb..92fdde833bbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -827,21 +827,19 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v2 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = true)
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v1 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = false)
   }
 
   private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
     val input = MemoryStream[Int]
-    val df = input.toDS()
-      .withColumn("cur_timestamp", lit(current_timestamp()))
-      .withColumn("cur_date", lit(current_date()))
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
 
     def assertBatchOutputAndUpdateLastTimestamp(
         rows: Seq[Row],
@@ -852,8 +850,6 @@ class StreamSuite extends StreamTest {
       val row = rows.head
       assert(row.getInt(0) === expectedValue)
      assert(row.getTimestamp(1).getTime >= curTimestamp)
-      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
-      assert(days == curDate || days == curDate + 1)
       row.getTimestamp(1).getTime
     }