@@ -508,12 +508,20 @@ class MicroBatchExecution(
          cd.dataType, cd.timeZoneId)
    }
Member: The root cause is that CurrentBatchTimestamp is a TimeZoneAwareExpression, which is unresolved without a TimeZoneId.
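A minimal sketch of that failure mode (the constructor and toLiteral are taken from this diff; the catalyst package path and the rule that a TimeZoneAwareExpression stays unresolved until its time zone is set are my assumptions, so treat this as illustration only):

    import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
    import org.apache.spark.sql.types.TimestampType

    // Without a timeZoneId the expression reports itself as unresolved, so asking a
    // plan that still contains it for its schema fails with an "unresolved" error.
    val cbt = CurrentBatchTimestamp(System.currentTimeMillis(), TimestampType)
    assert(!cbt.resolved)

    // Pre-resolving it to a plain Literal, which is what this patch does, avoids that.
    assert(cbt.toLiteral.resolved)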


    // Pre-resolve new attributes to ensure all attributes are resolved before
    // accessing the schema of the logical plan. Note that it only leverages the
    // information of the attributes, so we don't need to be concerned about the
    // values of the literals.

    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
Contributor Author: We could even leverage newBatchesPlan and remove this; this step exists to ensure that extracting the schema goes through the same path as the further transformation.

Contributor: I'm fine with this.

Member: @HeartSaVioR, I prefer to leverage the existing lines (501 ~ 509), as you said, like the following; I assume this is what you meant. In the end it is a two-line change (excluding comments). WDYT?

    // Rewire the plan to use the new attributes that were returned by the source.
    val newAttributePlan = newBatchesPlan transformAllExpressions {
      case ct: CurrentTimestamp =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
+          ct.dataType).toLiteral
      case cd: CurrentDate =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType, cd.timeZoneId)
+          cd.dataType, cd.timeZoneId).toLiteral
    }

Contributor Author: @dongjoon-hyun Actually, I meant passing newBatchesPlan instead of newAttributePlan only for extracting the schema, since we expect the names and data types not to be changed by IncrementalExecution. But IMHO I feel safer pre-transforming it.

Transforming CurrentBatchTimestamp to a Literal outside of IncrementalExecution breaks the intention that the CurrentBatchTimestamp javadoc describes. Please note that I use the transformed plan only for extracting the schema; the actual plan is not changed.
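A self-contained sketch of that split (LocalRelation, Project and Alias are standard catalyst classes, but the snippet itself is illustrative and not part of the PR): the pre-resolved copy is consulted only for its schema, while the original plan, still carrying CurrentBatchTimestamp, is what later gets planned and executed.

    import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp}
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
    import org.apache.spark.sql.types.TimestampType

    val plan = Project(
      Seq(Alias(CurrentBatchTimestamp(0L, TimestampType), "ts")()),
      LocalRelation())

    // plan.schema would throw here: the aliased expression has no time zone yet.
    // So the schema is taken from a copy where it is folded to a Literal instead...
    val preResolvedForSchema = plan transformAllExpressions {
      case cbt: CurrentBatchTimestamp => cbt.toLiteral
    }
    val schema = preResolvedForSchema.schema // resolvable: only literals remain

    // ...while `plan` itself is untouched, so IncrementalExecution still sees
    // CurrentBatchTimestamp and substitutes the per-batch value as its javadoc intends.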

Member: +1

      case cbt: CurrentBatchTimestamp => cbt.toLiteral
    }

    val triggerLogicalPlan = sink match {
      case _: Sink => newAttributePlan
      case s: StreamingWriteSupportProvider =>
        val writer = s.createStreamingWriteSupport(
          s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
          outputMode,
          new DataSourceOptions(extraOptions.asJava))
        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
@@ -1079,6 +1080,51 @@ class StreamSuite extends StreamTest {
      assert(query.exception.isEmpty)
    }
  }

test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
" to Dataset - use v2 sink") {
testCurrentTimestampOnStreamingQuery(useV2Sink = true)
}

test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
" to Dataset - use v1 sink") {
testCurrentTimestampOnStreamingQuery(useV2Sink = false)
}

private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
val input = MemoryStream[Int]
val df = input.toDS()
.withColumn("cur_timestamp", lit(current_timestamp()))
.withColumn("cur_date", lit(current_date()))
Member: Currently, this hides current_timestamp and makes this test case always succeed.


    def assertBatchOutputAndUpdateLastTimestamp(
        rows: Seq[Row],
        curTimestamp: Long,
        curDate: Int,
        expectedValue: Int): Long = {
      assert(rows.size === 1)
      val row = rows.head
      assert(row.getInt(0) === expectedValue)
      assert(row.getTimestamp(1).getTime >= curTimestamp)
      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
      assert(days == curDate || days == curDate + 1)
      row.getTimestamp(1).getTime
    }

    var lastTimestamp = System.currentTimeMillis()
    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
    testStream(df, useV2Sink = useV2Sink) (
      AddData(input, 1),
      CheckLastBatch { rows: Seq[Row] =>
        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
      },
      Execute { _ => Thread.sleep(1000) },
      AddData(input, 2),
      CheckLastBatch { rows: Seq[Row] =>
        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
      }
    )
  }
}

abstract class FakeSource extends StreamSourceProvider {