Member

@viirya viirya commented Jul 18, 2024

What changes were proposed in this pull request?

We received a customer report that a MergeInto query on an Iceberg table, which worked before, fails after upgrading to Spark 3.4.

The error looks like this:

Caused by: org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object
upcast(getcolumnbyordinal(0, StringType), StringType, - root class: java.lang.String).toString.

The source table of the MergeInto uses a ScalaUDF. The error occurs when Spark invokes the deserializer of the ScalaUDF's input encoder before that deserializer has been resolved.
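To make the failure mode concrete, here is a minimal toy model in Python. It is not Catalyst code: `UnresolvedError`, `ColumnByOrdinal`, `BoundColumn`, `resolve`, and `decode` are hypothetical names chosen for illustration. It assumes, as the error message suggests, that an unresolved placeholder refuses to answer questions such as nullability until it has been bound to a real input column.

```python
class UnresolvedError(Exception):
    """Hypothetical stand-in for an 'unresolved object' error."""


class ColumnByOrdinal:
    """Toy placeholder: knows only a position, not yet bound to an input column."""

    def __init__(self, ordinal):
        self.ordinal = ordinal

    @property
    def nullable(self):
        # An unresolved placeholder cannot answer this question.
        raise UnresolvedError("Invalid call to nullable on unresolved object")


class BoundColumn:
    """Toy resolved reference: position plus known nullability."""

    def __init__(self, ordinal, nullable):
        self.ordinal = ordinal
        self.nullable = nullable


def resolve(expr, schema_nullable):
    """Bind a placeholder against a list of per-column nullability flags."""
    if isinstance(expr, ColumnByOrdinal):
        return BoundColumn(expr.ordinal, schema_nullable[expr.ordinal])
    return expr


def decode(expr, row):
    """Toy deserializer: consults nullability before converting the value."""
    if expr.nullable and row[expr.ordinal] is None:
        return None
    return str(row[expr.ordinal])
```

In this sketch, calling `decode(ColumnByOrdinal(0), ["a"])` raises `UnresolvedError`, while `decode(resolve(ColumnByOrdinal(0), [True]), ["a"])` succeeds. The point is only that the deserializer must pass through a resolution step before it is used.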

The encoders of a ScalaUDF are resolved by the rule ResolveEncodersInUDF, which is applied at the end of the analysis phase.

While rewriting the MergeInto into a ReplaceData query, Spark creates an Exists subquery, and the ScalaUDF is part of that subquery's plan. Note that the ScalaUDF itself has already been resolved by the analyzer at this point.

The ResolveSubquery rule then resolves a subquery plan only if it is not yet resolved. Because the subquery containing the ScalaUDF is already resolved, the rule skips it, so ResolveEncodersInUDF is never applied to it. As a result, the analyzed ReplaceData query contains a ScalaUDF whose encoders are unresolved, which causes the error.
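The skip described above can be sketched with a toy model in Python. This is a simplification under stated assumptions, not Spark's implementation: `Udf`, `Plan`, `resolve_subquery`, `resolve_encoders_in_udf`, and `analyze` are hypothetical stand-ins, and the model assumes the encoder rule only visits UDFs in the plan it is run on, not those nested in a subquery plan.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Udf:
    """Toy UDF: counts as resolved, but its encoder needs a separate pass."""
    name: str
    encoder_resolved: bool = False


@dataclass
class Plan:
    """Toy logical plan: some UDFs plus an optional subquery plan."""
    udfs: List[Udf] = field(default_factory=list)
    subquery: Optional["Plan"] = None
    resolved: bool = False


def resolve_encoders_in_udf(plan: Plan) -> None:
    """End-of-analysis rule (assumption: it does not descend into subqueries)."""
    for udf in plan.udfs:
        udf.encoder_resolved = True


def resolve_subquery(plan: Plan) -> None:
    """Analyze the subquery plan only if it is not resolved yet."""
    if plan.subquery is not None and not plan.subquery.resolved:
        analyze(plan.subquery)


def analyze(plan: Plan) -> Plan:
    resolve_subquery(plan)
    plan.resolved = True
    resolve_encoders_in_udf(plan)
    return plan


# The rewrite embeds an already-resolved source plan as a subquery.
source = Plan(udfs=[Udf("my_udf")], resolved=True)
replace_data = analyze(Plan(subquery=source))

# Contrast: a subquery that is not resolved yet goes through full analysis.
fresh = analyze(Plan(subquery=Plan(udfs=[Udf("my_udf")])))
```

After running this, the UDF inside `replace_data.subquery` still has `encoder_resolved == False`, because the resolved subquery was skipped, while the UDF inside `fresh.subquery` has its encoder resolved.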

This patch modifies ResolveSubquery so that it resolves the subquery plan whenever the plan is not yet analyzed, ensuring that the subquery plan is fully analyzed.

This patch moves the ResolveEncodersInUDF rule before the MergeInto rewrite, so that the ScalaUDF in the subquery plan is fully analyzed.
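Why the rule order matters can be illustrated with another small Python sketch. Again, this is a toy under assumptions rather than the actual analyzer: `Plan`, `resolve_encoders_in_udf`, `rewrite_merge_into`, `run`, and `fully_analyzed` are hypothetical names, and the rewrite is modeled as simply copying the current state of the source into a subquery.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Plan:
    kind: str
    # True when the node has no UDF or its UDF encoders are resolved.
    udf_encoders_resolved: bool = True
    subquery: Optional["Plan"] = None


def resolve_encoders_in_udf(plan: Plan) -> Plan:
    """Toy rule: resolves encoders on the top-level plan only (assumption)."""
    plan.udf_encoders_resolved = True
    return plan


def rewrite_merge_into(plan: Plan) -> Plan:
    """Toy rewrite: embeds the source, in its current state, as a subquery."""
    if plan.kind != "MergeInto":
        return plan
    source = Plan("Source", plan.udf_encoders_resolved)
    return Plan("ReplaceData", True, subquery=source)


def run(rules: List[Callable[[Plan], Plan]], plan: Plan) -> Plan:
    """Apply the rules once, in the given order."""
    for rule in rules:
        plan = rule(plan)
    return plan


def fully_analyzed(plan: Plan) -> bool:
    """True if this plan and any nested subquery have resolved encoders."""
    return plan.udf_encoders_resolved and (
        plan.subquery is None or fully_analyzed(plan.subquery)
    )


def merge_with_udf() -> Plan:
    return Plan("MergeInto", udf_encoders_resolved=False)


# Encoders resolved after the rewrite: the embedded subquery is missed.
old_order = run([rewrite_merge_into, resolve_encoders_in_udf], merge_with_udf())
# Encoders resolved before the rewrite: the subquery inherits resolved encoders.
new_order = run([resolve_encoders_in_udf, rewrite_merge_into], merge_with_udf())
```

In this model, `fully_analyzed(old_order)` is false and `fully_analyzed(new_order)` is true, which mirrors the intent of running the encoder rule before the rewrite.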

Why are the changes needed?

To fix a production query error.

Does this PR introduce any user-facing change?

Yes, it fixes a user-facing issue.

How was this patch tested?

Manually tested with a MergeInto query and added a unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@viirya
Member Author

viirya commented Jul 18, 2024

cc @dongjoon-hyun

@huaxingao
Contributor

@yaooqinn Can we include this fix in the 3.5.2 release? Thanks!

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. (Pending CIs).

dongjoon-hyun pushed a commit that referenced this pull request Jul 18, 2024
…ved for MergeInto

Closes #47406 from viirya/fix_subquery_resolve_3.5.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun
Member

Merged to branch-3.5.

@viirya
Member Author

viirya commented Jul 18, 2024

Thank you @dongjoon-hyun

@yaooqinn
Member

@dongjoon-hyun @huaxingao @viirya Thank you for the fix, I will collect it into RC2 for 3.5.2

@dongjoon-hyun
Member

Thank you so much, @yaooqinn !

@viirya
Member Author

viirya commented Jul 19, 2024

Thank you @yaooqinn

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
…ved for MergeInto

Closes apache#47406 from viirya/fix_subquery_resolve_3.5.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…ved for MergeInto (apache#536)

Closes apache#47406 from viirya/fix_subquery_resolve_3.5.

Authored-by: Liang-Chi Hsieh <[email protected]>

Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>