Conversation

@viirya (Member) commented Jul 1, 2015

JIRA: https://issues.apache.org/jira/browse/SPARK-8756

Currently, in ParquetRelation2, footers are re-read every time refresh() is called. However, we can first check whether the files have possibly changed before re-reading, because reading all footers is expensive when there are many partitions. This PR does so by keeping some cached information and using it for the check.
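
As a rough illustration of the approach (the class and method names below are hypothetical, not the actual ParquetRelation2 code), the idea is to cache a fingerprint of the leaf FileStatuses and re-read footers only when that fingerprint changes:

```scala
import org.apache.hadoop.fs.FileStatus

// Sketch only: cache a fingerprint of the leaf files and skip footer reading
// when nothing has changed since the previous refresh().
class FooterRefreshGuard {
  // (path, length, modification time) of every leaf file seen last time.
  private var cachedFingerprint: Set[(String, Long, Long)] = Set.empty

  private def fingerprint(status: FileStatus): (String, Long, Long) =
    (status.getPath.toString, status.getLen, status.getModificationTime)

  // Returns true when the leaf files differ from the cached fingerprint,
  // i.e. footers must be re-read and schemas re-merged.
  def needsRefresh(currentLeafStatuses: Seq[FileStatus]): Boolean = {
    val current = currentLeafStatuses.map(fingerprint).toSet
    val changed = current != cachedFingerprint
    if (changed) cachedFingerprint = current
    changed
  }
}
```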

@SparkQA commented Jul 1, 2015

Test build #36245 has finished for PR 7154 at commit 0ef8caf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // compiled class file for the closure here will conflict with the one in callUDF (upper case).

@rxin (Contributor) commented Jul 19, 2015

cc @liancheng

@liancheng (Contributor):

Thanks for contributing this patch! I have two high-level comments here:

  1. PR [SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery #7396 also tries to accelerate Parquet metadata discovery/refreshing by several means, and has proven to be quite effective. We've observed a ~50x speedup on a large partitioned S3 dataset with schema merging enabled.
  2. How about adding a check for FileStatus.getModificationTime? Namely, we would only read footers of new files and of existing files that have been modified since the last refresh. This can be particularly useful for appends. (A sketch of this check follows this comment.)

In general, this PR can be a good complement to #7396.
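
A minimal sketch of the modification-time check from point 2 (the helper name and signature are illustrative only, not the ParquetRelation2 API):

```scala
import org.apache.hadoop.fs.FileStatus

// Keep only the files whose footers need (re-)reading: files that are new,
// or whose modification time changed since the previous refresh.
def filesToRead(
    previous: Map[String, FileStatus],  // keyed by path, from the last refresh
    current: Seq[FileStatus]): Seq[FileStatus] = {
  current.filter { status =>
    previous.get(status.getPath.toString) match {
      case Some(old) => old.getModificationTime != status.getModificationTime
      case None => true  // new file, footer never read before
    }
  }
}
```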

@viirya (Member, Author) commented Jul 19, 2015

@liancheng thanks for commenting. I've noticed #7396 and I think it will bring a great improvement to Parquet performance.

In fact, I have also submitted another PR, #7238, to improve Parquet schema merging performance. In my tests it improves performance considerably. #7238 can complement #7396 as well, but it is likely to conflict heavily with #7396, so I will refactor it after #7396 is merged.

@viirya (Member, Author) commented Jul 19, 2015

I will add the check for FileStatus.getModificationTime later.

@viirya (Member, Author) commented Jul 20, 2015

@liancheng I've added the check you suggested. Please take a look when you have time. Thanks.

@SparkQA commented Jul 20, 2015

Test build #37812 has finished for PR 7154 at commit 12a0ed9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor):

#7396 has been merged

@viirya (Member, Author) commented Jul 21, 2015

@marmbrus thanks, I'm going to resolve the conflicts.

…quet_relation

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

@SparkQA commented Jul 21, 2015

Test build #37899 has finished for PR 7154 at commit 21bbdec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ExpectsInputTypes extends Expression
    • trait ImplicitCastInputTypes extends ExpectsInputTypes
    • trait Unevaluable extends Expression
    • trait Nondeterministic extends Expression
    • trait CodegenFallback extends Expression
    • case class Hex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Unhex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class FakeFileStatus(

@viirya (Member, Author) commented Jul 21, 2015

I think this is an unrelated failure.

@viirya (Member, Author) commented Jul 21, 2015

retest this please.

@SparkQA commented Jul 21, 2015

Test build #39 has finished for PR 7154 at commit 21bbdec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Jul 21, 2015

ping @liancheng

Contributor (inline review comment):

Should we do a deep copy here? Currently it's OK because cachedLeafStatuses() always returns a new instance of Set[FileStatus], but it's possible that we use a mutable set object in the future. In that case, the != predicate above will always be true.
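
A tiny illustration of the defensive copy being suggested (names here are hypothetical):

```scala
import org.apache.hadoop.fs.FileStatus

object StatusCache {
  private var cachedLeaves: Set[FileStatus] = Set.empty

  // Copying into a fresh immutable Set decouples the cache from whatever
  // collection the caller passes in, so later mutation of `current` (if it
  // were a mutable set) cannot silently alter the cached value.
  def update(current: scala.collection.Set[FileStatus]): Unit = {
    cachedLeaves = current.toSet
  }
}
```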

@SparkQA commented Jul 22, 2015

Test build #38062 has finished for PR 7154 at commit fa5458f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class UnresolvedFunction(
    • case class Average(child: Expression) extends AlgebraicAggregate
    • case class Count(child: Expression) extends AlgebraicAggregate
    • case class First(child: Expression) extends AlgebraicAggregate
    • case class Last(child: Expression) extends AlgebraicAggregate
    • case class Max(child: Expression) extends AlgebraicAggregate
    • case class Min(child: Expression) extends AlgebraicAggregate
    • case class Sum(child: Expression) extends AlgebraicAggregate
    • abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable
    • implicit class RichAttribute(a: AttributeReference)
    • trait AggregateExpression1 extends AggregateExpression
    • trait PartialAggregate1 extends AggregateExpression1
    • case class Min(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class MinFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class Max(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class MaxFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class Count(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class CountFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate1
    • case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression1
    • case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression1
    • case class Average(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class AverageFunction(expr: Expression, base: AggregateExpression1)
    • case class Sum(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class SumFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class CombineSum(child: Expression) extends AggregateExpression1
    • case class CombineSumFunction(expr: Expression, base: AggregateExpression1)
    • case class SumDistinct(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class SumDistinctFunction(expr: Expression, base: AggregateExpression1)
    • case class CombineSetsAndSum(inputSet: Expression, base: Expression) extends AggregateExpression1
    • case class First(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class FirstFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class Last(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class LastFunction(expr: Expression, base: AggregateExpression1) extends AggregateFunction1
    • case class Aggregate2Sort(
    • case class FinalAndCompleteAggregate2Sort(
    • class GroupingIterator(
    • class PartialSortAggregationIterator(
    • class PartialMergeSortAggregationIterator(
    • class FinalSortAggregationIterator(
    • class FinalAndCompleteSortAggregationIterator(
    • abstract class UserDefinedAggregateFunction extends Serializable
    • case class ScalaUDAF(

Contributor (inline review comment):

Unfortunately there's still a bug here :) We also need to remove those files that only exist in the old cache (namely removed files). This happens when an existing directory is overwritten.

I think we can first keep both the old cache and the result of cachedLeafStatuses(), then filter out the updated and new files, and finally update the old FileStatus cache.
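
A sketch of that bookkeeping (all names are hypothetical, not the actual patch): keep the old cache keyed by path, compare it with the fresh listing, classify files as added, updated, or removed, and only then replace the cache.

```scala
import org.apache.hadoop.fs.FileStatus

case class LeafFileDiff(
    added: Seq[FileStatus],      // present only in the current listing
    updated: Seq[FileStatus],    // modification time changed since the old cache
    removedPaths: Seq[String])   // present only in the old cache, e.g. an overwritten directory

def diffLeafFiles(
    oldCache: Map[String, FileStatus],
    current: Seq[FileStatus]): LeafFileDiff = {
  val currentByPath = current.map(s => s.getPath.toString -> s).toMap
  val added = current.filterNot(s => oldCache.contains(s.getPath.toString))
  val updated = current.filter { s =>
    oldCache.get(s.getPath.toString).exists(_.getModificationTime != s.getModificationTime)
  }
  val removedPaths = oldCache.keys.filterNot(currentByPath.contains).toSeq
  LeafFileDiff(added, updated, removedPaths)
}
// After computing the diff, the caller would replace the old cache with the
// fresh listing so the FileStatus cache stays consistent with the file system.
```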

Contributor (inline review comment):

I guess you are trying to keep only the FileStatuses of those files that need to be touched during schema merging here. Actually, the FileStatus cache must stay consistent with the files stored on the file system, because we also inject the cache into ParquetInputFormat in ParquetRelation2.buildScan to avoid calling listStatus repeatedly there.

Member Author (inline review comment):

Hmm, I am thinking: if we only read the footers of the updated and newly added files, the merged schema may be incorrect. It is the same situation for removed files. If we don't re-merge the schemas from all footers, the result may not be correct.

So this PR should decide whether we need to re-read all footers and re-merge the schema based on whether the FileStatuses have changed or not.

Contributor (inline review comment):

This is a good point. After reading the footers and merging the schemas of new files and updated files, we also need to merge the resulting schema with the old schema, because some columns may be missing from the new and/or updated files.

Actually I found it might be difficult to define the "correctness" of the merged schema. Take the following scenario as an example:

  1. Initially there is file f0, which comes with a single column c0.

    Merged schema: c0

  2. File f1 is added, which contains a single column c1

    Merged schema: c0, c1

  3. Removing f0

    Which is the "correct" merged schema now?

    a. c0, c1
    b. c1

    I tend to use (a), because removing existing columns can be dangerous and may confuse downstream systems. But currently Spark SQL uses (b). Also, we need to take the metastore schema into account for Parquet relations converted from metastore Parquet tables.

I think this issue is too complicated to be fixed in this PR. I agree with you that we should keep this PR simple and just re-read all the footers for now. It's already strictly better than the current implementation, not to mention that schema merging has been significantly accelerated by #7396.
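
For readers following along, a small illustration of the scenario above using the public DataFrame API (paths and values are made up; this assumes the mergeSchema data source option of the Spark 1.5-era Parquet source):

```scala
import org.apache.spark.sql.SQLContext

def schemaMergingScenario(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // Step 1: f0 carries a single column c0.
  Seq(1, 2).toDF("c0").write.parquet("/tmp/merge_demo/f0")
  // Step 2: f1 carries a single column c1; the merged schema is now (c0, c1).
  Seq(3, 4).toDF("c1").write.parquet("/tmp/merge_demo/f1")

  val merged = sqlContext.read
    .option("mergeSchema", "true")
    .parquet("/tmp/merge_demo/f0", "/tmp/merge_demo/f1")
  merged.printSchema()  // c0 and c1, both nullable

  // Step 3: if f0 is deleted and the relation is refreshed, only f1's footer
  // remains, so re-merging all footers yields just (c1) -- choice (b) above.
}
```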

@SparkQA commented Jul 23, 2015

Test build #38188 has finished for PR 7154 at commit a52b6d1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 23, 2015

Test build #38212 has finished for PR 7154 at commit c8fdfb7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 23, 2015

Test build #38229 has finished for PR 7154 at commit ae0ec64.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Jul 24, 2015

ping @liancheng

Contributor (inline review comment):

Please revert these two indentation changes.

Member Author (inline review comment):

Fixed. Thanks.

@liancheng (Contributor):

LGTM except for a minor styling issue. Will merge this to master once it's fixed. Thanks for working on this!

@asfgit closed this in 6a7e537 on Jul 24, 2015
@liancheng (Contributor):

Merged to master.

@viirya (Member, Author) commented Jul 24, 2015

Ok. Thanks.

@SparkQA commented Jul 24, 2015

Test build #38345 has finished for PR 7154 at commit 92e9347.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya deleted the cached_footer_parquet_relation branch on December 27, 2023.