Conversation

@xuanyuanking (Member) commented Jul 17, 2017

What changes were proposed in this pull request?

Add EmptyDirectoryWriteTask for empty tasks while writing files. Fix the empty result for the Parquet format by leaving the first partition for metadata writing.
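A minimal, standalone sketch of the idea (hypothetical names; not the actual patch): when there are no partition or bucket columns, only partition 0 writes a file for empty input, so the output directory still gets the Parquet schema/footer metadata, while other empty partitions skip writing entirely.

object EmptyTaskSketch {
  // Hypothetical helper: a task writes a real output file only when it has rows,
  // or when it is partition 0 (which must carry the schema/footer metadata).
  def shouldWriteFile(sparkPartitionId: Int, hasRows: Boolean): Boolean =
    hasRows || sparkPartitionId == 0

  def main(args: Array[String]): Unit = {
    assert(shouldWriteFile(0, hasRows = false))  // partition 0 always writes
    assert(!shouldWriteFile(3, hasRows = false)) // empty non-first partitions are skipped
    assert(shouldWriteFile(3, hasRows = true))   // non-empty partitions write normally
  }
}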

How was this patch tested?

Add a new test in FileFormatWriterSuite.

@xuanyuanking (Member, Author)

@HyukjinKwon Thanks for your comment. As you mentioned in #18650 and #17395, empty results for Parquet can be fixed by leaving the first partition; what about the ORC format? Should the ORC error for empty results also be handled together in this patch?

@HyukjinKwon (Member)

I think ORC can be dealt with separately (the problem is within the ORC source, given my past investigation).


val writeTask =
  if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
    if (sparkPartitionId != 0 && !iterator.hasNext) {
Member

I guess this might be okay in the sense that, to my knowledge, partition IDs are guaranteed to start from 0. Could you take a look and see if it makes sense to you, cc @cloud-fan, if you don't mind? I am not confident enough to proceed with the review and leave a sign-off.

Contributor

This is a little hacky, but I think it is the simplest fix.

Member

cc @yhuai too, who reviewed my similar PR before.

class FileFormatWriterSuite extends QueryTest with SharedSQLContext {

  test("empty file should be skipped while write to file") {
    withTempDir { dir =>
Member

withTempPath can be used instead I believe.

Contributor

+1

      dir.delete()
      spark.range(10000).repartition(10).write.parquet(dir.toString)
      val df = spark.read.parquet(dir.toString)
      val allFiles = dir.listFiles(new FilenameFilter {
Member

Can we do this more simply? For example,

.listFiles().filter { f =>
  f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")
}

Member Author

Both are OK, I think; I just copied this from HadoopFsRelationSuite.

Member

Yea. If both are okay, let's go for the shorter one.

Contributor

+1 for the shorter one

          !name.startsWith(".") && !name.startsWith("_")
        }
      })
      assert(allFiles.length == 10)
Member

Could I ask what this test targets? I think I am lost around here ...

Member Author

Just to make sure the source dir has many files and the output dir only has 2 files.
If this confuses people, should I just leave a note and delete the assert?

Member

But I guess this one (the latter) does not test this change? If the test passes regardless of this PR's change, I would rather remove it.

Member Author

OK, I'll remove this assert and leave a note.


      withTempDir { dst_dir =>
        dst_dir.delete()
        df.where("id = 50").write.parquet(dst_dir.toString)
Member

I would explicitly repartition here.

Contributor

Why do we need to repartition?

Member

I was thinking of it just to make sure of the (previous) number of files written out.

@HyukjinKwon (Member) commented Jul 18, 2017

I mean, for example, if we happen to end up with only a few partitions in the df for any reason, I guess this test can become invalid ...

@SparkQA commented Jul 17, 2017

Test build #79667 has finished for PR 18654 at commit 6153001.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member, Author)

retest this please

@cloud-fan (Contributor)

"leaving the first partition for meta writing"

What is the meta we need to write?

@HyukjinKwon (Member)

The schema and the footer, in the case of Parquet. There is more context here: #17395 (comment).

For example, if we don't write out the empty files, it breaks:

spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()

@xuanyuanking (Member, Author)

Yep, an empty result dir needs this metadata; otherwise reading it will throw this exception:

org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:188)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:188)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:187)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:571)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:555)
  ... 48 elided
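As the message says, the schema has to be supplied manually in that case. A hedged sketch of that workaround (the path and schema are illustrative, matching the spark.range example above):

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Supply the schema explicitly so the read does not depend on Parquet footers.
val expectedSchema = StructType(Seq(StructField("id", LongType)))
spark.read.schema(expectedSchema).parquet("/tmp/abc").show()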

          !name.startsWith(".") && !name.startsWith("_")
        }
      })
      // First partition file and the data file
Contributor

Ideally we only need the first partition file if all other partitions are empty, but this is hard to do right now.

Member Author

Couldn't agree more. I first tried to implement it like that, but FileFormatWriter.write can only see each task's own iterator.

@SparkQA commented Jul 18, 2017

Test build #79687 has finished for PR 18654 at commit 6153001.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

class FileFormatWriterSuite extends QueryTest with SharedSQLContext {

  test("empty file should be skipped while write to file") {
    withTempPath { dir =>
Member

Could we maybe just do it as below?

withTempPath { path =>
  spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
  val partFiles = path.listFiles()
    .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
  assert(partFiles.length === 2)
}

Member Author

Much clearer :) No need to actually create source files.

@SparkQA commented Jul 18, 2017

Test build #79694 has finished for PR 18654 at commit f7d7c09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 18, 2017

Test build #79695 has finished for PR 18654 at commit d118d68.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member, Author)

retest this please

@SparkQA commented Jul 18, 2017

Test build #79700 has finished for PR 18654 at commit d118d68.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member, Author)

retest this please

@SparkQA commented Jul 18, 2017

Test build #79704 has finished for PR 18654 at commit d118d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member, Author)

ping @cloud-fan @HyukjinKwon

@cloud-fan (Contributor)

LGTM, merging to master!

@asfgit asfgit closed this in 81c99a5 Jul 19, 2017
@xuanyuanking xuanyuanking deleted the SPARK-21435 branch July 21, 2017 01:34