
Conversation

@dtenedor (Contributor) commented May 25, 2022

What changes were proposed in this pull request?

Support vectorized Parquet scans when the table schema has associated DEFAULT column values.

Example:

```
create table t(i int) using parquet;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```
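
To give some intuition for what the scan has to do, here is a minimal, hypothetical sketch in plain Scala (not the actual columnar reader code; the names `fillMissingColumns`, `requiredSchema`, `fileColumns`, and `existenceDefaults` are made up for illustration): when a column requested by the table schema is absent from an older Parquet file, the scan fills it with that column's stored DEFAULT value instead of NULL.

```scala
// Conceptual sketch only: the real change operates on columnar batches inside
// the vectorized Parquet reader, not on per-row Maps.
object DefaultFillSketch {
  def fillMissingColumns(
      requiredSchema: Seq[String],          // columns requested by the table schema
      fileColumns: Set[String],             // columns physically present in the Parquet file
      rowFromFile: Map[String, Any],        // values decoded from the file
      existenceDefaults: Map[String, Any]   // per-column DEFAULT values, where declared
    ): Seq[Any] = {
    requiredSchema.map { col =>
      if (fileColumns.contains(col)) rowFromFile(col)
      else existenceDefaults.getOrElse(col, null) // no DEFAULT declared -> NULL, as before
    }
  }
}
```

For the example above, `fillMissingColumns(Seq("i", "s"), Set("i"), Map("i" -> 42), Map("s" -> "abcdef"))` would yield `Seq(42, "abcdef")`.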

Why are the changes needed?

This change makes it easier to build, query, and maintain tables backed by Parquet data.

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

This PR includes new test coverage.

@dtenedor (Contributor, Author)

Synced the latest changes from master; this PR no longer depends on any other unmerged PRs.

@HyukjinKwon (Member)

cc @sadikovi too FYI

@AmplabJenkins

Can one of the admins verify this patch?

@sadikovi (Contributor)

Also, can we add a test to check that the DEFAULT values work? Thanks.

@dtenedor (Contributor, Author) commented Jun 1, 2022

> Also, can we add a test to check that the DEFAULT values work? Thanks.

@sadikovi Sure, this is done in InsertSuite by adding a new configuration to the test case covering Parquet files (previously it only covered the non-vectorized case; with Config(None) it now also runs over the vectorized case):

```
TestCase(
  dataSource = "parquet",
  Seq(
    Config(None),
    Config(
      Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
      insertNullsToStorage = false)))
```
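
For completeness, here is a self-contained sketch of what such a round-trip test could look like, assuming a suite extending QueryTest with SharedSparkSession (the suite and test names below are hypothetical, and depending on the Spark version the DEFAULT column feature flag may also need to be enabled):

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite name; exercises the DEFAULT behavior under both the
// vectorized and non-vectorized Parquet reader paths.
class DefaultColumnParquetSketchSuite extends QueryTest with SharedSparkSession {
  test("DEFAULT values are returned for rows written before ALTER TABLE ADD COLUMN") {
    Seq("true", "false").foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        withTable("t") {
          sql("create table t(i int) using parquet")
          sql("insert into t values (42)")
          sql("alter table t add column s string default concat('abc', 'def')")
          // Rows written before the ALTER should surface the default value.
          checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
        }
      }
    }
  }
}
```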

@sadikovi (Contributor) left a comment

LGTM. Thanks for updating the test.

@HyukjinKwon (Member) commented Jun 2, 2022

The test log is a bit messy, so I'm just copying and pasting the error I saw:

```
2022-06-02T02:22:55 [info] - SPARK-38336 INSERT INTO statements with tables with default columns: negative tests *** FAILED *** (13 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Failed to execute CREATE TABLE command because the destination table column s has a DEFAULT value of badvalue which fails to resolve as a valid expression: [MISSING_COLUMN] Column 'badvalue' does not exist. Did you mean one of the following? []; line 1 pos 0
[info]   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:141)
[info]   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$constantFoldCurrentDefaultsToExistDefaults$1(ResolveDefaultColumnsUtil.scala:96)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
[info]   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
[info]   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
[info]   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
[info]   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
[info]   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
[info]   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
[info]   at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.constantFoldCurrentDefaultsToExistDefaults(ResolveDefaultColumnsUtil.scala:94)
[info]   at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:153)
[info]   at org.apache.spark.sql.execution.datasources.DataSourceAnalysis$$anonfun$apply$1.applyOrElse(DataSourceStrategy.scala:148)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
[info]   at org.apache.spark.sql.execution.datasources.DataSourceAnalysis.apply(DataSourceStrategy.scala:148)
[info]   at org.apache.spark.sql.execution.datasources.DataSourceAnalysis.apply(DataSourceStrategy.scala:64)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
[info]   at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
[info]   at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
[info]   at scala.collection.immutable.List.foldLeft(List.scala:91)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
```

@dtenedor (Contributor, Author) commented Jun 2, 2022

@HyukjinKwon the CI passes now :)

@dtenedor (Contributor, Author) commented Jun 3, 2022

@gengliangwang @HyukjinKwon @cloud-fan could someone please merge this, or leave further review comments if another pass is desired?

@gengliangwang (Member) left a comment

Thanks for the work!

gengliangwang pushed a commit that referenced this pull request Jun 3, 2022
### What changes were proposed in this pull request?

Support vectorized Orc scans when the table schema has associated DEFAULT column values.

(Note: this PR depends on #36672, which adds the same support for Parquet files.)

Example:

```
create table t(i int) using orc;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```

### Why are the changes needed?

This change makes it easier to build, query, and maintain tables backed by Orc data.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR includes new test coverage.

Closes #36675 from dtenedor/default-orc-vectorized.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>