Conversation

@rdblue rdblue commented Aug 22, 2018

What changes were proposed in this pull request?

This updates the v2 write path to a structure similar to the v2 read path. Individual writes are configured and tracked using WriteConfig (analogous to ScanConfig), and this config is passed to the WriteSupport methods that are specific to a single write, like commit and abort.

This new config will be used to communicate overwrite options to data sources that implement new support classes, BatchOverwriteSupport and BatchPartitionOverwriteSupport. The new config could also be used by implementations to get and hold locks to make operations atomic.

Streaming is also updated to use a StreamingWriteConfig, which carries the options that are specific to a write, like the schema, output mode, and write options.
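
For orientation, here is a minimal sketch of the shape this gives the write path. It is written as Scala traits for brevity (the actual v2 interfaces are Java), and the method names mirror the read path's ScanConfig; they are illustrative, not necessarily the exact names in the patch.

```scala
// Minimal sketch of the proposed shape, mirroring the read path's ScanConfig.
// Scala traits for brevity; the real v2 interfaces are Java, and
// createWriteConfig is an illustrative name, not the exact patch API.
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.types.StructType

// Carries everything specific to one logical write: schema, options, and any
// state the source needs (for example, locks held to make the write atomic).
trait WriteConfig

trait BatchWriteSupport {
  // Configure a single write up front; the returned config is threaded
  // through the per-write methods below, so the source can track each
  // write separately.
  def createWriteConfig(schema: StructType, options: DataSourceOptions): WriteConfig

  def commit(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit
  def abort(config: WriteConfig, messages: Array[WriterCommitMessage]): Unit
}
```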

How was this patch tested?

This is primarily an API change and should pass existing tests.

@rdblue rdblue changed the title SPARK-25188: Add WriteConfig to v2 write API. [SPARK-25188][SQL] Add WriteConfig to v2 write API. Aug 22, 2018
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)

KafkaWriter.validateQuery(

This query validation happens in KafkaStreamingWriteSupport. It was duplicated here and in that class. Now it happens just once, when the write config is created.
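
As a generic illustration of this kind of consolidation (the names below are hypothetical, not the Kafka source's actual internals), validation runs once where the config is created:

```scala
// Hypothetical sketch: validation runs once, at config creation, instead of
// being repeated everywhere a writer is built. Names are illustrative only.
import org.apache.spark.sql.types.StructType

trait StreamingWriteConfig

class ExampleStreamingWriteSupport {
  def createWriteConfig(schema: StructType, options: Map[String, String]): StreamingWriteConfig = {
    validateQuery(schema) // the single validation point for this write
    new StreamingWriteConfig {}
  }

  private def validateQuery(schema: StructType): Unit = {
    // Kafka writes require a value column; checking here means later writer
    // creation can assume the query is valid.
    require(schema.fieldNames.contains("value"), "required attribute 'value' not found")
  }
}
```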

* <code>$"day" === '2018-08-22'</code>, to remove that data and commit the replacement data at
* the same time.
*/
public interface BatchOverwriteSupport extends BatchWriteSupport {

This class will be used to create the WriteConfig for idempotent overwrite operations. This would be triggered by an overwrite like this (the API could be different).

df.writeTo("table").overwrite($"day" === "2018-08-22")

That would produce an OverwriteData(source, deleteFilter, query) logical plan, which would result in the exec node calling this to create the write config.
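
A hedged sketch of that flow, where createOverwriteConfig is an assumed method name rather than the exact API in this patch:

```scala
// Hypothetical sketch of the flow above: the exec node for an OverwriteData
// plan asks the source for a WriteConfig carrying the delete filter.
// createOverwriteConfig is an assumed name; BatchOverwriteSupport and
// WriteConfig are the types added in this PR.
import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType

object OverwriteExecSketch {
  def configureOverwrite(
      support: BatchOverwriteSupport,
      schema: StructType,
      options: DataSourceOptions): WriteConfig = {
    // $"day" === "2018-08-22" arrives at the source as a Filter:
    val deleteFilter: Array[Filter] = Array(EqualTo("day", "2018-08-22"))
    // The returned config is later passed to commit(), so the source can
    // remove the matching data and commit the replacement at the same time.
    support.createOverwriteConfig(schema, options, deleteFilter)
  }
}
```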


* <p>
* This is used to implement INSERT OVERWRITE ... PARTITIONS.
*/
public interface BatchPartitionOverwriteSupport extends BatchWriteSupport {

This class will be used to create a WriteConfig that instructs the data source to replace partitions in the existing data with the partitions of a DataFrame. The logical plan would be DynamicPartitionOverwrite.
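
A hedged sketch of how this differs from the filter-based overwrite above (the method name is assumed for illustration; BatchWriteSupport and WriteConfig are the types from this PR):

```scala
// Hypothetical sketch; createDynamicOverwriteConfig is an assumed name. No
// delete filter is needed here: the partitions to replace are determined by
// the partitions present in the data being written.
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType

trait PartitionOverwriteSketch extends BatchWriteSupport {
  def createDynamicOverwriteConfig(
      schema: StructType,
      options: DataSourceOptions): WriteConfig
}
```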

* streaming write support.
*/
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
+class MicroBatchWriteSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)

This fixed a typo in the class name.


rdblue commented Aug 22, 2018

This is related to #21308, which adds DeleteSupport. Both BatchOverwriteSupport and DeleteSupport use the same input to remove data (Filter[]) and can reject deletes that don't align with partition boundaries.
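
To make that shared contract concrete, here is a purely illustrative helper (not code from either PR) showing the kind of alignment check a source might apply:

```scala
// Illustrative only: a source that accepts Filter[] for delete or overwrite
// can reject filters that don't line up with its partitioning, keeping the
// operation a clean partition-level replacement.
import org.apache.spark.sql.sources.{EqualTo, Filter}

object PartitionAlignment {
  def validate(filters: Array[Filter], partitionColumns: Set[String]): Unit = {
    filters.foreach {
      case EqualTo(attr, _) if partitionColumns.contains(attr) =>
        // equality on a partition column selects whole partitions: accepted
      case other =>
        throw new IllegalArgumentException(
          s"Cannot delete by a filter that is not aligned with partitions: $other")
    }
  }
}
```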

rdblue commented Aug 23, 2018

Retest this please.


@rdblue rdblue force-pushed the SPARK-25188-add-write-config branch from 37d5087 to 847300f on August 23, 2018 at 17:49
SparkQA commented Aug 23, 2018

Test build #95174 has finished for PR 22190 at commit 847300f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rdblue commented Aug 23, 2018

@rxin, @cloud-fan, @jose-torres: this is the update to add WriteConfig. There's one failed test that I think is unrelated, so this is ready for you to have a look. This will probably need to be updated for the current changes under discussion.

rdblue commented Aug 23, 2018

Retest this please

SparkQA commented Aug 24, 2018

Test build #95188 has finished for PR 22190 at commit 847300f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
