From 3192656ad91d326824360f4d4890dc1f6c3f6393 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 28 Nov 2018 14:03:20 -0500 Subject: [PATCH 01/11] write headers to empty csv files when header=true --- .../spark/sql/catalyst/csv/UnivocityGenerator.scala | 9 ++++----- .../execution/datasources/FileFormatDataWriter.scala | 2 ++ .../sql/execution/datasources/OutputWriter.scala | 3 +++ .../execution/datasources/csv/CSVFileFormat.scala | 6 ++++++ .../execution/datasources/json/JsonFileFormat.scala | 2 ++ .../execution/datasources/orc/OrcOutputWriter.scala | 2 ++ .../datasources/parquet/ParquetOutputWriter.scala | 2 ++ .../execution/datasources/text/TextFileFormat.scala | 2 ++ .../sql/execution/datasources/csv/CSVSuite.scala | 12 ++++++++++++ 9 files changed, 35 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 1218f9242afe..2ab376c0ac20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -32,7 +32,6 @@ class UnivocityGenerator( private val writerSettings = options.asWriterSettings writerSettings.setHeaders(schema.fieldNames: _*) private val gen = new CsvWriter(writer, writerSettings) - private var printHeader = options.headerFlag // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. // When the value is null, this converter should not be called. @@ -72,15 +71,15 @@ class UnivocityGenerator( values } + def writeHeaders(): Unit = { + gen.writeHeaders() + } + /** * Writes a single InternalRow to CSV using Univocity. */ def write(row: InternalRow): Unit = { - if (printHeader) { - gen.writeHeaders() - } gen.writeRow(convertRow(row): _*) - printHeader = false } def writeToString(row: InternalRow): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 10733810b641..1c03de9c0eaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -121,6 +121,7 @@ class SingleDirectoryDataWriter( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) + currentWriter.init() statsTrackers.foreach(_.newFile(currentPath)) } @@ -237,6 +238,7 @@ class DynamicPartitionDataWriter( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) + currentWriter.init() statsTrackers.foreach(_.newFile(currentPath)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala index 868e5371426c..7a8f25add057 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala @@ -57,6 +57,9 @@ abstract class OutputWriterFactory extends Serializable { * executor side. This instance is used to persist rows to this single output file. */ abstract class OutputWriter { + /** Initializes before writing any rows. Invoked on executor size. 
*/ + def init(): Unit + /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 964b56e706a0..d1394d133c5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -180,6 +180,12 @@ private[csv] class CsvOutputWriter( private val gen = new UnivocityGenerator(dataSchema, writer, params) + override def init(): Unit = { + if (params.headerFlag) { + gen.writeHeaders() + } + } + override def write(row: InternalRow): Unit = gen.write(row) override def close(): Unit = gen.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 1f7c9d73f19f..a055298a1bb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -186,6 +186,8 @@ private[json] class JsonOutputWriter( // create the Generator without separator inserted between 2 records private[this] val gen = new JacksonGenerator(dataSchema, writer, options) + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { gen.write(row) gen.writeLineEnding() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala index 7e38fc651a31..d009f258c42d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala @@ -50,6 +50,8 @@ private[orc] class OrcOutputWriter( recordWriter } + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { recordWriter.write(NullWritable.get(), serializer.serialize(row)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 8361762b0970..98d4e8fa30d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -37,6 +37,8 @@ private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptCon }.getRecordWriter(context) } + override def init(): Unit = {} + override def write(row: InternalRow): Unit = recordWriter.write(null, row) override def close(): Unit = recordWriter.close(context) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 268297148b52..52451dea8d3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -150,6 +150,8 @@ class TextOutputWriter( private val writer = CodecStreams.createOutputStream(context, new Path(path)) + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { if (!row.isNullAt(0)) { val utf8string = row.getUTF8String(0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2efe1dda475c..ce673dadb1d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1859,4 +1859,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te checkAnswer(df, Row(null, csv)) } } + + test("SPARK-26208: write and read empty data to csv file with header") { + withTempPath { path => + val df1 = Seq.empty[(String, String)].toDF("x", "y") + df1.printSchema + df1.write.format("csv").option("header", true).save(path.getAbsolutePath) + val df2 = spark.read.format("csv").option("header", true).load(path.getAbsolutePath) + df2.printSchema + assert(df1.schema === df2.schema) + checkAnswer(df1, df2) + } + } } From 1f897be372e1ffca5445a36d2c13c8ef2a489e2e Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 28 Nov 2018 15:36:20 -0500 Subject: [PATCH 02/11] add missing init overrides --- .../main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala | 2 ++ .../org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 2 ++ .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 ++ .../scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 ++ 4 files changed, 8 insertions(+) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala index 06507115f5ed..6af7ea348783 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala @@ -60,6 +60,8 @@ private[avro] class AvroOutputWriter( }.getRecordWriter(context) + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord]) recordWriter.write(key, NullWritable.get()) diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 39dcd911a081..33499c55e16b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -49,6 +49,8 @@ private[libsvm] class LibSVMOutputWriter( // This `asInstanceOf` is safe because it's guaranteed by `LibSVMFileFormat.verifySchema` private val udt = dataSchema(1).dataType.asInstanceOf[VectorUDT] + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { val label = row.getDouble(0) val vector = udt.deserialize(row.getStruct(1, udt.sqlType.length)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index d8d2a80e0e8b..9cc356afd4a4 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -141,6 +141,8 @@ class HiveOutputWriter( private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } private val outputData = new Array[Any](fieldOIs.length) + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { var i = 0 while (i < fieldOIs.length) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 4e641e34c18d..a834675accf7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -272,6 +272,8 @@ private[orc] class OrcOutputWriter( ).asInstanceOf[RecordWriter[NullWritable, Writable]] } + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { recordWriter.write(NullWritable.get(), serializer.serialize(row)) } From bfadbf9ae5df8e4a1e84ffff5c82dddc91d5082f Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 28 Nov 2018 16:49:15 -0500 Subject: [PATCH 03/11] add missing init override --- .../scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 60a4638f610b..9f10304acf1c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -122,6 +122,8 @@ class SimpleTextOutputWriter(path: String, dataSchema: StructType, context: Task private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) + override def init(): Unit = {} + override def write(row: InternalRow): Unit = { val serialized = row.toSeq(dataSchema).map { v => if (v == null) "" else v.toString From 258a1e437ce6fdf89bbed79b58175b3b1044d2c6 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 29 Nov 2018 13:17:47 -0500 Subject: [PATCH 04/11] provide default do-nothing implementation for OutputWriter.init --- .../main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala | 2 -- .../org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 2 -- .../apache/spark/sql/execution/datasources/OutputWriter.scala | 2 +- .../spark/sql/execution/datasources/json/JsonFileFormat.scala | 2 -- .../spark/sql/execution/datasources/orc/OrcOutputWriter.scala | 2 -- .../sql/execution/datasources/parquet/ParquetOutputWriter.scala | 2 -- .../spark/sql/execution/datasources/text/TextFileFormat.scala | 2 -- .../org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 2 -- .../scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 -- .../scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 2 -- 10 files changed, 1 insertion(+), 19 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala index 6af7ea348783..06507115f5ed 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala @@ -60,8 +60,6 @@ private[avro] class AvroOutputWriter( }.getRecordWriter(context) - override 
def init(): Unit = {} - override def write(row: InternalRow): Unit = { val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord]) recordWriter.write(key, NullWritable.get()) diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 33499c55e16b..39dcd911a081 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -49,8 +49,6 @@ private[libsvm] class LibSVMOutputWriter( // This `asInstanceOf` is safe because it's guaranteed by `LibSVMFileFormat.verifySchema` private val udt = dataSchema(1).dataType.asInstanceOf[VectorUDT] - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { val label = row.getDouble(0) val vector = udt.deserialize(row.getStruct(1, udt.sqlType.length)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala index 7a8f25add057..2763d3467d37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala @@ -58,7 +58,7 @@ abstract class OutputWriterFactory extends Serializable { */ abstract class OutputWriter { /** Initializes before writing any rows. Invoked on executor size. */ - def init(): Unit + def init(): Unit = {} /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 19a11bb7da45..3042133ee43a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -177,8 +177,6 @@ private[json] class JsonOutputWriter( private var jacksonGenerator: Option[JacksonGenerator] = None - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { val gen = jacksonGenerator.getOrElse { val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala index d009f258c42d..7e38fc651a31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala @@ -50,8 +50,6 @@ private[orc] class OrcOutputWriter( recordWriter } - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { recordWriter.write(NullWritable.get(), serializer.serialize(row)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 98d4e8fa30d9..8361762b0970 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -37,8 +37,6 @@ private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptCon }.getRecordWriter(context) } - override def init(): Unit = {} - override def write(row: InternalRow): Unit = recordWriter.write(null, row) override def close(): Unit = recordWriter.close(context) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 7e0c88a27150..01948ab25d63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -152,8 +152,6 @@ class TextOutputWriter( private var outputStream: Option[OutputStream] = None - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { val os = outputStream.getOrElse{ val newStream = CodecStreams.createOutputStream(context, new Path(path)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 9cc356afd4a4..d8d2a80e0e8b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -141,8 +141,6 @@ class HiveOutputWriter( private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } private val outputData = new Array[Any](fieldOIs.length) - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { var i = 0 while (i < fieldOIs.length) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index a834675accf7..4e641e34c18d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -272,8 +272,6 @@ private[orc] class OrcOutputWriter( ).asInstanceOf[RecordWriter[NullWritable, Writable]] } - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { recordWriter.write(NullWritable.get(), serializer.serialize(row)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 9f10304acf1c..60a4638f610b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -122,8 +122,6 @@ class SimpleTextOutputWriter(path: String, dataSchema: StructType, context: Task private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) - override def init(): Unit = {} - override def write(row: InternalRow): Unit = { val serialized = row.toSeq(dataSchema).map { v => if (v == null) "" else v.toString From 088c7106b8837ada52ad3a5b1d356c3572ff0af7 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 29 Nov 2018 16:28:40 -0500 Subject: [PATCH 05/11] cleanup test --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 7e4d882b16c2..12363507a54f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1987,13 +1987,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(errMsg2.contains("'lineSep' can contain only 1 character")) } - test("SPARK-26208: write and read empty data to csv file with header") { + test("SPARK-26208: write and read empty data to csv file with headers") { withTempPath { path => val df1 = Seq.empty[(String, String)].toDF("x", "y") - df1.printSchema df1.write.format("csv").option("header", true).save(path.getAbsolutePath) - val df2 = spark.read.format("csv").option("header", true).load(path.getAbsolutePath) - df2.printSchema + val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false) + .load(path.getAbsolutePath) assert(df1.schema === df2.schema) checkAnswer(df1, df2) } From 879be719e5244e990465b6f2c67de37bafa9b2c3 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 29 Nov 2018 20:07:26 -0500 Subject: [PATCH 06/11] add repartition to test --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 12363507a54f..112fc712706f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1989,10 +1989,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test("SPARK-26208: write and read empty data to csv file with headers") { withTempPath { path => - val df1 = Seq.empty[(String, String)].toDF("x", "y") + val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF + // we have 2 partitions but they are both empty and will be filtered out upon writing + // thanks to SPARK-23271 one new empty partition will be inserted df1.write.format("csv").option("header", true).save(path.getAbsolutePath) val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false) .load(path.getAbsolutePath) + assert(df1.rdd.getNumPartitions == 2) + assert(df2.rdd.getNumPartitions == 1) assert(df1.schema === df2.schema) checkAnswer(df1, df2) } From 29fc6b89094841ba2a28827247305e4fa6c01520 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 29 Nov 2018 20:20:49 -0500 Subject: [PATCH 07/11] remove init --- .../sql/execution/datasources/FileFormatDataWriter.scala | 2 -- .../spark/sql/execution/datasources/OutputWriter.scala | 3 --- .../sql/execution/datasources/csv/CSVFileFormat.scala | 8 +++----- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 1c03de9c0eaf..10733810b641 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ 
-121,7 +121,6 @@ class SingleDirectoryDataWriter( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) - currentWriter.init() statsTrackers.foreach(_.newFile(currentPath)) } @@ -238,7 +237,6 @@ class DynamicPartitionDataWriter( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) - currentWriter.init() statsTrackers.foreach(_.newFile(currentPath)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala index 2763d3467d37..868e5371426c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala @@ -57,9 +57,6 @@ abstract class OutputWriterFactory extends Serializable { * executor side. This instance is used to persist rows to this single output file. */ abstract class OutputWriter { - /** Initializes before writing any rows. Invoked on executor size. */ - def init(): Unit = {} - /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 40ab8124cd5a..b7b0446ddb8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -179,11 +179,9 @@ private[csv] class CsvOutputWriter( newGen } - override def init(): Unit = { - if (params.headerFlag) { - val gen = getOrCreateGen() - gen.writeHeaders() - } + if (params.headerFlag) { + val gen = getOrCreateGen() + gen.writeHeaders() } override def write(row: InternalRow): Unit = { From 6165e1a6c65601b57a06905c497cc18369d74354 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 30 Nov 2018 11:17:29 -0500 Subject: [PATCH 08/11] move code up so its clearer its part of constructor --- .../execution/datasources/csv/CSVFileFormat.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index b7b0446ddb8e..f7d8a9e1042d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -171,7 +171,12 @@ private[csv] class CsvOutputWriter( private var univocityGenerator: Option[UnivocityGenerator] = None - private def getOrCreateGen(): UnivocityGenerator = univocityGenerator.getOrElse { + if (params.headerFlag) { + val gen = getGen() + gen.writeHeaders() + } + + private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse { val charset = Charset.forName(params.charset) val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) val newGen = new UnivocityGenerator(dataSchema, os, params) @@ -179,13 +184,8 @@ private[csv] class CsvOutputWriter( newGen } - if (params.headerFlag) { - val gen = getOrCreateGen() - gen.writeHeaders() - } - override def write(row: InternalRow): Unit 
= { - val gen = getOrCreateGen() + val gen = getGen() gen.write(row) } From 5e87e7e56f389c8b757e62dea0962a8a0d506645 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sat, 1 Dec 2018 11:14:08 -0500 Subject: [PATCH 09/11] dont check number of partitions --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 112fc712706f..bc950f2418d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1995,8 +1995,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te df1.write.format("csv").option("header", true).save(path.getAbsolutePath) val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false) .load(path.getAbsolutePath) - assert(df1.rdd.getNumPartitions == 2) - assert(df2.rdd.getNumPartitions == 1) assert(df1.schema === df2.schema) checkAnswer(df1, df2) } From 238efa59fe3b63f88a36b1bb7634dd868e2f2097 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sat, 1 Dec 2018 11:21:48 -0500 Subject: [PATCH 10/11] use lazy val for univocity generator --- .../datasources/csv/CSVFileFormat.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index f7d8a9e1042d..34808d03bd03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -169,25 +169,26 @@ private[csv] class CsvOutputWriter( context: TaskAttemptContext, params: CSVOptions) extends OutputWriter with Logging { - private var univocityGenerator: Option[UnivocityGenerator] = None + private var isGeneratorInitiated = false - if (params.headerFlag) { - val gen = getGen() - gen.writeHeaders() - } - - private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse { + private lazy val univocityGenerator = { + isGeneratorInitiated = true val charset = Charset.forName(params.charset) val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - val newGen = new UnivocityGenerator(dataSchema, os, params) - univocityGenerator = Some(newGen) - newGen + new UnivocityGenerator(dataSchema, os, params) + } + + if (params.headerFlag) { + univocityGenerator.writeHeaders() } override def write(row: InternalRow): Unit = { - val gen = getGen() - gen.write(row) + univocityGenerator.write(row) } - override def close(): Unit = univocityGenerator.map(_.close()) + override def close(): Unit = { + if (isGeneratorInitiated) { + univocityGenerator.close() + } + } } From 9d5cb7be9a8cb97bd54dd1e938ba819ed3066351 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sat, 1 Dec 2018 16:29:36 -0500 Subject: [PATCH 11/11] Revert "use lazy val for univocity generator" This reverts commit 238efa59fe3b63f88a36b1bb7634dd868e2f2097. 
--- .../datasources/csv/CSVFileFormat.scala | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 34808d03bd03..f7d8a9e1042d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -169,26 +169,25 @@ private[csv] class CsvOutputWriter( context: TaskAttemptContext, params: CSVOptions) extends OutputWriter with Logging { - private var isGeneratorInitiated = false + private var univocityGenerator: Option[UnivocityGenerator] = None - private lazy val univocityGenerator = { - isGeneratorInitiated = true - val charset = Charset.forName(params.charset) - val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - new UnivocityGenerator(dataSchema, os, params) + if (params.headerFlag) { + val gen = getGen() + gen.writeHeaders() } - if (params.headerFlag) { - univocityGenerator.writeHeaders() + private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse { + val charset = Charset.forName(params.charset) + val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) + val newGen = new UnivocityGenerator(dataSchema, os, params) + univocityGenerator = Some(newGen) + newGen } override def write(row: InternalRow): Unit = { - univocityGenerator.write(row) + val gen = getGen() + gen.write(row) } - override def close(): Unit = { - if (isGeneratorInitiated) { - univocityGenerator.close() - } - } + override def close(): Unit = univocityGenerator.map(_.close()) }
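
For reference, a minimal end-to-end sketch of the behavior this series enables, mirroring the SPARK-26208 test added to CSVSuite. It assumes a local SparkSession and a scratch output directory (the path below is hypothetical); with header=true, writing an empty Dataset now produces a part file containing only the header line, so the column names survive a round trip:

    import org.apache.spark.sql.SparkSession

    object EmptyCsvHeaderRoundTrip {
      def main(args: Array[String]): Unit = {
        // Assumes a Spark build that includes this patch series.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("empty-csv-header-round-trip")
          .getOrCreate()
        import spark.implicits._

        // Hypothetical scratch directory; any writable path works.
        val out = "/tmp/empty-csv-header-round-trip"

        // An empty Dataset with a known two-column schema.
        val df1 = Seq.empty[(String, String)].toDF("x", "y")

        // With this change, the written part file contains the single line "x,y"
        // even though there are no data rows to write.
        df1.write.format("csv").option("header", true).mode("overwrite").save(out)

        // Reading back with header=true (and no schema inference, so column names
        // come from the header line) recovers the original schema.
        val df2 = spark.read.format("csv").option("header", true).load(out)
        assert(df1.schema === df2.schema)

        spark.stop()
      }
    }

Before this series, the saved directory for an empty Dataset contained no header line at all, so the read side could not recover the column names.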
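
The shape CsvOutputWriter ends up with after the final revert is an Option-backed, create-on-first-use generator: the header is written from the constructor when header=true (which is what forces a header-only file for an empty partition), while close() only touches a generator that was actually created. Below is a self-contained sketch of that pattern with hypothetical names (LazyLineWriter, newWriter), not the real Spark classes:

    import java.io.Writer

    // Generic sketch of the create-on-first-use pattern: the underlying writer
    // exists only once something is written, and close() is a no-op when nothing
    // was ever written.
    class LazyLineWriter(header: String, writeHeader: Boolean, newWriter: () => Writer) {

      private var underlying: Option[Writer] = None

      private def getOrCreate(): Writer = underlying.getOrElse {
        val w = newWriter()
        underlying = Some(w)
        w
      }

      // Constructor body: emit the header eagerly when requested, so even a
      // writer that never sees a row still produces the header line.
      if (writeHeader) {
        getOrCreate().write(header + "\n")
      }

      def write(record: String): Unit = getOrCreate().write(record + "\n")

      def close(): Unit = underlying.foreach(_.close())
    }

A file-backed instance (for example one whose newWriter opens a java.io.FileWriter) would therefore only create its file when the header flag is set or the first record arrives, which is the same trade-off the series settles on after trying and reverting a lazy val in patches 10 and 11.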