20 changes: 6 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -245,22 +245,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             source,
             df.sparkSession.sessionState.conf)
           val options = sessionOptions ++ extraOptions
-          val relation = DataSourceV2Relation.create(source, options)
-          if (mode == SaveMode.Append) {
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
+          val writer = ws.createWriter(
+            UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
+            new DataSourceOptions(options.asJava))
-          } else {
-            val writer = ws.createWriter(
-              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
-              new DataSourceOptions(options.asJava))
-
-            if (writer.isPresent) {
-              runCommand(df.sparkSession, "save") {
-                WriteToDataSourceV2(writer.get, df.logicalPlan)
-              }
+          if (writer.isPresent) {
+            runCommand(df.sparkSession, "save") {
+              WriteToDataSourceV2(writer.get, df.logicalPlan)
             }
           }
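Net effect of this hunk: save() now always calls WriteSupport.createWriter with the query's own output schema (df.logicalPlan.output.toStructType), instead of first building a DataSourceV2Relation for append mode, so the source's schema is never read on the write path. A minimal caller-side sketch of the scenario this unblocks, assuming a spark-shell session and a hypothetical write-only V2 source on the classpath:

// Hypothetical write-only DataSourceV2. Before this change, mode("append")
// built a DataSourceV2Relation, whose creation called readSchema() on the
// source and failed for sources that cannot report a schema.
val df = spark.range(5).selectExpr("id AS i", "-id AS j")
df.write
  .format("com.example.WriteOnlySource") // hypothetical class name
  .option("path", "/tmp/write-only-demo")
  .mode("append")
  .save()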
27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -355,6 +355,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+    }
+  }
 }

 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
@@ -594,3 +610,14 @@ class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])

   override def close(): Unit = {}
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky: this source implements read support but throws
+    // during schema retrieval. It may have to be rewritten later, but it is
+    // done this way to minimise the changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
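The trick in SimpleWriteOnlyDataSource generalises: override the single schema accessor to throw, so any accidental schema read fails the test loudly instead of passing silently. A hedged sketch of exercising the new test source by hand, assuming a Spark session and these test classes on the classpath:

// Every SaveMode covered by the test should now complete without touching
// fullSchema(). The temp directory starts empty, so the "error"
// (ErrorIfExists) mode passes on the first write.
val tmp = java.nio.file.Files.createTempDirectory("spark-25700-demo").toString
val df = spark.range(3).selectExpr("id AS i", "-id AS j")
Seq("error", "overwrite", "ignore", "append").foreach { m =>
  df.write
    .format(classOf[SimpleWriteOnlyDataSource].getName)
    .option("path", tmp)
    .mode(m)
    .save()
}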
5 changes: 2 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -43,12 +43,12 @@ class SimpleWritableDataSource extends DataSourceV2
   with WriteSupport
   with SessionConfigSupport {

-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema() = new StructType().add("i", "long").add("j", "long")

   override def keyPrefix: String = "simpleWritableDataSource"

   class Reader(path: String, conf: Configuration) extends DataSourceReader {
-    override def readSchema(): StructType = schema
+    override def readSchema(): StructType = SimpleWritableDataSource.this.fullSchema()

     override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
       val dataPath = new Path(path)
@@ -113,7 +113,6 @@ class SimpleWritableDataSource
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[DataSourceWriter] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))

     val path = new Path(options.get("path").get())
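The removed assertion compared the incoming write schema against the source's own schema, which is exactly what a write-only source cannot provide. A source that still wants validation can keep the check local to createWriter. The sketch below is illustrative only: the class name is invented, the parameter names assume the 2.4-era WriteSupport API, and the imports are the ones already present in this test file:

// Illustrative only: opt-in schema validation inside a custom createWriter,
// so the check no longer forces the source to expose a readable schema.
class ValidatingWritableDataSource extends SimpleWritableDataSource {
  override def createWriter(
      writeUUID: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] = {
    val expected = new StructType().add("i", "long").add("j", "long")
    require(DataType.equalsStructurally(schema.asNullable, expected.asNullable),
      s"unexpected write schema: $schema")
    super.createWriter(writeUUID, schema, mode, options)
  }
}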