
Conversation

@uncleGen
Contributor

Problem Description

Reported by Silvio Fiorito

I've got a Kafka topic which I'm querying, running a windowed aggregation with a 30-second watermark and a 10-second trigger, writing out to Parquet in append output mode.

Every 10-second trigger generates a file, regardless of whether there was any data for that trigger or whether any records were actually finalized by the watermark.

Is this expected behavior or should it not write out these empty files?

val df = spark.readStream.format("kafka")....

val query = df
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "10 seconds"))
  .count()
  .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")

query
  .writeStream
  .format("parquet")
  .option("checkpointLocation", aggChk)
  .trigger(ProcessingTime("10 seconds"))
  .outputMode("append")
  .start(aggPath)

As the query executes, do a file listing on "aggPath" and you'll see 339-byte files at a minimum until we arrive at the first watermark and the initial batch is finalized. Even after that, though, it keeps generating empty files every trigger whenever there are empty batches.
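For illustration only (this sketch assumes aggPath points at a local filesystem directory, which the report does not state), a quick file listing shows the symptom: every trigger leaves a small footer-only Parquet file even when no rows were finalized by the watermark.

import java.io.File

// List the Parquet output files and their sizes; the ~339-byte files
// contain only the Parquet footer and schema, no row data.
new File(aggPath).listFiles()
  .filter(f => f.isFile && f.getName.endsWith(".parquet"))
  .sortBy(_.getName)
  .foreach(f => println(s"${f.length()} bytes  ${f.getName}"))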

What changes were proposed in this pull request?

Check whether the partition is empty, and skip empty partitions to avoid writing out empty files.
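In spirit, the fix gates writer creation on the partition iterator actually having rows. A minimal, self-contained sketch of the idea (the helper names below are illustrative only, not the actual FileFormatWriter internals):

import java.io.{File, PrintWriter}

// Illustrative write task: only create the per-partition output writer
// when the partition has at least one row, so empty partitions produce
// no file at all.
def writePartition(iter: Iterator[String], outputFile: File): Unit = {
  // Skip the empty partition to avoid creating a mass of 'empty' files.
  if (iter.hasNext) {
    val writer = new PrintWriter(outputFile)
    try {
      iter.foreach(line => writer.println(line))
    } finally {
      writer.close()
    }
  }
}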

How was this patch tested?

Jenkins

-    newOutputWriter(fileCounter)
+    // Skip the empty partition to avoid creating a mass of 'empty' files.
+    if (iter.hasNext) {
+      newOutputWriter(fileCounter)
@HyukjinKwon
Member
Mar 23, 2017
I proposed a similar PR before (about a year ago?) but it got reverted. With this change, Parquet would not write out the footer and schema information. Namely, this will break the case below:

spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()

To my knowledge we didn't have test cases for this (if I haven't missed related PRs); it seems there is one now.

@uncleGen
Contributor Author

@HyukjinKwon IIUC, this case should fail as expected, as there is no output. Am I missing something?

spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()

@HyukjinKwon
Member
Mar 23, 2017
Reading empty data should be fine too; it should preserve the schema. I am pretty sure we want this case to work, because my PR was reverted due to the case above.
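For reference, a sketch of the behaviour being relied on here (assuming a local SparkSession named spark and a scratch path under /tmp): the DataFrame has zero rows, but the Parquet footer still records the schema, so the round trip succeeds with the original columns.

// Empty result (no id in range(100) satisfies id > 100), yet the schema survives.
val empty = spark.range(100).filter("id > 100")
empty.write.mode("overwrite").parquet("/tmp/abc")

val back = spark.read.parquet("/tmp/abc")
back.printSchema()   // shows the `id: long` column recovered from the footer
back.show()          // prints an empty table with the `id` header, no exception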


@uncleGen
Contributor Author

Thanks for the pointer. How about leaving just one empty file containing the metadata when the df has only empty partitions? Furthermore, could we leave just a single metadata file?
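As a rough illustration of that proposal (the output path below is hypothetical): coalescing the empty result to a single partition already yields exactly one footer-only Parquet file, which is enough to preserve the schema on read.

// One task writes one footer-only part file; the schema is still recoverable.
val onlyMetadata = spark.range(100).filter("id > 100").coalesce(1)
onlyMetadata.write.mode("overwrite").parquet("/tmp/only-metadata")

spark.read.parquet("/tmp/only-metadata").printSchema()   // schema read back from the single file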

@HyukjinKwon
Member
Mar 24, 2017

Yes, I was thinking along those lines. I remember making several attempts at that time but failing to come up with a confident fix, and I could not find the time to work on it further.

Another problem is that this might be tied to datasource-specific behavior; for example, ORC does not write out an empty df at all:

scala> spark.range(100).filter("id > 100").write.orc("/tmp/abc1")

scala> spark.read.orc("/tmp/abc1").show()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)

This issue is described in https://issues.apache.org/jira/browse/SPARK-15474.
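As the error message suggests, one possible workaround for the ORC case (a sketch assuming the same /tmp/abc1 path and a known schema) is to supply the schema explicitly, so the read returns an empty DataFrame instead of throwing:

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Schema inference fails because no ORC data files were written, so pass
// the expected schema by hand.
val orcSchema = StructType(Seq(StructField("id", LongType)))
spark.read.schema(orcSchema).orc("/tmp/abc1").show()   // empty result, no AnalysisException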

FWIW, I happened to see https://issues.apache.org/jira/browse/SPARK-15693 around that time, and I felt we might be able to consolidate this issue with it, although it is a rough idea.

@SparkQA

SparkQA commented Mar 23, 2017

Test build #75089 has finished for PR 17395 at commit 86a7d2f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 23, 2017

Test build #75094 has finished for PR 17395 at commit 42da5af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

Let me change this PR to WIP based on the discussion with @HyukjinKwon.

@uncleGen changed the title from "[SPARK-20065][SS] Avoid to output empty parquet files" to "[SPARK-20065][SS][WIP] Avoid to output empty parquet files" on Mar 24, 2017
@HyukjinKwon
Member

Thank you for taking my opinion into account.

@SparkQA

SparkQA commented Apr 29, 2017

Test build #76286 has finished for PR 17395 at commit 42da5af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Hi @uncleGen, how is it going?

@uncleGen
Contributor Author

uncleGen commented May 8, 2017

@HyukjinKwon Sorry for the long absence. I will be online for the next stretch of time; please give me some time.

@HyukjinKwon
Member

Yeah, I just pinged because I am interested in this :).

@HyukjinKwon
Member

Hmm, @uncleGen, shall we close this for now? Reopening when it's ready would be welcome.
