
Commit 15d2071

address comments
1 parent 22ba355 commit 15d2071

File tree

4 files changed: +83 −18 lines


sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java

Lines changed: 2 additions & 1 deletion
@@ -27,7 +27,8 @@
  * have a public, 0-arg constructor.
  * <p>
  * Note that, TableProvider can only apply data operations to existing tables, like read, append,
- * delete, and overwrite. Not operations that require metadata changes, like create/drop tables.
+ * delete, and overwrite. It does not support the operations that require metadata changes, like
+ * create/drop tables.
  * <p>
  * The major responsibility of this interface is to return a {@link Table} for read/write.
  * </p>
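
Since the interface's only responsibility is to hand back a Table, an implementation can be very small. Below is a minimal sketch (the class and names are purely illustrative, and it assumes the v2 API shape at this commit: a public 0-arg constructor plus getTable(CaseInsensitiveStringMap)); it is not one of the built-in sources:

  import java.util

  import scala.collection.JavaConverters._

  import org.apache.spark.sql.sources.v2.{Table, TableCapability, TableProvider}
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.util.CaseInsensitiveStringMap

  // Hypothetical provider: it can only return a handle to an existing table.
  // There is no createTable/dropTable on this interface.
  class ExampleProvider extends TableProvider {
    override def getTable(options: CaseInsensitiveStringMap): Table = new Table {
      override def name(): String = "example"
      override def schema(): StructType = new StructType().add("id", "long")
      override def capabilities(): util.Set[TableCapability] =
        Set(TableCapability.BATCH_READ).asJava
    }
  }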

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 12 additions & 13 deletions
@@ -58,7 +58,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`SaveMode.ErrorIfExists`: throw an exception at runtime.</li>
    * </ul>
    * <p>
-   * When writing to data source v1, the default option is `ErrorIfExist`. When writing to data
+   * When writing to data source v1, the default option is `ErrorIfExists`. When writing to data
    * source v2, the default option is `Append`.
    *
    * @since 1.4.0
@@ -80,14 +80,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def mode(saveMode: String): DataFrameWriter[T] = {
-    mode(saveMode.toLowerCase(Locale.ROOT) match {
-      case "overwrite" => SaveMode.Overwrite
-      case "append" => SaveMode.Append
-      case "ignore" => SaveMode.Ignore
-      case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
+    saveMode.toLowerCase(Locale.ROOT) match {
+      case "overwrite" => mode(SaveMode.Overwrite)
+      case "append" => mode(SaveMode.Append)
+      case "ignore" => mode(SaveMode.Ignore)
+      case "error" | "errorifexists" => mode(SaveMode.ErrorIfExists)
+      case "default" => this
       case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
         "Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
-    })
+    }
   }

   /**
@@ -269,10 +270,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     provider.getTable(dsOptions) match {
-      // TODO: for backward compatibility reasons, the builtin file source needs to support all
-      // the save modes, which violates the semantic of `TableProvider`. Here we special-case
-      // file source and pass the save mode to file source directly. This hack can be removed
-      // after we figure out a general interface for path-based data sources.
+      // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
+      // case, and pass the save mode to file source directly. This hack should be removed.
       case table: FileTable =>
         val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
           .mode(modeForDSV1) // should not change default mode for file source.
@@ -300,9 +299,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
         }

-      case _ =>
+      case other =>
         throw new AnalysisException(s"TableProvider implementation $source cannot be " +
-          "written with ErrorIfExists or Ignore modes, please use Append or Overwrite " +
+          s"written with $other mode, please use Append or Overwrite " +
           "modes instead.")
     }
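
The net effect of the mode(String) change can be seen in a short sketch. This assumes an active SparkSession and the built-in noop v2 sink being registered under the short name "noop"; the named modes still map one-to-one to SaveMode, but "default" now leaves the writer's current mode untouched, so each source family keeps its own default (ErrorIfExists for v1, Append for v2):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").appName("save-mode-demo").getOrCreate()

  // Explicit string modes map to the corresponding SaveMode.
  spark.range(10).write.format("noop").mode("append").save()

  // "default" no longer forces ErrorIfExists; for this v2 sink the write
  // falls back to the v2 default, Append.
  spark.range(10).write.format("noop").mode("default").save()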

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ private[noop] object NoopTable extends Table with SupportsWrite {
     Set(
       TableCapability.BATCH_WRITE,
       TableCapability.STREAMING_WRITE,
+      TableCapability.TRUNCATE,
       TableCapability.ACCEPT_ANY_SCHEMA).asJava
   }
 }
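
The TRUNCATE capability appears to be what lets the new overwrite test below pass: mode(SaveMode.Overwrite) on a v2 source now plans OverwriteByExpression with a `true` filter (see the DataFrameWriter change above), and that plan is only accepted for tables that declare truncation support. A one-line check, again assuming the "noop" short name:

  // Overwriting everything amounts to a truncate; without TableCapability.TRUNCATE
  // on NoopTable this write would be rejected during analysis.
  spark.range(10).write.format("noop").mode("overwrite").save()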

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 68 additions & 4 deletions
@@ -38,11 +38,15 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.util.Utils


@@ -239,15 +243,75 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   }

   test("save mode") {
-    val df = spark.read
+    spark.range(10).write
       .format("org.apache.spark.sql.test")
-      .load()
+      .mode(SaveMode.ErrorIfExists)
+      .save()
+    assert(LastOptions.saveMode === SaveMode.ErrorIfExists)

-    df.write
+    spark.range(10).write
+      .format("org.apache.spark.sql.test")
+      .mode(SaveMode.Append)
+      .save()
+    assert(LastOptions.saveMode === SaveMode.Append)
+
+    // By default the save mode is `ErrorIfExists` for data source v1.
+    spark.range(10).write
       .format("org.apache.spark.sql.test")
-      .mode(SaveMode.ErrorIfExists)
       .save()
     assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+
+    spark.range(10).write
+      .format("org.apache.spark.sql.test")
+      .mode("default")
+      .save()
+    assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
+  }
+
+  test("save mode for data source v2") {
+    var plan: LogicalPlan = null
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        plan = qe.analyzed
+
+      }
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      // append mode creates `AppendData`
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode(SaveMode.Append)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+
+      // overwrite mode creates `OverwriteByExpression`
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode(SaveMode.Overwrite)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[OverwriteByExpression])
+
+      // By default the save mode is `Append` for data source v2.
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+
+      spark.range(10).write
+        .format(classOf[NoopDataSource].getName)
+        .mode("default")
+        .save()
+      sparkContext.listenerBus.waitUntilEmpty(1000)
+      assert(plan.isInstanceOf[AppendData])
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
   }

   test("test path option in load") {
