Skip to content

Conversation

@liancheng
Copy link
Contributor

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path. Existing data sources like JSON and Parquet can be simplified with this work.

New features provided

  1. Hive compatible partition discovery

    This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

  2. Generalized partition pruning optimization

    Now partition pruning is handled during physical planning phase. Specific data sources don't need to worry about this harness anymore.

    (This also implies that we can remove CatalystScan after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

  3. Insertion with dynamic partitions

    When inserting data to a FSBasedRelation, data can be partitioned dynamically by specified partition columns.

New structures provided

Developer API

  1. FSBasedRelation

    Base abstract class for file system based data sources.

  2. OutputWriter

    Base abstract class for output row writers, responsible for writing a single row object.

  3. FSBasedRelationProvider

    A new relation provider for FSBasedRelation subclasses. Note that data sources extending FSBasedRelation don't need to extend RelationProvider and SchemaRelationProvider.

User API

New overloaded versions of

  1. DataFrame.save()
  2. DataFrame.saveAsTable()
  3. SQLContext.load()

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

Spark SQL query planning

  1. InsertIntoFSBasedRelation

    Used to implement write path for FSBasedRelations.

  2. New rules for FSBasedRelation in DataSourceStrategy

    These are added to hook FSBasedRelation into physical query plan in read path, and perform partition pruning.

TODO

  • Use scratch directories when overwriting a table with data selected from itself.

    Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.

  • When inserting with dynamic partition columns, use external sorter to group the data first.

    This ensures that we only need to open a single OutputWriter at a time. For data sources like Parquet, OutputWriters can be quite memory consuming. One issue is that, this approach breaks the row distribution in the original DataFrame. However, we did't promise to preserve data distribution when writing a DataFrame.

  • More tests. Specifically, test cases for

    • Self-join
    • Loading partitioned relations with a subset of partition columns stored in data files.
    • SQLContext.load() with user defined dynamic partition columns.

Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30342 has started for PR 5526 at commit 9b58be6.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30344 has started for PR 5526 at commit f973010.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that, to be truly Java API friendly, we also need to add another overridden createRelation method where parameters is passed in as a java.util.Map[String, String].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me Scala users can use Java maps pretty easily -- but that's maybe a broader debate.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30347 has started for PR 5526 at commit e209ab0.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30347 has finished for PR 5526 at commit e209ab0.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait PartitionedSchemaRelationProvider
    • trait OutputWriter
    • trait FSBasedRelation extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30347/
Test FAILed.

@liancheng liancheng force-pushed the partitioning-support branch from e209ab0 to d432248 Compare April 15, 2015 14:38
@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30348 has started for PR 5526 at commit d432248.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin I decided to keep this method. For data sources like Hive, Parquet, and ORC, although the driver side preparation work done before issuing the write job can be somehow converted to executor side, we still need another similar hook to do the work. Constructor of OutputWriter instances is not a proper place to do the executor side preparation. Because in case of dynamic partitioning, a single task may create multiple OutputWriter, while the preparation should be done only once. Another reason is that, for traditional Hadoop users, driver side setup code can be pretty conventional and familiar.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30349 has started for PR 5526 at commit b46ee49.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30342 has finished for PR 5526 at commit 9b58be6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait PartitionedSchemaRelationProvider
    • trait OutputWriter
    • trait FSBasedPrunedFilteredScan extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30342/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30344 timed out for PR 5526 at commit f973010 after a configured wait of 120m.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30344/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30348 has finished for PR 5526 at commit d432248.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait PartitionedSchemaRelationProvider
    • trait OutputWriter
    • trait FSBasedRelation extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30348/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30349 has finished for PR 5526 at commit b46ee49.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait PartitionedSchemaRelationProvider
    • trait OutputWriter
    • trait FSBasedRelation extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30349/
Test PASSed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean now BaseRelation needs to be serializable? I don't think this was the case before, was it?

I think it'd make more sense to have OutputWriter to have a zero-arg ctor, and the relation simply returns the class or the class name, and then we create the OutputWriter on each executor using reflection, and then call init(path: String, schema: StructType, options: Map[...]) on those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did have to be serializable before as it is hooked into a query plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was never part of the contract, was there? Also up until this point, it could've been a transient variable, because we only need it on the driver side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue here is about passing driver side Hadoop configuration to OutputWriters on executor side. Users may set properties to Hadoop configurations on driver side (e.g. mapreduce.fileoutputcommitter.marksuccessfuljobs), and we should inherit these settings on executor side when writing data. zero-arg constructor plus init(...) is a good way to avoid forcing BaseRelation to be serializable, but I guess we have to put Configuration as an argument of OutputWriter.init(...). This makes the data sources API coupled with Hadoop API via Configuration, but I guess this should be more acceptable comparing to forcing BaseRelation subclasses to be serializable?

@SparkQA
Copy link

SparkQA commented Apr 23, 2015

Test build #30846 has started for PR 5526 at commit 4e93e9b.

@SparkQA
Copy link

SparkQA commented Apr 23, 2015

Test build #30846 has finished for PR 5526 at commit 4e93e9b.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait FSBasedRelationProvider
    • abstract class OutputWriter
    • abstract class FSBasedRelation extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30846/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 23, 2015

Test build #30850 has started for PR 5526 at commit b63f813.

@liancheng
Copy link
Contributor Author

@marmbrus @yhuai @rxin Previous comments are addressed, Also added tests (ignored for now) in FSBasedRelationSuite. Going to implement all the interface.

@SparkQA
Copy link

SparkQA commented Apr 23, 2015

Test build #30850 has finished for PR 5526 at commit b63f813.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait FSBasedRelationProvider
    • abstract class OutputWriter
    • abstract class FSBasedRelation extends BaseRelation
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 12, 2015

Test build #32489 has started for PR 5526 at commit 5351a1b.

@liancheng
Copy link
Contributor Author

@marmbrus Per our offline discussion, since all review issues are addressed now, I'm going to merge this once it passes Jenkins. Otherwise I'm afraid we'll just have to keep rebasing this giant :)

@liancheng liancheng changed the title [SPARK-5182] [SQL] Partitioning support for the data sources API [SPARK-3928] [SPARK-5182] [SQL] Partitioning support for the data sources API May 12, 2015
@SparkQA
Copy link

SparkQA commented May 12, 2015

Test build #32489 has finished for PR 5526 at commit 5351a1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32489/
Test FAILed.

@liancheng
Copy link
Contributor Author

Last build failure was caused by MLlib.

@liancheng
Copy link
Contributor Author

retest this please

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented May 12, 2015

Test build #32508 has started for PR 5526 at commit 5351a1b.

@SparkQA
Copy link

SparkQA commented May 12, 2015

Test build #32508 has finished for PR 5526 at commit 5351a1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s"FileOutputCommitter or its subclass is expected, but got a $
    • trait FSBasedRelationProvider
    • abstract class OutputWriter

@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32508/
Test PASSed.

@liancheng
Copy link
Contributor Author

Merging to master and branch-1.4.

asfgit pushed a commit that referenced this pull request May 12, 2015
…rces API

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path.  Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive compatible partition discovery

   This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

1. Generalized partition pruning optimization

   Now partition pruning is handled during physical planning phase.  Specific data sources don't need to worry about this harness anymore.

   (This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Used to implement write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.

- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.

      This ensures that we only need to open a single `OutputWriter` at a time.  For data sources like Parquet, `OutputWriter`s can be quite memory consuming.  One issue is that, this approach breaks the row distribution in the original DataFrame.  However, we did't promise to preserve data distribution when writing a DataFrame.

- [x] More tests.  Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.

Author: Cheng Lian <[email protected]>

Closes #5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos.  Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support

(cherry picked from commit 0595b6d)
Signed-off-by: Cheng Lian <[email protected]>
@asfgit asfgit closed this in 0595b6d May 12, 2015
asfgit pushed a commit that referenced this pull request May 13, 2015
#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.

cc liancheng

Author: zsxwing <[email protected]>

Closes #6095 from zsxwing/hotfix and squashes the following commits:

b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions

(cherry picked from commit 247b703)
Signed-off-by: Cheng Lian <[email protected]>
asfgit pushed a commit that referenced this pull request May 13, 2015
#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.

cc liancheng

Author: zsxwing <[email protected]>

Closes #6095 from zsxwing/hotfix and squashes the following commits:

b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this println

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, removed in #6123

jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
…rces API

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path.  Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive compatible partition discovery

   This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

1. Generalized partition pruning optimization

   Now partition pruning is handled during physical planning phase.  Specific data sources don't need to worry about this harness anymore.

   (This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Used to implement write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.

- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.

      This ensures that we only need to open a single `OutputWriter` at a time.  For data sources like Parquet, `OutputWriter`s can be quite memory consuming.  One issue is that, this approach breaks the row distribution in the original DataFrame.  However, we did't promise to preserve data distribution when writing a DataFrame.

- [x] More tests.  Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.

Author: Cheng Lian <[email protected]>

Closes apache#5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos.  Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
apache#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.

cc liancheng

Author: zsxwing <[email protected]>

Closes apache#6095 from zsxwing/hotfix and squashes the following commits:

b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
…rces API

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path.  Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive compatible partition discovery

   This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

1. Generalized partition pruning optimization

   Now partition pruning is handled during physical planning phase.  Specific data sources don't need to worry about this harness anymore.

   (This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Used to implement write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.

- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.

      This ensures that we only need to open a single `OutputWriter` at a time.  For data sources like Parquet, `OutputWriter`s can be quite memory consuming.  One issue is that, this approach breaks the row distribution in the original DataFrame.  However, we did't promise to preserve data distribution when writing a DataFrame.

- [x] More tests.  Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.

Author: Cheng Lian <[email protected]>

Closes apache#5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos.  Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
apache#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.

cc liancheng

Author: zsxwing <[email protected]>

Closes apache#6095 from zsxwing/hotfix and squashes the following commits:

b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…rces API

This PR adds partitioning support for the external data sources API. It aims to simplify development of file system based data sources, and provide first class partitioning support for both read path and write path.  Existing data sources like JSON and Parquet can be simplified with this work.

## New features provided

1. Hive compatible partition discovery

   This actually generalizes the partition discovery strategy used in Parquet data source in Spark 1.3.0.

1. Generalized partition pruning optimization

   Now partition pruning is handled during physical planning phase.  Specific data sources don't need to worry about this harness anymore.

   (This also implies that we can remove `CatalystScan` after migrating the Parquet data source, since now we don't need to pass Catalyst expressions to data source implementations.)

1. Insertion with dynamic partitions

   When inserting data to a `FSBasedRelation`, data can be partitioned dynamically by specified partition columns.

## New structures provided

### Developer API

1. `FSBasedRelation`

   Base abstract class for file system based data sources.

1. `OutputWriter`

   Base abstract class for output row writers, responsible for writing a single row object.

1. `FSBasedRelationProvider`

   A new relation provider for `FSBasedRelation` subclasses. Note that data sources extending `FSBasedRelation` don't need to extend `RelationProvider` and `SchemaRelationProvider`.

### User API

New overloaded versions of

1. `DataFrame.save()`
1. `DataFrame.saveAsTable()`
1. `SQLContext.load()`

are provided to allow users to save/load DataFrames with user defined dynamic partition columns.

### Spark SQL query planning

1. `InsertIntoFSBasedRelation`

   Used to implement write path for `FSBasedRelation`s.

1. New rules for `FSBasedRelation` in `DataSourceStrategy`

   These are added to hook `FSBasedRelation` into physical query plan in read path, and perform partition pruning.

## TODO

- [ ] Use scratch directories when overwriting a table with data selected from itself.

      Currently, this is not supported, because the table been overwritten is always deleted before writing any data to it.

- [ ] When inserting with dynamic partition columns, use external sorter to group the data first.

      This ensures that we only need to open a single `OutputWriter` at a time.  For data sources like Parquet, `OutputWriter`s can be quite memory consuming.  One issue is that, this approach breaks the row distribution in the original DataFrame.  However, we did't promise to preserve data distribution when writing a DataFrame.

- [x] More tests.  Specifically, test cases for

      - [x] Self-join
      - [x] Loading partitioned relations with a subset of partition columns stored in data files.
      - [x] `SQLContext.load()` with user defined dynamic partition columns.

## Parquet data source migration

Parquet data source migration is covered in PR liancheng#6, which is against this PR branch and for preview only. A formal PR need to be made after this one is merged.

Author: Cheng Lian <[email protected]>

Closes apache#5526 from liancheng/partitioning-support and squashes the following commits:

5351a1b [Cheng Lian] Fixes compilation error introduced while rebasing
1f9b1a5 [Cheng Lian] Tweaks data schema passed to FSBasedRelations
43ba50e [Cheng Lian] Avoids serializing generated projection code
edf49e7 [Cheng Lian] Removed commented stale code block
348a922 [Cheng Lian] Adds projection in FSBasedRelation.buildScan(requiredColumns, inputPaths)
ad4d4de [Cheng Lian] Enables HDFS style globbing
8d12e69 [Cheng Lian] Fixes compilation error
c71ac6c [Cheng Lian] Addresses comments from @marmbrus
7552168 [Cheng Lian] Fixes typo in MimaExclude.scala
0349e09 [Cheng Lian] Fixes compilation error introduced while rebasing
52b0c9b [Cheng Lian] Adjusts project/MimaExclude.scala
c466de6 [Cheng Lian] Addresses comments
bc3f9b4 [Cheng Lian] Uses projection to separate partition columns and data columns while inserting rows
795920a [Cheng Lian] Fixes compilation error after rebasing
0b8cd70 [Cheng Lian] Adds Scala/Catalyst row conversion when writing non-partitioned tables
fa543f3 [Cheng Lian] Addresses comments
5849dd0 [Cheng Lian] Fixes doc typos.  Fixes partition discovery refresh.
51be443 [Cheng Lian] Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
c4ed4fe [Cheng Lian] Bug fixes and a new test suite
a29e663 [Cheng Lian] Bug fix: should only pass actuall data files to FSBaseRelation.buildScan
5f423d3 [Cheng Lian] Bug fixes. Lets data source to customize OutputCommitter rather than OutputFormat
54c3d7b [Cheng Lian] Enforces that FileOutputFormat must be used
be0c268 [Cheng Lian] Uses TaskAttempContext rather than Configuration in OutputWriter.init
0bc6ad1 [Cheng Lian] Resorts to new Hadoop API, and now FSBasedRelation can customize output format class
f320766 [Cheng Lian] Adds prepareForWrite() hook, refactored writer containers
422ff4a [Cheng Lian] Fixes style issue
ce52353 [Cheng Lian] Adds new SQLContext.load() overload with user defined dynamic partition columns
8d2ff71 [Cheng Lian] Merges partition columns when reading partitioned relations
ca1805b [Cheng Lian] Removes duplicated partition discovery code in new Parquet
f18dec2 [Cheng Lian] More strict schema checking
b746ab5 [Cheng Lian] More tests
9b487bf [Cheng Lian] Fixes compilation errors introduced while rebasing
ea6c8dd [Cheng Lian] Removes remote debugging stuff
327bb1d [Cheng Lian] Implements partitioning support for data sources API
3c5073a [Cheng Lian] Fixes SaveModes used in test cases
fb5a607 [Cheng Lian] Fixes compilation error
9d17607 [Cheng Lian] Adds the contract that OutputWriter should have zero-arg constructor
5de194a [Cheng Lian] Forgot Apache licence header
95d0b4d [Cheng Lian] Renames PartitionedSchemaRelationProvider to FSBasedRelationProvider
770b5ba [Cheng Lian] Adds tests for FSBasedRelation
3ba9bbf [Cheng Lian] Adds DataFrame.saveAsTable() overrides which support partitioning
1b8231f [Cheng Lian] Renames FSBasedPrunedFilteredScan to FSBasedRelation
aa8ba9a [Cheng Lian] Javadoc fix
012ed2d [Cheng Lian] Adds PartitioningOptions
7dd8dd5 [Cheng Lian] Adds new interfaces and stub methods for data sources API partitioning support
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
apache#5526 uses `Job.getInstance`, which does not exist in the old Hadoop versions. Just use `new Job` to replace it.

cc liancheng

Author: zsxwing <[email protected]>

Closes apache#6095 from zsxwing/hotfix and squashes the following commits:

b0c2049 [zsxwing] Use the old Job API to support old Hadoop versions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants