From 1e2af3fdf76a41102e081eefe120ded91d54de6c Mon Sep 17 00:00:00 2001 From: Thomas Rohwer Date: Wed, 4 Dec 2024 11:08:14 +0000 Subject: [PATCH 1/4] fix problem in replaceWithAliases in connection with Generate plan node --- .../optimizer/NestedColumnAliasing.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 8de2663a98094..8a8e98f74575c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -185,9 +185,21 @@ object NestedColumnAliasing { plan: LogicalPlan, nestedFieldToAlias: Map[Expression, Alias], attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = { - plan.withNewChildren(plan.children.map { plan => - Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan) - }).transformExpressions { + // withNewChildren is dangerous for Generate; one has to adjust unrequiredChildIndex accordingly in this case + val withNewChildren= plan match { + case g: Generate => { + // child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) + val unrequiredSet = g.unrequiredChildIndex.toSet + val flagRes= g.child.output.zipWithIndex.flatMap( t => + attrToAliases.getOrElse(t._1, Seq(t._1)).map( e => ( e, unrequiredSet.contains(t._2) ) ) + ) + val unrequiredChildIndex= flagRes.map(_._2).zipWithIndex.filter(t => t._1).map(_._2) + g.copy(child=Project(flagRes.map(_._1), g.child), unrequiredChildIndex=unrequiredChildIndex) + } + case _ => plan.withNewChildren(plan.children.map { plan => + Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan)}) + } + withNewChildren.transformExpressions { case f: ExtractValue if nestedFieldToAlias.contains(f.canonicalized) => nestedFieldToAlias(f.canonicalized).toAttribute } From d1f1554e34a51d93f4ea1f65a0cf3da8e463de41 Mon Sep 17 00:00:00 2001 From: Thomas Rohwer Date: Thu, 5 Dec 2024 12:24:26 +0000 Subject: [PATCH 2/4] remove unrelated comment --- .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 8a8e98f74575c..dc2b545504480 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -188,7 +188,6 @@ object NestedColumnAliasing { // withNewChildren is dangerous for Generate; one has to adjust unrequiredChildIndex accordingly in this case val withNewChildren= plan match { case g: Generate => { - // child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) val unrequiredSet = g.unrequiredChildIndex.toSet val flagRes= g.child.output.zipWithIndex.flatMap( t => attrToAliases.getOrElse(t._1, Seq(t._1)).map( e => ( e, unrequiredSet.contains(t._2) ) ) From 7ddf36d03c74232bd65cd5d024a07d0c7aa69653 Mon Sep 17 00:00:00 2001 From: Thomas Rohwer Date: Thu, 5 Dec 2024 12:49:27 +0000 Subject: [PATCH 3/4] fix code format --- .../catalyst/optimizer/NestedColumnAliasing.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index dc2b545504480..8fca181e859f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -185,16 +185,16 @@ object NestedColumnAliasing { plan: LogicalPlan, nestedFieldToAlias: Map[Expression, Alias], attrToAliases: AttributeMap[Seq[Alias]]): LogicalPlan = { - // withNewChildren is dangerous for Generate; one has to adjust unrequiredChildIndex accordingly in this case - val withNewChildren= plan match { - case g: Generate => { + // withNewChildren is dangerous for Generate; + // one has to adjust unrequiredChildIndex accordingly in this case + val withNewChildren = plan match { + case g: Generate => val unrequiredSet = g.unrequiredChildIndex.toSet - val flagRes= g.child.output.zipWithIndex.flatMap( t => + val flagRes = g.child.output.zipWithIndex.flatMap( t => attrToAliases.getOrElse(t._1, Seq(t._1)).map( e => ( e, unrequiredSet.contains(t._2) ) ) ) - val unrequiredChildIndex= flagRes.map(_._2).zipWithIndex.filter(t => t._1).map(_._2) - g.copy(child=Project(flagRes.map(_._1), g.child), unrequiredChildIndex=unrequiredChildIndex) - } + val unrequiredChildIndex = flagRes.map(_._2).zipWithIndex.filter(t => t._1).map(_._2) + g.copy(child = Project(flagRes.map(_._1), g.child), unrequiredChildIndex = unrequiredChildIndex) case _ => plan.withNewChildren(plan.children.map { plan => Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan)}) } From 3102f76b97fa61b484da18bdb3108c6836183c21 Mon Sep 17 00:00:00 2001 From: Thomas Rohwer Date: Thu, 5 Dec 2024 13:23:00 +0000 Subject: [PATCH 4/4] fix code format --- .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 8fca181e859f5..cd4070b391234 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -194,7 +194,10 @@ object NestedColumnAliasing { attrToAliases.getOrElse(t._1, Seq(t._1)).map( e => ( e, unrequiredSet.contains(t._2) ) ) ) val unrequiredChildIndex = flagRes.map(_._2).zipWithIndex.filter(t => t._1).map(_._2) - g.copy(child = Project(flagRes.map(_._1), g.child), unrequiredChildIndex = unrequiredChildIndex) + g.copy( + child = Project(flagRes.map(_._1), g.child), + unrequiredChildIndex = unrequiredChildIndex + ) case _ => plan.withNewChildren(plan.children.map { plan => Project(plan.output.flatMap(a => attrToAliases.getOrElse(a, Seq(a))), plan)}) }