Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,13 @@ class InMemoryTable(
this
}

override def buildForBatch(): BatchWrite = writer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this change is optional (shouldn't break anything even this is not change), as we're trying to guarantee backward compatibility, right? I think it's better to change this - just wanted to check about compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a test module, it's okay to change.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Jan 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could leave it as is. I changed because it would previously fail if a data source implemented the build method only.

In the next PR, I am going to implement RequiredDistributionAndOrdering.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion. I didn't mean we should leave it as it is. I just meant to confirm whether the test won't fail even without the fix, to make sure we guarantee compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I confirm it is compatible, @HeartSaVioR.

override def build(): Write = new Write {
override def toBatch: BatchWrite = writer

override def buildForStreaming(): StreamingWrite = streamingWriter match {
case exc: StreamingNotSupportedOperation => exc.throwsException()
case s => s
override def toStreaming: StreamingWrite = streamingWriter match {
case exc: StreamingNotSupportedOperation => exc.throwsException()
case s => s
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,21 +627,22 @@ abstract class StreamExecution(
inputPlan.schema,
new CaseInsensitiveStringMap(options.asJava))
val writeBuilder = table.newWriteBuilder(info)
outputMode match {
val write = outputMode match {
case Append =>
writeBuilder.buildForStreaming()
writeBuilder.build()

case Complete =>
// TODO: we should do this check earlier when we have capability API.
require(writeBuilder.isInstanceOf[SupportsTruncate],
table.name + " does not support Complete mode.")
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()

case Update =>
require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
table.name + " does not support Update mode.")
writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming()
writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
}
write.toStreaming
}

protected def purge(threshold: Long): Unit = {
Expand Down