
Commit d67ad46

SPARK-26666: Update for review comments.

1 parent c47575e

File tree: 6 files changed (+48, -14 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 4 deletions

@@ -978,7 +978,7 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-      case o: OverwriteByExpression if !o.writeResolved =>
+      case o: OverwriteByExpression if !o.outputResolved =>
         // do not resolve expression attributes until the query attributes are resolved against the
         // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
         o
@@ -2251,7 +2251,7 @@ class Analyzer(
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case append @ AppendData(table, query, isByName)
-          if table.resolved && query.resolved && !append.writeResolved =>
+          if table.resolved && query.resolved && !append.outputResolved =>
         val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
         if (projection != query) {
@@ -2261,7 +2261,7 @@ class Analyzer(
       }
 
       case overwrite @ OverwriteByExpression(table, _, query, isByName)
-          if table.resolved && query.resolved && !overwrite.writeResolved =>
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
         val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
         if (projection != query) {
@@ -2271,7 +2271,7 @@ class Analyzer(
       }
 
       case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
-          if table.resolved && query.resolved && !overwrite.writeResolved =>
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
         val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
         if (projection != query) {
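The rename from writeResolved to outputResolved makes the guards' intent clearer: none of these rules may fire until ResolveOutputRelation has aligned the query's output columns with the table's. The delete expression of OverwriteByExpression names columns of the table, while early in analysis only the query's attribute names are in scope, so resolving it too soon would bind it to the wrong attributes. A toy Scala model of that ordering constraint (all names here are illustrative, not catalyst APIs):

    case class ToyWrite(tableCols: Seq[String], queryCols: Seq[String], deleteCol: String) {
      // stands in for V2WriteCommand.outputResolved
      def outputResolved: Boolean = tableCols == queryCols
      // the delete expression may only be resolved against the table's columns
      def canResolveDelete: Boolean = outputResolved && tableCols.contains(deleteCol)
    }

    val w = ToyWrite(Seq("id", "date"), Seq("id", "event_date"), deleteCol = "date")
    assert(!w.outputResolved)   // ResolveOutputRelation has not aliased the query yet
    assert(!w.canResolveDelete) // so the analyzer defers, exactly as the guard above does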

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 3 additions & 3 deletions

@@ -373,9 +373,9 @@ trait V2WriteCommand extends Command {
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
-  override lazy val resolved: Boolean = writeResolved
+  override lazy val resolved: Boolean = outputResolved
 
-  def writeResolved: Boolean = {
+  def outputResolved: Boolean = {
     table.resolved && query.resolved && query.output.size == table.output.size &&
       query.output.zip(table.output).forall {
         case (inAttr, outAttr) =>
@@ -413,7 +413,7 @@ case class OverwriteByExpression(
     deleteExpr: Expression,
     query: LogicalPlan,
     isByName: Boolean) extends V2WriteCommand {
-  override lazy val resolved: Boolean = writeResolved && deleteExpr.resolved
+  override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved
 }
 
 object OverwriteByExpression {
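The hunk cuts off inside the forall, so the per-column conditions are not visible here. A standalone sketch of the kind of compatibility check outputResolved performs, with a toy Attr standing in for catalyst's Attribute and the exact type rule treated as an assumption:

    case class Attr(name: String, dataType: String, nullable: Boolean)

    def outputResolved(tableOut: Seq[Attr], queryOut: Seq[Attr]): Boolean =
      queryOut.size == tableOut.size &&
        queryOut.zip(tableOut).forall { case (in, out) =>
          in.name == out.name &&
            in.dataType == out.dataType &&   // simplification; Spark also accepts compatible types
            (out.nullable || !in.nullable)   // never write nulls into a non-nullable column
        }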

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java

Lines changed: 3 additions & 0 deletions

@@ -29,6 +29,9 @@ public interface SupportsPushDownFilters extends ScanBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after scanning.
+   * <p>
+   * Rows should be returned from the data source if and only if all of the filters match. That is,
+   * filters must be interpreted as ANDed together.
   */
   Filter[] pushFilters(Filter[] filters);
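The added sentence pins down the contract: the array passed to pushFilters is a conjunction, so a source may evaluate any subset natively and return the rest for Spark to re-apply after the scan. A Scala sketch of pushdown under that contract (a plain class rather than the full ScanBuilder interface; which filter types the source handles natively is an assumption):

    import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

    class ExamplePushdown {
      private var pushed: Array[Filter] = Array.empty

      // Keep the filters this source can evaluate natively; hand the rest back
      // as residuals. AND semantics make the split safe: returning a filter
      // only means Spark re-checks it after the scan.
      def pushFilters(filters: Array[Filter]): Array[Filter] = {
        val (supported, residual) = filters.partition {
          case _: EqualTo | _: GreaterThan => true // assumed native support
          case _                           => false
        }
        pushed = supported
        residual
      }

      def pushedFilters(): Array[Filter] = pushed
    }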

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java

Lines changed: 3 additions & 0 deletions

@@ -29,6 +29,9 @@
 public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate {
   /**
    * Configures a write to replace data matching the filters with data committed in the write.
+   * <p>
+   * Rows must be deleted from the data source if and only if all of the filters match. That is,
+   * filters must be interpreted as ANDed together.
    *
    * @param filters filters used to match data to overwrite
    * @return this write builder for method chaining
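The same AND rule governs the write side: an overwrite deletes exactly the rows that match every filter, then appends the new data. A toy model of that contract in Scala, where Row, evalFilter, and the handled filter types are all illustrative:

    import org.apache.spark.sql.sources.{AlwaysTrue, EqualTo, Filter}

    object OverwriteSketch {
      type Row = Map[String, Any]

      // Hypothetical evaluator for two filter types; a real source would
      // translate filters into its own predicate language instead.
      def evalFilter(f: Filter, row: Row): Boolean = f match {
        case EqualTo(attr, value) => row.get(attr).contains(value)
        case _: AlwaysTrue        => true
        case _                    => false // unknown filter: conservatively no match
      }

      // Delete exactly the rows matching ALL filters, then append the new rows.
      def overwrite(existing: Seq[Row], incoming: Seq[Row], filters: Seq[Filter]): Seq[Row] =
        existing.filterNot(row => filters.forall(evalFilter(_, row))) ++ incoming
    }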

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 30 additions & 6 deletions

@@ -46,6 +46,11 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
   override def output: Seq[Attribute] = Nil
 }
 
+/**
+ * Physical plan node for append into a v2 table.
+ *
+ * Rows in the output data set are appended.
+ */
 case class AppendDataExec(
     table: SupportsBatchWrite,
     writeOptions: DataSourceOptions,
@@ -63,9 +68,19 @@ case class AppendDataExec(
   }
 }
 
+/**
+ * Physical plan node for overwrite into a v2 table.
+ *
+ * Overwrites data in a table matched by a set of filters. Rows matching all of the filters will be
+ * deleted and rows in the output data set are appended.
+ *
+ * This plan is used to implement SaveMode.Overwrite. The behavior of SaveMode.Overwrite is to
+ * truncate the table -- delete all rows -- and append the output data set. This uses the filter
+ * AlwaysTrue to delete all rows.
+ */
 case class OverwriteByExpressionExec(
     table: SupportsBatchWrite,
-    filters: Array[Filter],
+    deleteWhere: Array[Filter],
     writeOptions: DataSourceOptions,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
@@ -75,15 +90,15 @@ case class OverwriteByExpressionExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
     val batchWrite = newWriteBuilder() match {
-      case builder: SupportsTruncate if isTruncate(filters) =>
+      case builder: SupportsTruncate if isTruncate(deleteWhere) =>
         builder.truncate().buildForBatch()
 
-      case builder: SupportsOverwrite =>
-        builder.overwrite(filters).buildForBatch()
-
-      case builder: SupportsSaveMode =>
+      case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
         builder.mode(SaveMode.Overwrite).buildForBatch()
 
+      case builder: SupportsOverwrite =>
+        builder.overwrite(deleteWhere).buildForBatch()
+
       case _ =>
         throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
     }
@@ -92,6 +107,15 @@ case class OverwriteByExpressionExec(
   }
 }
 
+/**
+ * Physical plan node for dynamic partition overwrite into a v2 table.
+ *
+ * Dynamic partition overwrite is the behavior of Hive INSERT OVERWRITE ... PARTITION queries, and
+ * Spark INSERT OVERWRITE queries when spark.sql.sources.partitionOverwriteMode=dynamic. Each
+ * partition in the output data set replaces the corresponding existing partition in the table or
+ * creates a new partition. Existing partitions for which there is no data in the output data set
+ * are not modified.
+ */
 case class OverwritePartitionsDynamicExec(
     table: SupportsBatchWrite,
     writeOptions: DataSourceOptions,
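The reordered match narrows SupportsSaveMode to the truncate case only, so SaveMode.Overwrite keeps working for sources that have not migrated, while filtered overwrites now require SupportsOverwrite. Every branch hinges on isTruncate, whose body is outside this hunk; a plausible standalone version, consistent with the class comment above (treat the exact body as an assumption):

    import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

    // An overwrite whose only delete filter is AlwaysTrue deletes every row,
    // which is exactly a truncate of the table.
    def isTruncate(filters: Array[Filter]): Boolean =
      filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]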

sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala

Lines changed: 5 additions & 1 deletion

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.Stable
+import org.apache.spark.annotation.{Evolving, Stable}
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines all the filters that we can push down to the data sources.
@@ -222,19 +222,23 @@ case class StringContains(attribute: String, value: String) extends Filter {
 /**
  * A filter that always evaluates to `true`.
  */
+@Evolving
 case class AlwaysTrue() extends Filter {
   override def references: Array[String] = Array.empty
 }
 
+@Evolving
 object AlwaysTrue extends AlwaysTrue {
 }
 
 /**
  * A filter that always evaluates to `false`.
  */
+@Evolving
 case class AlwaysFalse() extends Filter {
   override def references: Array[String] = Array.empty
 }
 
+@Evolving
 object AlwaysFalse extends AlwaysFalse {
 }
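AlwaysTrue and AlwaysFalse each pair a case class with a companion object that extends it, so callers can use either form interchangeably. A quick Scala illustration:

    import org.apache.spark.sql.sources.{AlwaysTrue, Filter}

    val truncateAll: Filter = AlwaysTrue    // the singleton companion object
    val sameThing: Filter = AlwaysTrue()    // a case class instance

    // Both forms satisfy the same type test, e.g. in a truncate check:
    assert(Seq(truncateAll, sameThing).forall(_.isInstanceOf[AlwaysTrue]))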
