
Commit 31c4fab

MaxGekk authored and srowen committed
[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources
[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources

## What changes were proposed in this pull request?

In the PR, I propose to postpone creation of `OutputStream`/`Univocity`/`JacksonGenerator` until the first row needs to be written. This prevents creation of empty files for empty partitions, so there is no need to open and read such files back while loading data from the location.

## How was this patch tested?

Added tests for the Text, JSON and CSV datasources where an empty dataset is written and should not produce any files.

Closes #23052 from MaxGekk/text-empty-files.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 9a09e91 commit 31c4fab

6 files changed, +64 -22 lines changed
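All three writers in the diffs below apply the same lazy-initialization pattern: the eagerly created stream/generator field becomes an `Option` that is filled only when the first row arrives, so `close()` on a writer that never received a row never opens a file. The following is a minimal, self-contained sketch of that pattern; `LazyWriter`, `openStream` and the demo object are illustrative stand-ins, not Spark classes.

// Minimal sketch of the lazy-initialization pattern used by the CSV, JSON and
// Text output writers below. LazyWriter is an illustrative stand-in, not a Spark class.
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets

class LazyWriter(openStream: () => OutputStream) {
  // No stream is opened until the first record arrives.
  private var out: Option[OutputStream] = None

  def write(record: String): Unit = {
    val os = out.getOrElse {
      val newStream = openStream() // opened lazily, only for non-empty partitions
      out = Some(newStream)
      newStream
    }
    os.write((record + "\n").getBytes(StandardCharsets.UTF_8))
  }

  // Closing a writer that never received a row is a no-op, so no empty file appears.
  def close(): Unit = out.foreach(_.close())
}

object LazyWriterDemo extends App {
  var opened = false
  val openStream: () => OutputStream = () => { opened = true; new ByteArrayOutputStream() }

  // Empty partition: close() without any write() never opens the stream.
  new LazyWriter(openStream).close()
  println(s"opened for empty partition: $opened")     // false

  // Non-empty partition: the first row triggers stream creation.
  val writer = new LazyWriter(openStream)
  writer.write("row-1")
  writer.close()
  println(s"opened for non-empty partition: $opened") // true
}

The actual patch stores `Option[UnivocityGenerator]`, `Option[JacksonGenerator]` and `Option[OutputStream]` fields in `CsvOutputWriter`, `JsonOutputWriter` and `TextOutputWriter` respectively, as the diffs below show.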

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 13 additions & 7 deletions
@@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val charset = Charset.forName(params.charset)
-
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-
-  private val gen = new UnivocityGenerator(dataSchema, writer, params)
+  private var univocityGenerator: Option[UnivocityGenerator] = None
+
+  override def write(row: InternalRow): Unit = {
+    val gen = univocityGenerator.getOrElse {
+      val charset = Charset.forName(params.charset)
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+      val newGen = new UnivocityGenerator(dataSchema, os, params)
+      univocityGenerator = Some(newGen)
+      newGen
+    }
 
-  override def write(row: InternalRow): Unit = gen.write(row)
+    gen.write(row)
+  }
 
-  override def close(): Unit = gen.close()
+  override def close(): Unit = univocityGenerator.map(_.close())
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 10 additions & 9 deletions
@@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
       " which can be read back by Spark only if multiLine is enabled.")
   }
 
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context, new Path(path), encoding)
-
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
+  private var jacksonGenerator: Option[JacksonGenerator] = None
 
   override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
+
     gen.write(row)
     gen.writeLineEnding()
   }
 
-  override def close(): Unit = {
-    gen.close()
-    writer.close()
-  }
+  override def close(): Unit = jacksonGenerator.map(_.close())
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala

Lines changed: 12 additions & 4 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.OutputStream
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,23 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val writer = CodecStreams.createOutputStream(context, new Path(path))
+  private var outputStream: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse{
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
+
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(writer)
+      utf8string.writeTo(os)
     }
-    writer.write(lineSeparator)
+    os.write(lineSeparator)
   }
 
   override def close(): Unit = {
-    writer.close()
+    outputStream.map(_.close())
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }.getMessage
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.csv(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("csv")))
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 11 additions & 2 deletions
@@ -1897,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+      assert(jsonDF.count() === corruptRecordCount)
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1910,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         F.count($"dummy").as("valid"),
         F.count($"_corrupt_record").as("corrupt"),
         F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
     }
   }
 
@@ -2555,4 +2555,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     emptyString(StringType, "")
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.json(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("json")))
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.text(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("txt")))
+    }
+  }
 }
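The new tests above check exactly this behavior from within the suites. As a quick way to observe the change from the user side, the hedged sketch below writes an empty `Dataset[String]` and lists the target directory; the object name and output path are placeholders, not part of the patch. With this change only metadata files such as `_SUCCESS` should remain, whereas before it an empty part file was written for each empty partition.

// Hedged usage sketch (not part of the patch): object name and output path are
// placeholders. With this change, writing an empty Dataset should leave no
// part files in the target directory, only metadata such as _SUCCESS.
import java.io.File
import org.apache.spark.sql.SparkSession

object EmptyWriteCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("empty-write-check")
      .getOrCreate()
    import spark.implicits._

    val out = "/tmp/empty-write-check"
    spark.emptyDataset[String].write.mode("overwrite").text(out)

    // Before SPARK-26081 this listed zero-byte part files; after it, none.
    val partFiles = new File(out).listFiles().filter(_.getName.startsWith("part-"))
    println(s"part files: ${partFiles.length}")

    spark.stop()
  }
}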
