diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 1218f9242afe..2ab376c0ac20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -32,7 +32,6 @@ class UnivocityGenerator(
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
-  private var printHeader = options.headerFlag
 
   // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
   // When the value is null, this converter should not be called.
@@ -72,15 +71,15 @@ class UnivocityGenerator(
     values
   }
 
+  def writeHeaders(): Unit = {
+    gen.writeHeaders()
+  }
+
   /**
    * Writes a single InternalRow to CSV using Univocity.
    */
   def write(row: InternalRow): Unit = {
-    if (printHeader) {
-      gen.writeHeaders()
-    }
     gen.writeRow(convertRow(row): _*)
-    printHeader = false
   }
 
   def writeToString(row: InternalRow): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4c5a1d327023..f7d8a9e1042d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter(
 
   private var univocityGenerator: Option[UnivocityGenerator] = None
 
-  override def write(row: InternalRow): Unit = {
-    val gen = univocityGenerator.getOrElse {
-      val charset = Charset.forName(params.charset)
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-      val newGen = new UnivocityGenerator(dataSchema, os, params)
-      univocityGenerator = Some(newGen)
-      newGen
-    }
+  if (params.headerFlag) {
+    val gen = getGen()
+    gen.writeHeaders()
+  }
+
+  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+    val charset = Charset.forName(params.charset)
+    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+    val newGen = new UnivocityGenerator(dataSchema, os, params)
+    univocityGenerator = Some(newGen)
+    newGen
+  }
+
+  override def write(row: InternalRow): Unit = {
+    val gen = getGen()
     gen.write(row)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e14e8d49db5c..bc950f2418d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1987,6 +1987,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
 
+  test("SPARK-26208: write and read empty data to csv file with headers") {
+    withTempPath { path =>
+      val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF
+      // We have 2 partitions, but they are both empty and will be filtered out upon writing;
+      // thanks to SPARK-23271, one new empty partition will be inserted.
+      df1.write.format("csv").option("header", true).save(path.getAbsolutePath)
+      val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false)
+        .load(path.getAbsolutePath)
+      assert(df1.schema === df2.schema)
+      checkAnswer(df1, df2)
+    }
+  }
+
   test("do not produce empty files for empty partitions") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath