-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-7567] [SQL] Migrating Parquet data source to FSBasedRelation #6090
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original code doesn't handle file names like parquet-r-00001.gz.parquet.
|
Merged build triggered. |
|
Merged build started. |
|
Test build #32523 has started for PR 6090 at commit |
|
Test build #32523 has finished for PR 6090 at commit
|
|
Merged build finished. Test FAILed. |
|
Test FAILed. |
|
Test build #802 has started for PR 6090 at commit |
|
Test build #802 has finished for PR 6090 at commit
|
|
Merged build triggered. |
|
Merged build started. |
|
Test build #32549 has started for PR 6090 at commit |
|
Test build #32549 has finished for PR 6090 at commit
|
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type does not need to be a FileOutputCommitter, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used FileOutputCommitter here because we need to retrieve the actual path of file being written, which is returned by FileOutputCommitter.getWorkPath. This implies customized output committers must be subclasses of FileOutputCommitter, which was true for DirectParquetOutputCommitter. But this restriction seems too strict. Resorting to OutputCommitter rather than FileOutputCommitter in another PR.
e40bb7b to
a0a3ee9
Compare
|
Merged build triggered. |
|
Merged build started. |
|
Rebased to #6118. |
|
Merged build triggered. |
|
Merged build started. |
|
Test build #32620 has started for PR 6090 at commit |
|
Merged build finished. Test FAILed. |
|
Test FAILed. |
|
Test build #803 has started for PR 6090 at commit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
classOf[OutputCommitter]?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this comment is outdated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. When FileOutputCommitter is used, we still use FileOutputCommitter.getWorkPath() internally inside InsertIntoFSBasedRelation.
ff22b46 to
6063f87
Compare
|
Merged build triggered. |
|
Merged build started. |
|
Test build #32626 has started for PR 6090 at commit |
|
Test build #32620 has finished for PR 6090 at commit
|
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this constructor is not defined in OutputCommitter.
Why do we need this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, in InsertIntoFSBasedRelation.run, we already set the the output path through FileOutputFormat.setOutputPath(job, qualifiedOutputPath). So, the output path should be set in context. Seems we only need to check if mapred.output.committer.class is set or not. If it is set, we create the output committer based on the specified class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val committerClass = context.getConfiguration.getClass(
"mapred.output.committer.class", null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
}.getOrElse {
outputFormatClass.newInstance().getOutputCommitter(context)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, if committerClass is based on mapred interface, setupJob will not work because mapred's output committer use mapred JobContext (a subclass of mapreduce's JobContext) and we are using Job in mapreduce package (another subclass of mapreduce's JobContext).
|
I'm going to go ahead and merge this so we can start testing. Yin's concerns about using other output committers can be addressed in a followup (we should consider adding tests that use |
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are: 1. Partition discovery code has been factored out to `FSBasedRelation` 1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions 1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition 1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes #6090 from liancheng/parquet-migration and squashes the following commits: 6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist 261d8c1 [Cheng Lian] Minor bug fix and more tests db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation (cherry picked from commit 7ff16e8) Signed-off-by: Michael Armbrust <[email protected]>
|
Test build #803 has finished for PR 6090 at commit
|
|
Test build #32626 has finished for PR 6090 at commit
|
|
Merged build finished. Test PASSed. |
|
Test PASSed. |
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are: 1. Partition discovery code has been factored out to `FSBasedRelation` 1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions 1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition 1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes apache#6090 from liancheng/parquet-migration and squashes the following commits: 6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist 261d8c1 [Cheng Lian] Minor bug fix and more tests db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are: 1. Partition discovery code has been factored out to `FSBasedRelation` 1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions 1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition 1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes apache#6090 from liancheng/parquet-migration and squashes the following commits: 6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist 261d8c1 [Cheng Lian] Minor bug fix and more tests db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
This PR migrates Parquet data source to the newly introduced `FSBasedRelation`. `FSBasedParquetRelation` is created to replace `ParquetRelation2`. Major differences are: 1. Partition discovery code has been factored out to `FSBasedRelation` 1. `AppendingParquetOutputFormat` is not used now. Instead, an anonymous subclass of `ParquetOutputFormat` is used to handle appending and writing dynamic partitions 1. When scanning partitioned tables, `FSBasedParquetRelation.buildScan` only builds an `RDD[Row]` for a single selected partition 1. `FSBasedParquetRelation` doesn't rely on Catalyst expressions for filter push down, thus it doesn't extend `CatalystScan` anymore After migrating `JSONRelation` (which extends `CatalystScan`), we can remove `CatalystScan`. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6090) <!-- Reviewable:end --> Author: Cheng Lian <[email protected]> Closes apache#6090 from liancheng/parquet-migration and squashes the following commits: 6063f87 [Cheng Lian] Casts to OutputCommitter rather than FileOutputCommtter bfd1cf0 [Cheng Lian] Fixes compilation error introduced while rebasing f9ea56e [Cheng Lian] Adds ParquetRelation2 related classes to MiMa check whitelist 261d8c1 [Cheng Lian] Minor bug fix and more tests db65660 [Cheng Lian] Migrates Parquet data source to FSBasedRelation
This PR migrates Parquet data source to the newly introduced
FSBasedRelation.FSBasedParquetRelationis created to replaceParquetRelation2. Major differences are:Partition discovery code has been factored out to
FSBasedRelationAppendingParquetOutputFormatis not used now. Instead, an anonymous subclass ofParquetOutputFormatis used to handle appending and writing dynamic partitionsWhen scanning partitioned tables,
FSBasedParquetRelation.buildScanonly builds anRDD[Row]for a single selected partitionFSBasedParquetRelationdoesn't rely on Catalyst expressions for filter push down, thus it doesn't extendCatalystScananymoreAfter migrating
JSONRelation(which extendsCatalystScan), we can removeCatalystScan.