-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite. #23606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8634911
aca75ae
8993f19
cc5fdac
d692754
84d01ab
c47575e
d67ad46
0e42cc2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -365,16 +365,17 @@ case class Join( | |
| } | ||
|
|
||
| /** | ||
| * Append data to an existing table. | ||
| * Base trait for DataSourceV2 write commands | ||
| */ | ||
| case class AppendData( | ||
| table: NamedRelation, | ||
| query: LogicalPlan, | ||
| isByName: Boolean) extends LogicalPlan { | ||
| trait V2WriteCommand extends Command { | ||
| def table: NamedRelation | ||
| def query: LogicalPlan | ||
|
|
||
| override def children: Seq[LogicalPlan] = Seq(query) | ||
| override def output: Seq[Attribute] = Seq.empty | ||
|
|
||
| override lazy val resolved: Boolean = { | ||
| override lazy val resolved: Boolean = outputResolved | ||
|
|
||
| def outputResolved: Boolean = { | ||
| table.resolved && query.resolved && query.output.size == table.output.size && | ||
| query.output.zip(table.output).forall { | ||
| case (inAttr, outAttr) => | ||
|
|
@@ -386,16 +387,66 @@ case class AppendData( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Append data to an existing table. | ||
| */ | ||
| case class AppendData( | ||
| table: NamedRelation, | ||
| query: LogicalPlan, | ||
| isByName: Boolean) extends V2WriteCommand | ||
|
|
||
| object AppendData { | ||
| def byName(table: NamedRelation, df: LogicalPlan): AppendData = { | ||
| new AppendData(table, df, true) | ||
| new AppendData(table, df, isByName = true) | ||
| } | ||
|
|
||
| def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = { | ||
| new AppendData(table, query, false) | ||
| new AppendData(table, query, isByName = false) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Overwrite data matching a filter in an existing table. | ||
| */ | ||
| case class OverwriteByExpression( | ||
| table: NamedRelation, | ||
| deleteExpr: Expression, | ||
| query: LogicalPlan, | ||
| isByName: Boolean) extends V2WriteCommand { | ||
| override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved | ||
| } | ||
|
|
||
| object OverwriteByExpression { | ||
| def byName( | ||
| table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { | ||
| OverwriteByExpression(table, deleteExpr, df, isByName = true) | ||
| } | ||
|
|
||
| def byPosition( | ||
| table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { | ||
| OverwriteByExpression(table, deleteExpr, query, isByName = false) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Dynamically overwrite partitions in an existing table. | ||
| */ | ||
| case class OverwritePartitionsDynamic( | ||
|
||
| table: NamedRelation, | ||
| query: LogicalPlan, | ||
| isByName: Boolean) extends V2WriteCommand | ||
|
|
||
| object OverwritePartitionsDynamic { | ||
| def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = { | ||
|
||
| OverwritePartitionsDynamic(table, df, isByName = true) | ||
| } | ||
|
|
||
| def byPosition(table: NamedRelation, query: LogicalPlan): OverwritePartitionsDynamic = { | ||
| OverwritePartitionsDynamic(table, query, isByName = false) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the | ||
| * concrete implementations during analysis. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1452,7 +1452,7 @@ object SQLConf { | |
| " register class names for which data source V2 write paths are disabled. Writes from these" + | ||
| " sources will fall back to the V1 sources.") | ||
| .stringConf | ||
| .createWithDefault("") | ||
| .createWithDefault("orc") | ||
|
||
|
|
||
| val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") | ||
| .doc("A comma-separated list of fully qualified data source register class names for which" + | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason it's `df` here and `query` in `byPosition`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. The DataFrame API uses `byName`, while the SQL path uses `byPosition`.