Conversation

@HyukjinKwon (Member) commented May 3, 2016

What changes were proposed in this pull request?

Currently, an INSERT INTO with a GROUP BY query tries to create at least 200 files (the default value of spark.sql.shuffle.partitions), which results in lots of empty files.

This PR avoids creating empty files when overwriting into a Hive table or writing through the internal data sources with a group-by query.

It checks whether the given partition has any data and creates/writes a file only when it actually does.
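
For illustration, a minimal sketch of that guard in Scala (the OutputWriter trait and newWriter factory below are placeholders standing in for Spark's internal writer machinery, not the actual code in this change):

```scala
// Only open a writer -- and therefore create a file -- when the partition has rows.
trait OutputWriter {                    // placeholder for Spark's internal writer
  def write(row: String): Unit
  def close(): Unit
}

def writeRows(iterator: Iterator[String], newWriter: () => OutputWriter): Unit = {
  if (iterator.hasNext) {               // skip empty shuffle partitions entirely
    val writer = newWriter()
    try {
      while (iterator.hasNext) {
        writer.write(iterator.next())
      }
    } finally {
      writer.close()
    }
  }
  // An empty partition never opens a writer, so no empty file is created.
}
```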

How was this patch tested?

Unit tests in InsertIntoHiveTableSuite and HadoopFsRelationTest.

Closes #8411

@HyukjinKwon (Member, Author) commented May 3, 2016

I submitted this PR because #8411 looks abandoned and the author has not responded since the last comment by a committer (it has been inactive for almost half a year).

@HyukjinKwon (Member, Author):
@yhuai Could you please take a look?

@SparkQA commented May 3, 2016

Test build #57579 has finished for PR 12855 at commit 57f2ecc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 3, 2016

Test build #57581 has finished for PR 12855 at commit 294b447.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 3, 2016

Test build #57587 has finished for PR 12855 at commit ab2d092.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented May 3, 2016

Should we have the same logic for data sources?

@HyukjinKwon (Member, Author) commented May 3, 2016

@rxin I thought so as well, but I haven't tested it yet. Could I look into that and open another PR once this one is merged (would that be okay)?

@rxin (Contributor) commented May 3, 2016

Can you look at it together with this one? It seems like a good logical grouping, and arguably the data sources are more important than the Hive ones.

@HyukjinKwon (Member, Author):
@rxin Sure, I will. Thanks.

@HyukjinKwon changed the title from "[SPARK-10216][SQL] Avoid creating empty files during overwrite into Hive table with group by query" to "[SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query" on May 3, 2016
@HyukjinKwon (Member, Author):
@rxin I found the same issue in the internal data sources. I added the same logic and a test in HadoopFsRelationTest.

Review comment on the diff (the hunk adding the check):

        case t: Throwable =>
          throw new SparkException("Task failed while writing rows", t)
      }
      if (iterator.hasNext) {

@HyukjinKwon (Member, Author): Simply added an iterator.hasNext check.

@SparkQA commented May 3, 2016

Test build #57624 has finished for PR 12855 at commit dee6a4e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 3, 2016

Test build #57625 has finished for PR 12855 at commit b595b7f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented May 3, 2016

cc @marmbrus

@HyukjinKwon (Member, Author):
Hi @marmbrus, could you please take a look?

Review comment on the test code (from a Contributor):

    sql(
      """
        |INSERT OVERWRITE TABLE table1
        |SELECT count(key), value FROM testDataset GROUP BY value

It seems you want to explicitly control the number of shuffle partitions here? Otherwise, this test will not actually test anything if the number of shuffle partitions happens to be set to 2.

@HyukjinKwon (Member, Author) replied May 17, 2016:
Ah, yes. Thank you!
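
As an aside, a rough sketch of how a test can pin the setting so it does not depend on the default (illustrative only; the path and names below are placeholders, not the code that was committed):

```scala
import org.apache.spark.sql.SparkSession

// Force more shuffle partitions than distinct groups so that, without the fix,
// the aggregation would produce empty output files.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-10216-sketch")
  .config("spark.sql.shuffle.partitions", "5")   // more reducers than groups
  .getOrCreate()

// Three distinct keys shuffled into five partitions: at least two partitions are empty.
spark.range(100).selectExpr("id % 3 AS key", "id AS value")
  .groupBy("key").count()
  .write.mode("overwrite").parquet("/tmp/spark-10216-sketch")  // placeholder path

// Without the fix every shuffle partition gets a part file, including the empty
// ones; with the fix only the non-empty partitions produce files.
spark.stop()
```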

@SparkQA commented May 17, 2016

Test build #58662 has finished for PR 12855 at commit 24e16b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 17, 2016

Test build #58666 has finished for PR 12855 at commit 5f780a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor):
Thanks, merging to master and 2.0.

@asfgit closed this in 8d05a7a on May 17, 2016
asfgit pushed a commit that referenced this pull request May 17, 2016
… group by query

## What changes were proposed in this pull request?

Currently, `INSERT INTO` with `GROUP BY` query tries to make at least 200 files (default value of `spark.sql.shuffle.partitions`), which results in lots of empty files.

This PR makes it avoid creating empty files during overwriting into Hive table and in internal data sources with group by query.

This checks whether the given partition has data in it or not and creates/writes file only when it actually has data.

## How was this patch tested?

Unittests in `InsertIntoHiveTableSuite` and `HadoopFsRelationTest`.

Closes #8411

Author: hyukjinkwon <[email protected]>
Author: Keuntae Park <[email protected]>

Closes #12855 from HyukjinKwon/pr/8411.

(cherry picked from commit 8d05a7a)
Signed-off-by: Michael Armbrust <[email protected]>
@jurriaan (Contributor) commented May 18, 2016

This breaks writing empty DataFrames for me.

Before this PR I could write empty DataFrames without any problems.

Now it only writes a _SUCCESS file and no metadata. Also, it sometimes throws a NullPointerException:

8-May-2016 22:37:14 WARNING: org.apache.parquet.hadoop.ParquetOutputCommitter: could not write summary file for file:/..../test
java.lang.NullPointerException
    at org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
    at org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:220)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:144)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:55)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:417)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:626)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)

Edit: Added JIRA: https://issues.apache.org/jira/browse/SPARK-15393
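
For reference, a hypothetical Scala approximation of the failing case (the original report came from PySpark, judging by the py4j frames above; this is a sketch, not the reporter's code):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("empty-df-write").getOrCreate()
import spark.implicits._

// A DataFrame with a schema but no rows.
val empty = Seq.empty[(Int, String)].toDF("id", "name")

// With the patch applied, no part files (and therefore no Parquet footers) are
// written, which leaves the summary-metadata step with nothing to merge -- the
// NullPointerException in the stack trace above comes out of that merge.
empty.write.mode("overwrite").parquet("/tmp/empty-df-out")  // placeholder path

spark.stop()
```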

@rxin (Contributor) commented May 18, 2016

Thanks for reporting.

@marmbrus (Contributor):
I'm going to revert this until we figure out the issues. @HyukjinKwon, can you reopen?

@HyukjinKwon (Member, Author) commented May 18, 2016

@jurriaan Oh, thank you. @marmbrus Yes, please. Do you mean reopening the JIRA? (It seems I can't reopen a merged PR.)

@marmbrus (Contributor):
Sure. I thought you could reopen PRs you created, but if not, feel free to create a new one and link it.

@HyukjinKwon (Member, Author) commented May 19, 2016

@marmbrus Sorry for making you revert this; I should have thought it through further before opening this PR. I will be more thorough and careful next time.

@marmbrus (Contributor):
No worries!

asfgit pushed a commit that referenced this pull request May 20, 2016
…rit…

This reverts commit 8d05a7a from #12855, which seems to have caused regressions when working with empty DataFrames.

Author: Michael Armbrust <[email protected]>

Closes #13181 from marmbrus/revert12855.

(cherry picked from commit 2ba3ff0)
Signed-off-by: Michael Armbrust <[email protected]>
asfgit pushed a commit that referenced this pull request May 20, 2016
…rit…

This reverts commit 8d05a7a from #12855, which seems to have caused regressions when working with empty DataFrames.

Author: Michael Armbrust <[email protected]>

Closes #13181 from marmbrus/revert12855.
@DanielMe:
I can reproduce the issue that @jurriaan reported on 1.6.0 and on 1.5.2. The issue does not occur on 1.3.1.

I have added a comment to the JIRA issue with more detailed instructions on how to reproduce it: https://issues.apache.org/jira/browse/SPARK-15393

Note that this might mean that this PR did not cause the issue.

@HyukjinKwon (Member, Author) commented Jun 21, 2016

@DanielMe Yes, the emptyRDD[Row] case actually seems to be a different issue. That case does not produce any partitions, whereas the code provided by @jurriaan produces some empty partitions. If you try his code, you will find it works fine.

This PR was reverted because the latter case fails. So it seems the former case has been failing since older versions, while the latter no longer fails now that this change has been reverted.
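
To illustrate the distinction in Scala (a sketch; the names are illustrative and this is not code from the JIRA):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("empty-cases").getOrCreate()
val schema = StructType(Seq(StructField("id", IntegerType)))

// Case 1: an RDD with no partitions at all. Writing this reportedly fails on
// older releases too, independently of this PR.
val noPartitions = spark.sparkContext.emptyRDD[Row]
println(noPartitions.getNumPartitions)     // 0

// Case 2: partitions exist but all of them are empty. Writing this worked before
// this PR and works again after the revert.
val emptyPartitions = spark.sparkContext.parallelize(Seq.empty[Row], 4)
println(emptyPartitions.getNumPartitions)  // 4

val df1 = spark.createDataFrame(noPartitions, schema)     // analogous to the @DanielMe case
val df2 = spark.createDataFrame(emptyPartitions, schema)  // analogous to the @jurriaan case
spark.stop()
```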

@ghost commented Jan 25, 2017

The issue that @jurriaan reported is still there in Spark 2.1.0.
