@@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
context: TaskAttemptContext,
params: CSVOptions) extends OutputWriter with Logging {

private val charset = Charset.forName(params.charset)

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)

private val gen = new UnivocityGenerator(dataSchema, writer, params)
private var univocityGenerator: Option[UnivocityGenerator] = None
Member

Do we have a race condition below then where multiple generators can be created?

Member Author

We have not observed any race conditions so far. Instances of UnivocityGenerator are created per task, as are OutputStreamWriters. They share instances of the schema and CSVOptions, but we do not modify them while writing. Inside each UnivocityGenerator we create an instance of CsvWriter, but I am almost certain they do not share anything internally.

Member

I don't mean that it would cause an error, but that it could create many generators and writers that aren't closed. It may not be obvious that it's happening. Unless we know writes will only happen in one thread, what about breaking out and synchronizing the get/create part of this method?
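For concreteness, a minimal sketch of the kind of refactoring being suggested here, inside CsvOutputWriter and using the same fields that appear in the diff; getOrCreateGenerator is a name invented for illustration, and this sketch is not part of the PR:

  // Hypothetical sketch: funnel creation of the generator through a synchronized
  // helper so that concurrent calls to write() cannot each open their own
  // stream and generator.
  private var univocityGenerator: Option[UnivocityGenerator] = None

  private def getOrCreateGenerator(): UnivocityGenerator = synchronized {
    univocityGenerator.getOrElse {
      val charset = Charset.forName(params.charset)
      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
      val newGen = new UnivocityGenerator(dataSchema, os, params)
      univocityGenerator = Some(newGen)
      newGen
    }
  }

  override def write(row: InternalRow): Unit = getOrCreateGenerator().write(row)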

Member Author

... but that it could create many generators and writers that aren't closed.

Writers/generators are created inside tasks:

  val dataWriter =
    if (sparkPartitionId != 0 && !iterator.hasNext) {
      // In case of empty job, leave first partition to save meta for file format like parquet.
      new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
    } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
    } else {
      new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
    }
  try {
    Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
      // Execute the task to write rows out and commit the task.
      while (iterator.hasNext) {
        dataWriter.write(iterator.next())
      }
      dataWriter.commit()
    })(catchBlock = {
      // If there is an error, abort the task
      dataWriter.abort()
      logError(s"Job $jobId aborted.")
    })
  } catch {
    case e: FetchFailedException =>
      throw e
    case t: Throwable =>
      throw new SparkException("Task failed while writing rows.", t)
  }
}
where dataWriter.commit() and dataWriter.abort() close the writers/generators. So, the number of unclosed generators at any moment is less than or equal to the size of the task thread pool on the executors.

Unless we know writes will only happen in one thread ...

According to the comments below, this is our assumption:

/**
* Abstract class for writing out data in a single Spark task.
* Exceptions thrown by the implementation of this trait will automatically trigger task aborts.
*/
abstract class FileFormatDataWriter(


override def write(row: InternalRow): Unit = {
val gen = univocityGenerator.getOrElse {
Member

Also, one thing we should not forget is that CSV could have headers even if the records are empty.

Member Author

I do think it is fine to write only headers if a user wants to have them. Filtering the header out at this level could be slightly difficult.

val charset = Charset.forName(params.charset)
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
val newGen = new UnivocityGenerator(dataSchema, os, params)
univocityGenerator = Some(newGen)
newGen
}

override def write(row: InternalRow): Unit = gen.write(row)
gen.write(row)
}

override def close(): Unit = gen.close()
override def close(): Unit = univocityGenerator.map(_.close())
}
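For readability, here is CsvOutputWriter roughly as it looks after this change, assembled from the added lines above. Treat it as a sketch rather than the verbatim file: the constructor parameter names for path and dataSchema are assumed, and imports and unchanged members are omitted.

private[csv] class CsvOutputWriter(
    path: String,
    dataSchema: StructType,
    context: TaskAttemptContext,
    params: CSVOptions) extends OutputWriter with Logging {

  // The generator and its underlying stream are created lazily on the first row,
  // so a task with an empty partition never opens an output file.
  private var univocityGenerator: Option[UnivocityGenerator] = None

  override def write(row: InternalRow): Unit = {
    val gen = univocityGenerator.getOrElse {
      val charset = Charset.forName(params.charset)
      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
      val newGen = new UnivocityGenerator(dataSchema, os, params)
      univocityGenerator = Some(newGen)
      newGen
    }
    gen.write(row)
  }

  // If no row was ever written, there is no generator to close and no file is left behind.
  override def close(): Unit = univocityGenerator.map(_.close())
}

The JSON and text writers below follow the same pattern, with a JacksonGenerator and a raw OutputStream respectively. (Using foreach instead of map in close() would be slightly more idiomatic; the sketch mirrors the diff.)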
@@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
" which can be read back by Spark only if multiLine is enabled.")
}

private val writer = CodecStreams.createOutputStreamWriter(
context, new Path(path), encoding)

// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
private var jacksonGenerator: Option[JacksonGenerator] = None

override def write(row: InternalRow): Unit = {
val gen = jacksonGenerator.getOrElse {
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
// create the Generator without separator inserted between 2 records
val newGen = new JacksonGenerator(dataSchema, os, options)
jacksonGenerator = Some(newGen)
newGen
}

gen.write(row)
gen.writeLineEnding()
}

override def close(): Unit = {
gen.close()
writer.close()
}
override def close(): Unit = jacksonGenerator.map(_.close())
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.text

import java.io.OutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,23 @@ class TextOutputWriter(
context: TaskAttemptContext)
extends OutputWriter {

private val writer = CodecStreams.createOutputStream(context, new Path(path))
private var outputStream: Option[OutputStream] = None

override def write(row: InternalRow): Unit = {
val os = outputStream.getOrElse{
val newStream = CodecStreams.createOutputStream(context, new Path(path))
outputStream = Some(newStream)
newStream
}

if (!row.isNullAt(0)) {
val utf8string = row.getUTF8String(0)
utf8string.writeTo(writer)
utf8string.writeTo(os)
}
writer.write(lineSeparator)
os.write(lineSeparator)
}

override def close(): Unit = {
writer.close()
outputStream.map(_.close())
}
}
@@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}.getMessage
assert(errMsg2.contains("'lineSep' can contain only 1 character"))
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.csv(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("csv")))
}
}
}
@@ -1898,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.text(path)

val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
assert(jsonDF.count() === corruptRecordCount)
assert(jsonDF.schema === new StructType()
.add("_corrupt_record", StringType)
.add("dummy", StringType))
@@ -1911,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
}
}

@@ -2556,4 +2556,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
emptyString(StringType, "")
emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.json(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("json")))
}
}
}
@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
assert(data(3) == Row("\"doh\""))
assert(data.length == 4)
}

test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.emptyDataset[String].write.text(path)
val files = new File(path).listFiles()
assert(!files.exists(_.getName.endsWith("txt")))
}
}
}