Conversation

@jiaji-wu

What changes were proposed in this pull request?

The current implementation of the replaceWithAliases method in NestedColumnAliasing replaces the children of the current plan with mapped aliases. When the current plan is a Generate -- which keeps a set of indices, unrequiredChildIndex, for unused child outputs -- and the replaced child happens to be covered by this set, the set is not updated for the rewritten Generate, so the stale unrequiredChildIndex entries point into the new children's output, leading to exceptions/errors.
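For illustration, a minimal, self-contained sketch (plain Scala with string stand-ins for attributes; this is not the actual Spark code and all names are hypothetical) of how a stale positional index misbehaves once the child output is rewritten:

object UnrequiredIndexSketch {
  // Mirrors how Generate derives its required child output from positional
  // indices (paraphrased from the idea behind Generate.requiredChildOutput).
  def requiredOutput(childOutput: Seq[String], unrequiredChildIndex: Seq[Int]): Seq[String] =
    childOutput.zipWithIndex
      .filterNot { case (_, i) => unrequiredChildIndex.contains(i) }
      .map(_._1)

  def main(args: Array[String]): Unit = {
    val original = Seq("b", "ex_b", "ex_b2")        // hypothetical child output
    val unrequired = Seq(0)                          // "b" is not needed downstream
    println(requiredOutput(original, unrequired))    // List(ex_b, ex_b2) -- correct

    // NestedColumnAliasing replaces the child with a Project whose output has a
    // different shape; without remapping, index 0 now drops the wrong column.
    val rewritten = Seq("_extract_fa", "_extract_v", "ex_b", "ex_b2")
    println(requiredOutput(rewritten, unrequired))   // List(_extract_v, ex_b, ex_b2) -- wrong column pruned
  }
}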

Why are the changes needed?

See the description above.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

A unit test is added to SparkPlanSuite.

github-actions bot added the SQL label Jul 30, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@jiaji-wu
Author

jiaji-wu commented Aug 2, 2022

Any comments?

@jiaji-wu
Author

jiaji-wu commented Aug 6, 2022

Hi @dongjoon-hyun , is it possible to find someone to review the change (it's a relatively small one :)? Thanks.

@dongjoon-hyun
Member

Thank you for pinging me, @jiaji-wu . I'll take a look at this PR.

@dongjoon-hyun
Member

The recommended workaround would be to disable only nested schema pruning, instead of disabling the whole ColumnPruning rule:

spark.sql.optimizer.expression.nestedPruning.enabled=false 
spark.sql.optimizer.nestedSchemaPruning.enabled=false

For example,

$ bin/spark-shell -c spark.sql.optimizer.expression.nestedPruning.enabled=false -c spark.sql.optimizer.nestedSchemaPruning.enabled=false
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/16 17:02:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1663372921582).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.16)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import spark.implicits._

val testJson =
  """{
    | "b": {
    |  "id": "id00",
    |  "data": [{
    |   "b1": "vb1",
    |   "b2": 101,
    |   "ex2": [
    |    { "fb1": false, "fb2": 11, "fb3": "t1" },
    |    { "fb1": true, "fb2": 12, "fb3": "t2" }
    |   ]}, {
    |   "b1": "vb2",
    |   "b2": 102,
    |   "ex2": [
    |    { "fb1": false, "fb2": 13, "fb3": "t3" },
    |    { "fb1": true, "fb2": 14, "fb3": "t4" }
    |   ]}
    |  ],
    |  "fa": "tes",
    |  "v": "1.5"
    | }
    |}
    |""".stripMargin
val df = spark.read.json((testJson :: Nil).toDS())
  .withColumn("ex_b", explode($"b.data.ex2"))
  .withColumn("ex_b2", explode($"ex_b"))
val df1 = df
  .withColumn("rt", struct(
    $"b.fa".alias("rt_fa"),
    $"b.v".alias("rt_v")
  ))
  .drop("b", "ex_b")
df1.show(false)

// Exiting paste mode, now interpreting.

+---------------+----------+
|ex_b2          |rt        |
+---------------+----------+
|{false, 11, t1}|{tes, 1.5}|
|{true, 12, t2} |{tes, 1.5}|
|{false, 13, t3}|{tes, 1.5}|
|{true, 14, t4} |{tes, 1.5}|
+---------------+----------+

df: org.apache.spark.sql.DataFrame = [b: struct<data: array<struct<b1:string,b2:bigint,ex2:array<struct<fb1:boolean,fb2:bigint,fb3:string>>>>, fa: string ... 2 more fields>, ex_b: array<struct<fb1:boolean,fb2:bigint,fb3:string>> ... 1 more field]
df1: org.apache.spark.sql.DataFrame = [ex_b2: struct<fb1: boolean, fb... 

Given that, please add the above two configurations to the test case by using withSQLConf.

withSQLConf(...) {

}
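For instance, a hedged sketch of that wrapper (assuming SQLConf.NESTED_PRUNING_ON_EXPRESSIONS and SQLConf.NESTED_SCHEMA_PRUNING_ENABLED are the constants backing the two keys above -- verify against SQLConf.scala):

withSQLConf(
    SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> "false",   // assumed constant name
    SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> "false") { // assumed constant name
  // run the repro query here and assert on the expected rows
}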

test("SPARK-39854: replaceWithAliases should keep the order of Generate children") {
Member

Thank you for providing the end-to-end test. Can we have a test case in NestedColumnAliasingSuite.scala because we are touching NestedColumnAliasing?

@jiaji-wu
Author

jiaji-wu commented Sep 17, 2022

Hi @dongjoon-hyun , thanks for taking the time to look into this!

The test is put in the core module (instead of the catalyst module) because it depends on things like SparkSession and sql.functions, which are only available there -- not sure where the best place for it is, or whether there are better ways to rewrite the test (my knowledge of that part is limited 😓).

Member

If we add an end-to-end test, the Apache Spark test time increases prohibitively. We prefer to narrow down the issue and have isolated unit tests, so in this PR, NestedColumnAliasingSuite.scala is the best place for the test coverage. In short, I don't think we need a heavy end-to-end test case like this; we need a test case similar to the others in NestedColumnAliasingSuite.scala.

Author

I understand. I will try to create a test case in NestedColumnAliasingSuite.scala instead.

@dongjoon-hyun
Member

Hi, @karenfeng , @cloud-fan , @viirya , @allisonwang-db .
Unfortunately, the test case shows a regression from #32301. Could you take a look at this issue?

@viirya
Member

viirya commented Sep 17, 2022

I'll take a look today or this weekend. Thanks @dongjoon-hyun

edit: didn't find time over the weekend. I will look at this soon.

@dongjoon-hyun
Member

Thank you!

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@dongjoon-hyun
Member

I removed the Stale tag. Could you rebase this PR to the master branch, @jiaji-wu ?

Comment on lines +148 to +149
import org.apache.spark.sql.functions.{explode, struct}
import org.apache.spark.sql.SparkSession
Member

We usually put imports at the beginning of the file.

Comment on lines +190 to +191
val origOutput = childPlan.output
val fromAlias = childPlan.output.flatMap(a => attrToAliases.getOrElse(a, Nil))
Member

Hmm, we intend to replace some attributes with their nested fields if the fields are accessed on top of the plan, so that we can prune the unused fields later.

If we keep the original outputs, I think pruning will not actually work.

Member

We may need to add a pruning test case to make sure pruning still works.

I think this change does not address the real issue. The real issue is that Generate contains a list, unrequiredChildIndex, of child output indices that are not needed in the Generate output. This list has to be adjusted to fit the Project node inserted by NestedColumnAliasing. Here it only fits accidentally, because the original output is included in full at the beginning of the new Project node -- but that may include unnecessary outputs, which is exactly what ColumnPruning is trying to avoid. I have a different proposal that adjusts the list of indices to point to the new positions after attribute aliasing: #49061
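For intuition only, a minimal sketch of that remapping idea (plain Scala with string stand-ins for attributes; the actual change is in the linked PR):

// Translate stale indices over the old child output into indices over the
// rewritten output; an attribute that was aliased away gets no new index.
def remapUnrequired(
    oldOutput: Seq[String],
    newOutput: Seq[String],
    unrequiredChildIndex: Seq[Int]): Seq[Int] =
  unrequiredChildIndex
    .map(i => oldOutput(i))            // old index -> attribute
    .map(a => newOutput.indexOf(a))    // attribute -> new index (or -1 if gone)
    .filter(_ >= 0)

// Example: remapUnrequired(Seq("b", "ex_b"), Seq("_extract_fa", "ex_b"), Seq(1)) == Seq(1)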

@cloud-fan
Contributor

Shall we change unrequiredChildIndex: Seq[Int] to requiredChildren: Seq[Attribute]? Then column position is no longer an issue.
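A hedged sketch of that alternative (GenerateLike is a made-up stand-in, not a patch to the real Generate):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Track the required attributes themselves instead of positions into
// child.output; matching by exprId survives both reordering and the
// Project that NestedColumnAliasing inserts between Generate and its child.
case class GenerateLike(requiredChildren: Seq[Attribute], child: LogicalPlan) {
  def requiredChildOutput: Seq[Attribute] =
    child.output.filter(a => requiredChildren.exists(_.exprId == a.exprId))
}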

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Apr 10, 2023
github-actions bot closed this Apr 11, 2023
@Kimahriman
Contributor

I think we just hit this after upgrading to Spark 3.4. Some change now triggers GeneratorNestedColumnAliasing where it didn't before, and as a result we're getting corrupted generators because of the stale unrequiredChildIndex value.

@dongjoon-hyun
Member

Thank you for reporting, @Kimahriman .

@dongjoon-hyun
Member

BTW, may I ask what your previous working Spark version was, @Kimahriman ?

@Kimahriman
Contributor

3.3.2

@cloud-fan
Contributor

@Kimahriman feel free to pick up this if you have an idea about how to fix it.

@Kimahriman
Contributor

I think I'm also hitting another, possibly unrelated bug with GeneratorNestedColumnAliasing that I'm still trying to track down, and I still haven't figured out what changed between 3.3.2 and 3.4.0 that causes GeneratorNestedColumnAliasing to trigger now, for the exact same plan, when it didn't in 3.3.2.
