diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index ff1911d69a6b..4c5a1d327023 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val charset = Charset.forName(params.charset)
-
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-
-  private val gen = new UnivocityGenerator(dataSchema, writer, params)
+  private var univocityGenerator: Option[UnivocityGenerator] = None
+
+  override def write(row: InternalRow): Unit = {
+    val gen = univocityGenerator.getOrElse {
+      val charset = Charset.forName(params.charset)
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+      val newGen = new UnivocityGenerator(dataSchema, os, params)
+      univocityGenerator = Some(newGen)
+      newGen
+    }
 
-  override def write(row: InternalRow): Unit = gen.write(row)
+    gen.write(row)
+  }
 
-  override def close(): Unit = gen.close()
+  override def close(): Unit = univocityGenerator.map(_.close())
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 610f0d1619fc..3042133ee43a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
       " which can be read back by Spark only if multiLine is enabled.")
   }
 
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context, new Path(path), encoding)
-
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
+  private var jacksonGenerator: Option[JacksonGenerator] = None
 
   override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
+
     gen.write(row)
     gen.writeLineEnding()
   }
 
-  override def close(): Unit = {
-    gen.close()
-    writer.close()
-  }
+  override def close(): Unit = jacksonGenerator.map(_.close())
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 268297148b52..01948ab25d63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.OutputStream
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,23 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val writer = CodecStreams.createOutputStream(context, new Path(path))
+  private var outputStream: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse{
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
+
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(writer)
+      utf8string.writeTo(os)
     }
-    writer.write(lineSeparator)
+    os.write(lineSeparator)
   }
 
   override def close(): Unit = {
-    writer.close()
+    outputStream.map(_.close())
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c275d63d32cc..e14e8d49db5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }.getMessage
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.csv(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("csv")))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9ea9189cdf7f..8807b131ceaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1898,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+      assert(jsonDF.count() === corruptRecordCount)
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1911,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         F.count($"dummy").as("valid"),
         F.count($"_corrupt_record").as("corrupt"),
         F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
     }
   }
 
@@ -2556,4 +2556,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     emptyString(StringType, "")
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.json(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("json")))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0e7f3afa9c3a..a86d5ee37f3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.text(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("txt")))
+    }
+  }
 }
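For reference, a minimal, self-contained sketch (not part of the patch; the class name LazyWriterSketch and the use of a local FileOutputStream are hypothetical) of the lazy-initialization pattern that CsvOutputWriter, JsonOutputWriter and TextOutputWriter now share: the underlying stream is opened only on the first write(), so a task that receives no rows never creates a file, and close() is a no-op when nothing was written.

import java.io.{FileOutputStream, OutputStream}

// Hypothetical stand-in for the real writers, which go through CodecStreams
// and a TaskAttemptContext rather than writing to a local file directly.
class LazyWriterSketch(path: String) {
  private var outputStream: Option[OutputStream] = None

  def write(record: Array[Byte]): Unit = {
    // Open the stream lazily: empty partitions never reach this point,
    // so no output file is created for them.
    val os = outputStream.getOrElse {
      val newStream = new FileOutputStream(path)
      outputStream = Some(newStream)
      newStream
    }
    os.write(record)
  }

  // Close only a stream that was actually opened.
  def close(): Unit = outputStream.foreach(_.close())
}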