@@ -32,7 +32,6 @@ class UnivocityGenerator(
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
-  private var printHeader = options.headerFlag

   // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
   // When the value is null, this converter should not be called.
@@ -72,15 +71,15 @@ class UnivocityGenerator(
     values
   }

+  def writeHeaders(): Unit = {
+    gen.writeHeaders()
+  }
+
   /**
    * Writes a single InternalRow to CSV using Univocity.
    */
   def write(row: InternalRow): Unit = {
-    if (printHeader) {
-      gen.writeHeaders()
-    }
     gen.writeRow(convertRow(row): _*)
-    printHeader = false
   }

   def writeToString(row: InternalRow): String = {
@@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter(

   private var univocityGenerator: Option[UnivocityGenerator] = None

-  override def write(row: InternalRow): Unit = {
-    val gen = univocityGenerator.getOrElse {
-      val charset = Charset.forName(params.charset)
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-      val newGen = new UnivocityGenerator(dataSchema, os, params)
-      univocityGenerator = Some(newGen)
-      newGen
-    }
+  if (params.headerFlag) {
+    val gen = getGen()
+    gen.writeHeaders()
+  }
+
+  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+    val charset = Charset.forName(params.charset)
+    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+    val newGen = new UnivocityGenerator(dataSchema, os, params)
+    univocityGenerator = Some(newGen)
+    newGen
+  }
+
+  override def write(row: InternalRow): Unit = {
+    val gen = getGen()
Review thread on the line above:

Member: Wait ... is this going to create a UnivocityGenerator for each record?

Member: Ah, it's getOrElse. Okay, but still, can we simplify this logic? It looks a bit confusing. For instance, I think we could do this with a lazy val.

Member: @HyukjinKwon Do you mean creating the generator in a lazy val?

lazy val univocityGenerator = {
  val charset = Charset.forName(params.charset)
  val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
  new UnivocityGenerator(dataSchema, os, params)
}

The problem is in the close method: you would have to call univocityGenerator.close() there. If the lazy val wasn't instantiated before (an empty partition and the header option set to false), the generator would be created and closed immediately, and as a result you would get an empty file for the empty partition. That's why I prefer the approach with Option[UnivocityGenerator] in #23052.
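A minimal sketch of the Option-based cleanup being referred to (the close() override and its body are illustrative here, not quoted from #23052):

override def close(): Unit = {
  // Close only a generator that was actually created: for an empty
  // partition with header == false, no generator exists, no output
  // stream is ever opened, and so no empty file is left behind.
  univocityGenerator.foreach(_.close())
}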

HyukjinKwon (Member), Dec 1, 2018: I see the problem. OrcFileFormat uses a flag approach. For instance:

private var isGeneratorInitiated = false

lazy val univocityGenerator = {
  isGeneratorInitiated = true
  val charset = Charset.forName(params.charset)
  val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
  new UnivocityGenerator(dataSchema, os, params)
}

// in close(): free the generator only if it was ever created
if (isGeneratorInitiated) {
  univocityGenerator.close()
}

Should be okay to stick to it.

Contributor (author): OK, I changed it to a lazy val and a flag.

Member: Frankly speaking, I don't see any reason for this. We now actually have two flags - isGeneratorInitiated and another one hidden inside the lazy val - and two slightly different approaches: the Option type in Json and Text, and lazy val + flag in Orc and CSV.
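For context, a simplified sketch of that second, hidden flag: the compiler implements a lazy val with its own initialization flag, roughly as below (createGenerator() is a hypothetical stand-in for the construction code, and the real emitted code uses a bitmap with double-checked locking):

private var initialized = false                  // compiler-managed flag
private var cached: UnivocityGenerator = _

def univocityGenerator: UnivocityGenerator = synchronized {
  if (!initialized) {
    cached = createGenerator()                   // hypothetical factory
    initialized = true
  }
  cached
}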

Member: Yeah, we have two different approaches, both of which are fine IMHO. I think it's reasonable to clean that up in a follow-up if desired. WDYT @HyukjinKwon?

Contributor (author): I will revert this change to lazy val for now, since it doesn't have anything to do with this pull request or JIRA ticket: the Option approach was introduced in another pull request (#23052).

Member: OK. Shouldn't be a big deal.

     gen.write(row)
   }

@@ -1987,6 +1987,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }

+  test("SPARK-26208: write and read empty data to csv file with headers") {
+    withTempPath { path =>
+      val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF
+      // we have 2 partitions, but both are empty and will be filtered out upon writing;
+      // thanks to SPARK-23271 one new empty partition will be inserted
+      df1.write.format("csv").option("header", true).save(path.getAbsolutePath)
+      val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false)
+        .load(path.getAbsolutePath)
+      assert(df1.schema === df2.schema)
+      checkAnswer(df1, df2)
+    }
+  }
+
   test("do not produce empty files for empty partitions") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath