
Conversation

@budde commented Feb 15, 2017

Summary of changes

Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties. A brief usage sketch follows the list below.

  • Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
  • Add schemaPreservesCase field to CatalogTable (set to false when schema can't
    successfully be read from Hive table props)
  • Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
    false, depending on spark.sql.hive.caseSensitiveInferenceMode
  • Add alterTableSchema() method to the ExternalCatalog interface
  • Add HiveSchemaInferenceSuite tests
  • Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to
    HiveMetastoreCatalog.mergeWithMetastoreSchema()
  • Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite
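
For reference, a sketch of using the new option from spark-shell (the table name is a placeholder; the mode values are the ones introduced by this PR):

// Sketch only: pick how Spark SQL should treat Hive tables whose table properties
// do not carry a case-sensitive schema. Possible values: INFER_AND_SAVE (proposed
// default), INFER_ONLY, NEVER_INFER.
sql("SET spark.sql.hive.caseSensitiveInferenceMode=INFER_AND_SAVE")

// Queries against a Hive table whose Parquet files use mixed-case column names
// ("some_table" is a placeholder) will now use a schema inferred from the data files
// rather than the all-lowercase metastore schema.
sql("SELECT * FROM some_table").show()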

JIRA for this change

How was this patch tested?

The tests in HiveSchemaInferenceSuite should verify that schema inference is working as expected. ExternalCatalogSuite has also been extended to cover the new alterTableSchema() API.

budde (Author) commented Feb 15, 2017

Re-pinging participants from #16797: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan. Sorry for the noise.

SparkQA commented Feb 16, 2017

Test build #72959 has finished for PR 16944 at commit 0bca163.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

budde (Author) commented Feb 16, 2017

Looks like I missed a Catalyst test. Updating the PR.

SparkQA commented Feb 16, 2017

Test build #72978 has finished for PR 16944 at commit 8ac3b04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

Is INFER_AND_SAVE a good default value?

Author

This was proposed in #16797 but I'd like to open this for discussion.

  • INFER_ONLY would mimic the pre-2.1.0 behavior.
  • INFER_AND_SAVE would attempt to prevent future inferences but may fail if the Hive client doesn't have write permissions on the metastore.
  • NEVER_INFER preserves the current 2.1.0 behavior, which breaks support for the tables affected by SPARK-19611. Users may wish to enable this mode for tables without a table-properties schema that they know are case-insensitive.

Contributor

INFER_ONLY seems better to me as a default since it avoids throwing exceptions. To avoid silent performance degradation in that case, perhaps we can log a warning pointing to this config? Alternatively, I could see INFER_AND_SAVE as the default if we caught the write-permission errors.

Author

I'll update the code to catch and log any nonfatal exception thrown by the alterTable() call that saves the table schema when INFER_AND_SAVE is enabled.
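
Roughly what I have in mind (a sketch only; the helper name and the logging call are illustrative, not the exact code in HiveMetastoreCatalog):

import org.apache.spark.sql.types.StructType
import scala.util.control.NonFatal

// Sketch: try to persist the inferred, case-sensitive schema back to the metastore,
// but never let a non-fatal failure (e.g. the Hive client lacking write permission)
// fail the query itself. `saveToMetastore` stands in for the alterTable() call.
def trySaveInferredSchema(
    tableName: String,
    inferredSchema: StructType,
    saveToMetastore: (String, StructType) => Unit): Unit = {
  try {
    saveToMetastore(tableName, inferredSchema)
  } catch {
    case NonFatal(e) =>
      // Stand-in for logWarning(); the real code would log and continue.
      println(s"WARN: unable to save inferred schema for $tableName; " +
        s"it will be re-inferred on each access: ${e.getMessage}")
  }
}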

Member

Once the catalog table info is altered, shall we use the updated catalog table?

Author

This would only be appropriate if the mode is INFER_AND_SAVE and the call to alterTable() succeeds, right? I'll see if I can refactor this method to do this without making it too convoluted.

Member

Can we log an info message that we are going to infer the schema (and save it to the metastore)?

Author

I'll add an info log here.

Member

Wrong indent style.

Member

wrong indent.

Author

I'll fix both of these

Member

Is the comment "has no Spark-specific table properties set" accurate? The properties are actually passed in by the caller.

Author

I wrote the method to take arbitrary properties, but for the purposes of this test only an empty map is supplied. I'll make the comment more applicable to the method, though, and describe its usage elsewhere.

Member

We should check that the catalog table is not updated when schemaInferenceMode is INFER_ONLY.

Member

We should check that the catalog table is actually updated when schemaInferenceMode is INFER_AND_SAVE.

Author

I'll change both of these tests to check the catalog table as well

Contributor

Since table props is an implementation detail, consider naming this schemaPreservesCase.

Author

Will do

Contributor

Is there always some schema stored for a table in the metastore? Consider including something about case-sensitivity in the conf name to distinguish it from general schema inference.

Author

Will do

Contributor

Why don't we merge in the other cases?

Author

I took this from how the schema was inferred in HiveMetastoreCatalog prior to 2.1.0. Only ParquetFileFormat has a merge method.

budde (Author) commented Feb 16, 2017

I've updated the PR based on feedback received. Changes from previous commit:

  • Fixed a couple of indent issues
  • Clarified some HiveSchemaInferenceSuite comments and did some general cleanup
  • Added CatalogTable checks and a NEVER_INFER test to SchemaInferenceSuite
  • Added additional info/error logging to HiveMetastoreCatalog
  • Caught nonfatal exceptions from the alterTable() call in HiveMetastoreCatalog
  • Changed the param name to spark.sql.hive.caseSensitiveInferenceMode
  • Renamed CatalogTable.schemaFromTableProps to CatalogTable.schemaPreservesCase
  • Introduced a HiveCaseSensitiveInferenceMode enumeration type to get rid of the "magic strings" used for inference modes
  • Used the updated CatalogTable record in LogicalRelation when INFER_AND_SAVE is used

budde (Author), Feb 16, 2017

Is there a more appropriate place I can put this Enumeration?

Contributor

we can follow PARQUET_COMPRESSION and write the string literal directly.

Author

I'm trying to avoid using string literals. If we want to change the possible values for this param we would need to find each and every place the literal value is used and update it. I think this is too flaky and runs the risk of introducing bugs that will only be apparent at runtime. Expressing this as an enumeration gives us some level of type safety and at the very least will cause a compiler error if the possible values are changed and comparisons elsewhere in the code aren't updated.

I'm willing to remove the enumeration if it isn't consistent with Spark code practices but at the very least the possible values should be expressed as constants rather than literals.
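
Roughly the shape I'm proposing (a sketch; the final object name and where it lives are up for discussion):

// Sketch: keep the legal inference modes in one enumeration so a removed or renamed
// mode breaks the build instead of failing at runtime.
object HiveCaseSensitiveInferenceMode extends Enumeration {
  val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}

// The SQLConf string value can then be parsed with
// HiveCaseSensitiveInferenceMode.withName(confValue).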

SparkQA commented Feb 16, 2017

Test build #73014 has finished for PR 16944 at commit ed6ea2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

budde (Author) commented Feb 17, 2017

Pinging @viirya and @ericl to take a look at the updates per their feedback

Member

Could we have a check for originalTable like below to see if its schemaPreservesCase is false and there is no DATASOURCE_SCHEMA_NUMPARTS property?

Author

Good call, I'll add a check for schemaPreservesCase.

We'll have to use the underlying HiveClient to obtain the raw table in order to check the presence of DATASOURCE_SCHEMA_NUMPARTS instead of originalTable directly since HiveExternalCatalog filters out any property starting with SPARK_SQL_PREFIX.

I'm thinking of just adding these checks to the setupCaseSensitiveTable() method since we're essentially just asserting that our initial conditions are what we expect (table returned by catalog has schemaPreservesCase=false and the underlying table contains no Spark properties).
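
Something along these lines (a sketch; argument names are illustrative, and the raw properties would come from the underlying HiveClient as described above):

import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Sketch of the initial-condition checks: the table seen through the external catalog
// must have schemaPreservesCase = false, and the raw Hive table properties must not
// contain any Spark SQL-specific keys (i.e. nothing starting with "spark.sql.").
def assertInitialTableState(
    catalogTable: CatalogTable,
    rawTableProps: Map[String, String]): Unit = {
  assert(!catalogTable.schemaPreservesCase,
    "expected a table whose metastore schema does not preserve case")
  assert(!rawTableProps.keys.exists(_.startsWith("spark.sql.")),
    "expected a table with no Spark SQL-specific table properties set")
}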

Author

These checks now happen for all tables via the assert() statements at the end of the setupCaseSensitiveTable() function above.

Member

Can we also add queries that break? We can intercept their exceptions.

Author

As mentioned in #16797 this issue actually won't cause exceptions, at least for Parquet data. The queries will simply return 0 results due to ParquetReadSupport using case-sensitive field resolution. If enabled, any pushed-down filter containing a case-sensitive field will also return 0 results since the lowercase filter field name won't match the case-sensitive Parquet column name.

I'll put some thought towards whether this test can be made more robust in other ways.

Member

Then we can test if we get 0 rows back?
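
For example, something like this (a sketch; the table and column names are placeholders, and it assumes the usual SQLTestUtils helpers such as withSQLConf):

// Sketch: with NEVER_INFER, the metastore's all-lowercase schema is used as-is, so a
// filter on a column that is mixed-case in the Parquet files should match nothing and
// the query should return 0 rows.
withSQLConf("spark.sql.hive.caseSensitiveInferenceMode" -> "NEVER_INFER") {
  assert(spark.sql("SELECT fieldone FROM case_test WHERE fieldone > 0").count() == 0)
}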

SparkQA commented Feb 19, 2017

Test build #73123 has finished for PR 16944 at commit 980311f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 19, 2017

Test build #73131 has finished for PR 16944 at commit 15c25e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

budde (Author) commented Feb 19, 2017

@viirya I've updated the PR to include the initial catalog table checks you've suggested in the setupCaseSensitiveTable() helper method.

budde (Author) commented Feb 21, 2017

Pinging participants from #16797 once more to get any feedback on the new proposal: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan

Contributor

shall we create a special table property? object CatalogTable defines some specific properties for views and we can follow that pattern. If we keep adding more parameters, we may blow up CatalogTable one day...

Contributor

and when we fix the schema and try to write it back, remember to remove this property first.

budde (Author), Feb 22, 2017

I considered taking this approach but I think adding this as a parameter to CatalogTable itself is more explicit and less flaky. I share your concern that adding more and more parameters to CatalogTable could make this less usable, especially since params like schemaPreservesCase really only matter when dealing with Hive tables.

However, I don't think dumping more and more parameters into properties is a great solution either. As you've pointed out, we would need to filter out the properties only used internally by Spark before writing them to the catalog. HiveExternalCatalog already filters out Spark SQL-specific properties from the CatalogTable returned by HiveClient. Adding additional internal properties would put us in a place where properties contains:

  • Actual properties key/value pairs returned from the Hive metastore table which should be preserved by HiveExternalCatalog and written back when altering the table.
  • Spark SQL-specific properties that are stored in the Hive metastore table but filtered out by HiveExternalCatalog when used by Spark internally. These properties must be restored before writing back.
  • Spark SQL internal-only properties that are added after reading the table from the metastore and must be removed before writing it.

Which isn't even to mention that we'll have to be serializing/deserializing this value to and from a (String, String) pair just to pass information between HiveExternalCatalog and HiveMetastoreCatalog.

I think that if CatalogTable ends up with too many datasource-specific internal parameters then maybe it makes more sense to introduce a new Map element, e.g. internalProperties, so these don't get mixed in with the table properties.
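
To make the comparison concrete, the table-property alternative would look roughly like this (an entirely hypothetical key name and helpers, just to illustrate the extra bookkeeping involved):

// Hypothetical sketch of the special-table-property approach: a Spark-internal marker
// that has to be added after the table is read from the metastore and stripped again
// before any alterTable() write-back.
object InternalTableProps {
  val SCHEMA_PRESERVES_CASE = "spark.sql.internal.schemaPreservesCase"
}

def markSchemaCase(props: Map[String, String], preservesCase: Boolean): Map[String, String] =
  props + (InternalTableProps.SCHEMA_PRESERVES_CASE -> preservesCase.toString)

def stripInternalProps(props: Map[String, String]): Map[String, String] =
  props - InternalTableProps.SCHEMA_PRESERVES_CASE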

Contributor

this is not only about case-preserving, maybe we should leave it unchanged

Author

In this case we are discarding the schema obtained from the table properties and explicitly falling back to using the case-insensitive schema obtained from the metastore. schemaPreservesCase needs to be set to false here for the same reason it does at line 702.

dongjoon-hyun (Member) commented Mar 9, 2017

Hi, @budde and @cloud-fan .

I ran into the following situation with Apache master after this commit. Could you check this case? Previously, Apache Spark showed the correct result. This is Parquet format, but I guess it would be the same with ORC.

sql("CREATE TABLE t1(a string, b string) PARTITIONED BY (day string, hour string) STORED AS PARQUET").show

sql("INSERT INTO TABLE t1 PARTITION (day = '1', hour = '01' ) VALUES (100, 200)").show

sql("SELECT a, b FROM t1").show
+---+---+
|  a|  b|
+---+---+
|100|200|
+---+---+
hive> ALTER TABLE t1 ADD COLUMNS (dummy string);
sql("SELECT a, b FROM t1").show
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. partition keys can not be changed.
...
+---+----+
|  a|   b|
+---+----+
|100|null|
+---+----+

budde (Author) commented Mar 9, 2017

@dongjoon-hyun Hmm, we should be catching and logging this in this catch block. How exactly are you running this? Are you just using spark-shell?

Can you also verify that no exception is thrown when the spark.sql.hive.caseSensitiveInferenceMode SQL param is set to INFER_ONLY?

@dongjoon-hyun (Member)

Ur, I've got the following.

scala> sql("set spark.sql.hive.caseSensitiveInferenceMode=INFER_ONLY").show
+--------------------+----------+
|                 key|     value|
+--------------------+----------+
|spark.sql.hive.ca...|INFER_ONLY|
+--------------------+----------+

scala> sql("SELECT a, b FROM t1").show
+---+----+
|  a|   b|
+---+----+
|100|null|
+---+----+

@dongjoon-hyun (Member)

Maybe t1 is already corrupted. Let me try a new one with that option.

@dongjoon-hyun (Member)

After the Hive alteration, I ran spark-shell and set the following immediately.
But it's already broken. When do you save the new inferred schema?

scala> sql("set spark.sql.hive.caseSensitiveInferenceMode=INFER_ONLY").show
+--------------------+----------+
|                 key|     value|
+--------------------+----------+
|spark.sql.hive.ca...|INFER_ONLY|
+--------------------+----------+


scala> sql("SELECT a, b FROM t3").show
+---+----+
|  a|   b|
+---+----+
|100|null|
+---+----+

dongjoon-hyun (Member) commented Mar 9, 2017

Should the option be set before CREATE TABLE? If so, it seems that we cannot prevent corruption of the existing Parquet tables.

@dongjoon-hyun (Member)

I found that NEVER_INFER works.

scala> sql("set spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER").show
+--------------------+-----------+
|                 key|      value|
+--------------------+-----------+
|spark.sql.hive.ca...|NEVER_INFER|
+--------------------+-----------+

scala> sql("SELECT a, b FROM t4").show
+---+---+
|  a|  b|
+---+---+
|100|200|
+---+---+

budde (Author) commented Mar 10, 2017

I think I'm still a little unclear on what exact components you are using: you're using Spark SQL via spark-shell to create the table, then using Hive to alter it, then querying the table again via spark-shell? Is this an external metastore or one managed by Spark locally?

If the table is created with Spark, then Spark should be storing the schema under the table properties at that point. This was the behavior prior to this change as well. If the table schema is changed by another application that does not alter the schema in the table properties, then the Spark SQL behavior should be the same as it was in 2.1.0.

The fact that you got the exception at all though seems to indicate that the schema wasn't properly read from the table properties or that some state wasn't managed properly. I'm going to try to recreate this locally to figure out what is happening here.

dongjoon-hyun (Member) commented Mar 10, 2017

Yes. It's a Mac environment with spark-shell and Hive 1.2.1, running fully locally. You can try that on your Mac. For the procedure, please see the first report. I created and populated the tables from Spark and only used Hive for ALTER TABLE.

@dongjoon-hyun (Member)

Oh, whenever I start spark-shell I have to run the following, or have it set in the Spark configuration. Otherwise, it shows the wrong result.

sql("set spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER").show

It looks like NEVER_INFER is the only safe default value for this regression.

budde (Author) commented Mar 10, 2017

@dongjoon-hyun Are you using Apache Hive itself to run the ALTER TABLE... statement or are you using a local Spark JDBC server and beeline?

EDIT: I was able to recreate this behavior by pointing Hive 1.2.1 to the Derby metastore_db created by Spark.

@dongjoon-hyun (Member)

Yep, right. I should have described that more clearly in the initial report...

budde (Author) commented Mar 10, 2017

@dongjoon-hyun This problem is happening because HiveMetastoreCatalog.mergeWithMetastoreSchema() doesn't maintain the StructType field order returned by the metastore. I overlooked that the ordering of the schema must be maintained. I'll open a new PR to correct this and add tests where appropriate.
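
The fix I have in mind is roughly the following (a simplified sketch; the real method also has to handle fields present in only one of the schemas and to validate conflicts):

import org.apache.spark.sql.types.{StructField, StructType}

// Simplified sketch: iterate over the metastore schema so its field order is kept,
// substituting the case-sensitive field from the inferred schema wherever a
// case-insensitive name match exists.
def mergeKeepingMetastoreOrder(inferred: StructType, metastore: StructType): StructType = {
  val inferredByLowerName: Map[String, StructField] =
    inferred.fields.map(f => f.name.toLowerCase -> f).toMap
  StructType(metastore.fields.map { msField =>
    inferredByLowerName.getOrElse(msField.name.toLowerCase, msField)
  })
}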

@dongjoon-hyun (Member)

Thank you for the quick investigation. Yep, please go ahead!

BTW, can we hold off on backporting (#17229) for a while until all the issues are resolved?

budde pushed a commit to budde/spark that referenced this pull request Mar 10, 2017
…ed schema

The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in apache#16944 may
not preserve the same field order as the metastore schema in some cases, which can cause
queries to fail. This change ensures that the metastore field order is preserved.

A test for ensuring that metastore order is preserved was added to ```HiveSchemaInferenceSuite.```
The particular failure usecase from apache#16944 was tested manually as well.
budde (Author) commented Mar 10, 2017

@dongjoon-hyun This behavior should be fixed by #17249. I'll amend this change to #17229 as well.

asfgit pushed a commit that referenced this pull request Mar 10, 2017
…ed schema

## What changes were proposed in this pull request?

The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in #16944 may
not preserve the same field order as the metastore schema in some cases, which can cause
queries to fail. This change ensures that the metastore field order is preserved.

## How was this patch tested?

A test for ensuring that metastore order is preserved was added to ```HiveSchemaInferenceSuite.```
The particular failure usecase from #16944 was tested manually as well.

Author: Budde <[email protected]>

Closes #17249 from budde/PreserveMetastoreFieldOrder.
@dongjoon-hyun (Member)

Hi, @budde and @cloud-fan .
Could you check the following Parquet Hive table issues?

It seems to erase bucketing information from Metastore in branch-2.2.
So far, Spark master branch looks okay.

ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 31, 2017
…eCatalog.convertToLogicalRelation

## What changes were proposed in this pull request?

We made a mistake in apache#16944 . In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge with full schema, and return the new full schema. At caller side we treat the full schema as data schema and set it to `HadoopFsRelation`.

This doesn't cause any problem because both parquet and orc can work with a wrong data schema that has extra columns, but it's better to fix this mistake.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes apache#19615 from cloud-fan/infer.
asfgit pushed a commit that referenced this pull request Oct 31, 2017
…eCatalog.convertToLogicalRelation

We made a mistake in #16944 . In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge with full schema, and return the new full schema. At caller side we treat the full schema as data schema and set it to `HadoopFsRelation`.

This doesn't cause any problem because both parquet and orc can work with a wrong data schema that has extra columns, but it's better to fix this mistake.

N/A

Author: Wenchen Fan <[email protected]>

Closes #19615 from cloud-fan/infer.

(cherry picked from commit 4d9ebf3)
Signed-off-by: Wenchen Fan <[email protected]>
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…eCatalog.convertToLogicalRelation

We made a mistake in apache#16944 . In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge with full schema, and return the new full schema. At caller side we treat the full schema as data schema and set it to `HadoopFsRelation`.

This doesn't cause any problem because both parquet and orc can work with a wrong data schema that has extra columns, but it's better to fix this mistake.

N/A

Author: Wenchen Fan <[email protected]>

Closes apache#19615 from cloud-fan/infer.

(cherry picked from commit 4d9ebf3)
Signed-off-by: Wenchen Fan <[email protected]>
.inferSchema(
sparkSession,
options,
fileIndex.listFiles(Nil).flatMap(_.files))

@budde @cloud-fan just wondering if there was a reason we didn't provide a partition filter here instead of Nil? I'm seeing a case where a query, with a partition predicate specified, is hitting this piece of code and all partitions are being listed unnecessarily. I think it can be solved by setting the inference mode to NEVER_INFER but not sure if that is ideal or can have any side effects.

Contributor

schema inference needs to look at the entire data set.

Author

To expand on this, let's say you are dealing with a dataset that has an optional field in its schema. If none of the data files for the partition(s) you filter by contain this field then it will never be returned when using schema inference. This may not matter in your particular use case, but IMO inspecting all of the files when inferring the schema is the safer approach in the general case. I believe you can configure a threshold for the number of data files at which the schema inference step will be distributed across the cluster and performed as a map-reduce job vs. running on the driver node.

If the full schema is already known beforehand and you're not using any sort of metastore to track table/schema state, using NEVER_INFER in conjunction with manually providing the schema as a read option might be a good approach.
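
For that last option, a minimal sketch of providing the schema up front so no listing/inference pass is needed (the path and fields are placeholders; `spark` is the usual spark-shell SparkSession):

import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Sketch: supply the case-sensitive schema explicitly when reading the files directly,
// so Spark never has to list and inspect the data files to infer it.
val userSchema = StructType(Seq(
  StructField("fieldOne", LongType),
  StructField("partCol", IntegerType)))

val df = spark.read.schema(userSchema).parquet("/path/to/data")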
