Skip to content

Commit c72388e

Browse files
aokolnychyi authored and dongjoon-hyun committed
[SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution
### What changes were proposed in this pull request? This PR makes `StreamExecution` use the `Write` abstraction introduced in SPARK-33779. Note: we will need separate plans for streaming writes in order to support the required distribution and ordering in SS. This change only migrates to the `Write` abstraction. ### Why are the changes needed? These changes prevent exceptions from data sources that implement only the `build` method in `WriteBuilder`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes apache#31093 from aokolnychyi/spark-34049. Authored-by: Anton Okolnychyi <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d1d4f6d commit c72388e

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,13 @@ class InMemoryTable(
262262
this
263263
}
264264

265-
override def buildForBatch(): BatchWrite = writer
265+
override def build(): Write = new Write {
266+
override def toBatch: BatchWrite = writer
266267

267-
override def buildForStreaming(): StreamingWrite = streamingWriter match {
268-
case exc: StreamingNotSupportedOperation => exc.throwsException()
269-
case s => s
268+
override def toStreaming: StreamingWrite = streamingWriter match {
269+
case exc: StreamingNotSupportedOperation => exc.throwsException()
270+
case s => s
271+
}
270272
}
271273
}
272274
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,21 +618,22 @@ abstract class StreamExecution(
618618
inputPlan.schema,
619619
new CaseInsensitiveStringMap(options.asJava))
620620
val writeBuilder = table.newWriteBuilder(info)
621-
outputMode match {
621+
val write = outputMode match {
622622
case Append =>
623-
writeBuilder.buildForStreaming()
623+
writeBuilder.build()
624624

625625
case Complete =>
626626
// TODO: we should do this check earlier when we have capability API.
627627
require(writeBuilder.isInstanceOf[SupportsTruncate],
628628
table.name + " does not support Complete mode.")
629-
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
629+
writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
630630

631631
case Update =>
632632
require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
633633
table.name + " does not support Update mode.")
634-
writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming()
634+
writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
635635
}
636+
write.toStreaming
636637
}
637638

638639
protected def purge(threshold: Long): Unit = {

0 commit comments

Comments (0)