Conversation


@budde budde commented Mar 26, 2015

Opening to replace #5188.

When Spark SQL infers a schema for a DataFrame, it will take the union of all field types present in the structured source data (e.g. an RDD of JSON data). When the source data for a row doesn't define a particular field on the DataFrame's schema, a null value will simply be assumed for this field. This workflow makes it very easy to construct tables and query over a set of structured data with a nonuniform schema. However, this behavior is not consistent in some cases when dealing with Parquet files and an external table managed by an external Hive metastore.

In our particular use case, we use Spark Streaming to parse and transform our input data and then apply a window function to save an arbitrary-sized batch of data as a Parquet file, which itself will be added as a partition to an external Hive table via an "ALTER TABLE... ADD PARTITION..." statement. Since our input data is nonuniform, it is expected that not every partition batch will contain every field present in the table's schema obtained from the Hive metastore. As such, we expect that the schema of some of our Parquet files may not contain the same set of fields present in the full metastore schema.

In such cases, it seems natural that Spark SQL would simply assume null values for any missing fields in the partition's Parquet file, provided these fields are specified as nullable by the metastore schema. This is not the case in the current implementation of ParquetRelation2. The mergeMetastoreParquetSchema() method used to reconcile differences between a Parquet file's schema and a schema retrieved from the Hive metastore will raise an exception if the Parquet file doesn't contain the same set of fields specified by the metastore.

This pull request alters the behavior of mergeMetastoreParquetSchema() by having it first add any nullable fields from the metastore schema to the Parquet file schema if they aren't already present there.
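
In outline, the proposed step behaves like the following minimal Scala sketch (a simplified stand-in with an illustrative helper name, not the actual ParquetRelation2 code):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Simplified sketch: append any nullable metastore-only fields to the
// Parquet schema so the later reconciliation step no longer fails on
// missing fields. Case-insensitive name matching is an assumption here,
// mirroring Hive's lowercased field names.
def addMissingNullableFields(
    metastoreSchema: StructType,
    parquetSchema: StructType): StructType = {
  val parquetNames = parquetSchema.map(_.name.toLowerCase).toSet
  val missingNullable = metastoreSchema.filter { field =>
    field.nullable && !parquetNames.contains(field.name.toLowerCase)
  }
  StructType(parquetSchema ++ missingNullable)
}
```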

@AmplabJenkins

Can one of the admins verify this patch?

@marmbrus
Contributor

ok to test


SparkQA commented Mar 26, 2015

Test build #29254 has started for PR 5214 at commit 9041bfa.

  • This patch merges cleanly.


SparkQA commented Mar 26, 2015

Test build #29254 has finished for PR 5214 at commit 9041bfa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29254/
Test FAILed.

@budde budde (Author) commented Mar 26, 2015

Taking a look at why these tests failed.

@budde budde (Author) commented Mar 26, 2015

I must've accidentally run the tests on an old build artifact before opening this PR. It turns out that the tests included in #5141 expect failure in scenarios now permitted by this PR, while the tests originally included in this PR also expect failure in scenarios now permitted by #5141. I've cleared this up and the tests should pass now.


SparkQA commented Mar 26, 2015

Test build #29257 has started for PR 5214 at commit a52d378.

  • This patch merges cleanly.


SparkQA commented Mar 26, 2015

Test build #29257 has finished for PR 5214 at commit a52d378.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29257/
Test PASSed.

Contributor

I'm afraid diff and ++ are not OK here. For example, if the metastore schema has fields <a, b, c>, the Parquet schema has fields <a, c>, then the result schema would be <a, c, b>.
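
Concretely, with plain Scala collections standing in for the schema fields:

```scala
val metastore = Seq("a", "b", "c")
val parquet   = Seq("a", "c")

// Fields present only in the metastore schema
val missing = metastore.diff(parquet)   // Seq("b")

// Appending puts "b" after "c" instead of between "a" and "c"
val merged = parquet ++ missing         // Seq("a", "c", "b")
```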

Author

What is the expected order of fields in a schema? Is it lexicographic? Should we maintain the order of the metastore schema?

Contributor

Not lexicographic; the order of fields in the result schema should be the same as in the metastore schema.

Author

How should we deal with potential ambiguities that may be introduced due to #5141? For instance, say we are merging the following schemas:

Metastore schema    Parquet schema
Foo                 Foo
Bar                 Bar
Baz                 Bop
Bat                 Bat

The following options come to mind:

  • Attempt to merge the orderings and accept any possibility when there are ambiguities (e.g. both Foo Bar Baz Bop Bat and Foo Bar Bop Baz Bat are acceptable).
  • The fields defined in the metastore schema always come first, in metastore order, followed by any additional fields defined in the Parquet schema (e.g. Foo Bar Baz Bat Bop is the only accepted ordering; see the sketch below).
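
A quick sketch of the second option, with plain strings standing in for fields (illustrative only):

```scala
val metastore = Seq("Foo", "Bar", "Baz", "Bat")
val parquet   = Seq("Foo", "Bar", "Bop", "Bat")

// Metastore fields first, in metastore order, then any Parquet-only extras
val merged = metastore ++ parquet.diff(metastore)
// merged: Seq("Foo", "Bar", "Baz", "Bat", "Bop")
```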

Contributor

When the metastore schema is available, we are actually converting a metastore Parquet table into ParquetRelation2. Thus, the final reconciled schema should have exactly the same fields as the metastore schema, and simply drop any fields that only appear in the Parquet data file.

Author

I see. Based on the change made in #5141, it looks like the schema returned by mergeMissingNullableFields() will still contain any additional fields defined in parquetSchema (lines 766-767). How would you feel about simply removing the additional parquetSchema fields in the mergeMissingNullableFields() method?

Execution would look something like this:
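
(A hedged sketch of the proposed flow, with plain strings standing in for StructFields; the helper below mirrors the names in this discussion rather than actual Spark code.)

```scala
val metastoreSchema = Seq("Foo", "Bar", "Baz", "Bat")
val parquetSchema   = Seq("Foo", "Bar", "Bop", "Bat")

// Proposed change: also drop Parquet-only fields here, so the merged
// schema carries exactly the metastore fields
def mergeMissingNullableFields(metastore: Seq[String], parquet: Seq[String]): Seq[String] =
  parquet.filter(f => metastore.contains(f)) ++ metastore.diff(parquet)

val merged = mergeMissingNullableFields(metastoreSchema, parquetSchema)
// merged: Seq("Foo", "Bar", "Bat", "Baz") -- "Bop" dropped, "Baz" appended
```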

Author

Actually, now that I consider it, I'm not convinced that having the mergeMissingNullableFields() method return the fields in non-metastore order is a problem here. Lines 766-767 of mergeMetastoreParquetSchema() should handle putting them in the proper order.

Removing the additional fields is still an option to consider, however.

Contributor

Ah, yeah, you're right :) Totally forgot that mergeMetastoreParquetSchema already handles field reordering here. And all additional Parquet fields are removed via this zip call.
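
A toy run of that reorder-then-zip step (plain strings in place of StructFields; the sort-key trick approximates rather than quotes the real code):

```scala
// Field names from the example above, after the missing nullable "Baz"
// has been appended to the Parquet side
val metastore = Seq("Foo", "Bar", "Baz", "Bat")
val parquet   = Seq("Foo", "Bar", "Bop", "Bat", "Baz")

val ordinal = metastore.zipWithIndex.toMap

// Sort Parquet fields into metastore order; unknown fields sink past the end
val reordered = parquet.sortBy(f => ordinal.getOrElse(f, metastore.size + 1))
// reordered: Seq("Foo", "Bar", "Baz", "Bat", "Bop")

// zip truncates at the shorter side, silently dropping the extra "Bop"
val reconciled = metastore.zip(reordered)
// reconciled: Seq((Foo,Foo), (Bar,Bar), (Baz,Baz), (Bat,Bat))
```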

@liancheng
Contributor

add to whitelist


SparkQA commented Mar 28, 2015

Test build #29333 has started for PR 5214 at commit a52d378.

  • This patch merges cleanly.

asfgit pushed a commit that referenced this pull request Mar 28, 2015
Add missing nullable Metastore fields when merging a Parquet schema


Author: Adam Budde <[email protected]>

Closes #5214 from budde/nullable-fields and squashes the following commits:

a52d378 [Adam Budde] Refactor ParquetSchemaSuite.scala for cases now permitted by SPARK-6471 and SPARK-6538
9041bfa [Adam Budde] Add missing nullable Metastore fields when merging a Parquet schema

(cherry picked from commit 5909f09)
Signed-off-by: Cheng Lian <[email protected]>
@asfgit asfgit closed this in 5909f09 Mar 28, 2015
@liancheng
Contributor

LGTM. Merged to master and 1.3. Thanks for working on this!


SparkQA commented Mar 28, 2015

Test build #29333 has finished for PR 5214 at commit a52d378.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29333/
Test PASSed.

asfgit pushed a commit that referenced this pull request Jun 23, 2015
This PR adds a section about Hive metastore Parquet table conversion. It documents:

1. Schema reconciliation rules introduced in #5214 (see [this comment][1] in #5188)
2. Metadata refreshing requirement introduced in #5339

[1]: #5188 (comment)

Author: Cheng Lian <[email protected]>

Closes #5348 from liancheng/sql-doc-parquet-conversion and squashes the following commits:

42ae0d0 [Cheng Lian] Adds Python `refreshTable` snippet
4c9847d [Cheng Lian] Resorts to SQL for Python metadata refreshing snippet
756e660 [Cheng Lian] Adds Python snippet for metadata refreshing
50675db [Cheng Lian] Addes Hive metastore Parquet table conversion section
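
For reference, the metadata refreshing documented in item 2 can be triggered from Scala as well (a minimal sketch; the context setup and table name are placeholders):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local", "refresh-example")
val hiveContext = new HiveContext(sc)

// Invalidate cached metadata so newly added Parquet partitions are visible
hiveContext.refreshTable("my_table")  // "my_table" is a placeholder table name
```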