
Commit 227920a

Partially revert append mode support in Datasource V2
1 parent faf73dc · commit 227920a
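
This revert is user-visible mainly for append writes to a Datasource V2 source: `df.write.mode("append").save()` goes back through `BatchWriteSupportProvider.createBatchWriteSupport` instead of planning an `AppendData` over a `DataSourceV2Relation`, so the write path no longer instantiates a read support. A minimal sketch of the affected call, using the write-only test source added below (the path is hypothetical):

// Append-mode save on a V2 source after this revert: no DataSourceV2Relation
// (and hence no ReadSupport) is created just to plan the write.
spark.range(10).select('id as 'i, -'id as 'j)
  .write
  .format(classOf[SimpleWriteOnlyDataSource].getName)
  .option("path", "/tmp/v2-append-demo") // hypothetical path
  .mode("append")
  .save()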

File tree: 3 files changed (+27 −25)

  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
  sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
  sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (9 additions, 16 deletions)

@@ -246,23 +246,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
 
-          val relation = DataSourceV2Relation.create(source, options)
-          if (mode == SaveMode.Append) {
+          // TODO: SPARK-24251 was reverted because it creates a readsupport at write path.
+          val writer = provider.createBatchWriteSupport(
+            UUID.randomUUID().toString,
+            df.logicalPlan.output.toStructType,
+            mode,
+            new DataSourceOptions(options.asJava))
+
+          if (writer.isPresent) {
             runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
-
-          } else {
-            val writer = provider.createBatchWriteSupport(
-              UUID.randomUUID().toString,
-              df.logicalPlan.output.toStructType,
-              mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+              WriteToDataSourceV2(writer.get, df.logicalPlan)
             }
           }
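
For orientation, here is a sketch of the provider hook the rewritten path calls, reconstructed from the call site above; the trait and parameter names below are illustrative stand-ins for the real `org.apache.spark.sql.sources.v2.BatchWriteSupportProvider`:

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
import org.apache.spark.sql.types.StructType

// Sketch only: mirrors the four arguments passed at the call site above.
trait BatchWriteSupportProviderSketch {
  // An empty Optional means "nothing to write" (e.g. SaveMode.Ignore when the
  // data already exists), in which case save() above becomes a no-op.
  def createBatchWriteSupport(
      queryId: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[BatchWriteSupport]
}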

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala (16 additions, 6 deletions)

@@ -190,38 +190,39 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
   test("simple writable data source") {
     // TODO: java implementation.
+    val writeOnlySource = classOf[SimpleWriteOnlyDataSource]
     Seq(classOf[SimpleWritableDataSource]).foreach { cls =>
       withTempPath { file =>
         val path = file.getCanonicalPath
         assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
 
-        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(10).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).select('id, -'id))
 
         // test with different save modes
-        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(10).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).mode("append").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(10).union(spark.range(10)).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).mode("overwrite").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
-        spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+        spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
           .option("path", path).mode("ignore").save()
         checkAnswer(
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select('id, -'id))
 
         val e = intercept[Exception] {
-          spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
+          spark.range(5).select('id as 'i, -'id as 'j).write.format(writeOnlySource.getName)
             .option("path", path).mode("error").save()
         }
         assert(e.getMessage.contains("data already exists"))

@@ -240,7 +241,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         // this input data will fail to read midway through.
         val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
         val e2 = intercept[SparkException] {
-          input.write.format(cls.getName).option("path", path).mode("overwrite").save()
+          input.write.format(writeOnlySource.getName).option("path", path).mode("overwrite").save()
         }
         assert(e2.getMessage.contains("Writing job aborted"))
         // make sure we don't have partial data.

@@ -640,3 +641,12 @@ object SpecificReaderFactory extends PartitionReaderFactory {
     }
   }
 }
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. It may need a rewrite, but it is kept this way
+    // to minimise changes.
+    throw new UnsupportedOperationException("read is not supported")
+  }
+}
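
The new `SimpleWriteOnlyDataSource` is the regression guard for this revert: every write in the test now goes through a source whose `fullSchema()` throws, so if `save()` ever constructs the read path again (as the reverted SPARK-24251 append route did), the test fails on the spot. A short sketch of that guarantee, assuming the exception surfaces through an explicit read (the path is hypothetical):

// Writing succeeds because save() never asks the source for its schema...
spark.range(3).select('id as 'i, -'id as 'j)
  .write.format(classOf[SimpleWriteOnlyDataSource].getName)
  .option("path", "/tmp/write-only-demo").mode("append").save()

// ...while reading through the same source hits the throwing override.
val e = intercept[UnsupportedOperationException] {
  spark.read.format(classOf[SimpleWriteOnlyDataSource].getName)
    .option("path", "/tmp/write-only-demo").load()
}
assert(e.getMessage.contains("read is not supported"))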

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala (2 additions, 3 deletions)

@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
       val dataPath = new Path(path)

@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
 
     val path = new Path(options.get("path").get())
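
Turning the private `schema` val into a protected `fullSchema()` def is what makes the write-only subclass possible, and it is also why the structural-equality assert on the incoming schema had to go: evaluating `this.fullSchema()` inside `createBatchWriteSupport` would now throw for `SimpleWriteOnlyDataSource`. A minimal sketch of the hook pattern, with names mirroring the diff but bodies purely illustrative:

import org.apache.spark.sql.types.StructType

// The outer class owns the schema definition; the inner reader delegates to
// it, so overriding one protected def changes both.
class WritableSourceSketch {
  protected def fullSchema(): StructType =
    new StructType().add("i", "long").add("j", "long")

  class ReadSupportSketch {
    // Qualified `this` resolves to the (possibly overridden) outer method.
    def schema: StructType = WritableSourceSketch.this.fullSchema()
  }
}

class WriteOnlySourceSketch extends WritableSourceSketch {
  override def fullSchema(): StructType =
    throw new UnsupportedOperationException("read is not supported")
}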
