@@ -50,56 +50,54 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
5050 */
5151trait AliasAwareOutputPartitioning extends UnaryExecNode {
5252
53+ /**
54+ * `Seq` of `Expression`s which define the ouput of the node.
55+ */
5356 protected def outputExpressions : Seq [NamedExpression ]
5457
55- private def hasAlias ( exprs : Seq [ NamedExpression ]) : Boolean =
56- exprs.exists(_.collectFirst { case _ : Alias => true }.isDefined)
57-
58+ /**
59+ * Returns the valid `Partitioning`s for the node w.r.t its output and its expressions.
60+ */
5861 final override def outputPartitioning : Partitioning = {
59- if (hasAlias(outputExpressions)) {
60- // Returns the valid `Partitioning`s for the node w.r.t its output and its expressions.
61- child.outputPartitioning match {
62- case partitioning : Expression =>
63- val exprToEquiv = partitioning.references.map { attr =>
64- attr -> outputExpressions.filter(e =>
65- CleanupAliases .trimAliases(e).semanticEquals(attr))
66- }.filterNot { case (attr, exprs) =>
67- exprs.size == 1 && exprs.forall(_ == attr)
68- }
69- val initValue = partitioning match {
70- case PartitioningCollection (partitionings) => partitionings
71- case other => Seq (other)
72- }
73- val validPartitionings = exprToEquiv.foldLeft(initValue) {
74- case (partitionings, (toReplace, equivalents)) =>
75- if (equivalents.isEmpty) {
76- partitionings.map(_.pruneInvalidAttribute(toReplace))
77- } else {
78- partitionings.flatMap {
79- case p : Expression if p.references.contains(toReplace) =>
80- equivalents.map { equiv =>
81- p.transformDown {
82- case e if e == toReplace => equiv.toAttribute
83- }.asInstanceOf [Partitioning ]
84- }
85- case other => Seq (other)
86- }
62+ child.outputPartitioning match {
63+ case partitioning : Expression =>
64+ val exprToEquiv = partitioning.references.map { attr =>
65+ attr -> outputExpressions.filter(e =>
66+ CleanupAliases .trimAliases(e).semanticEquals(attr))
67+ }.filterNot { case (attr, exprs) =>
68+ exprs.size == 1 && exprs.forall(_ == attr)
69+ }
70+ val initValue = partitioning match {
71+ case PartitioningCollection (partitionings) => partitionings
72+ case other => Seq (other)
73+ }
74+ val validPartitionings = exprToEquiv.foldLeft(initValue) {
75+ case (partitionings, (toReplace, equivalents)) =>
76+ if (equivalents.isEmpty) {
77+ partitionings.map(_.pruneInvalidAttribute(toReplace))
78+ } else {
79+ partitionings.flatMap {
80+ case p : Expression if p.references.contains(toReplace) =>
81+ equivalents.map { equiv =>
82+ p.transformDown {
83+ case e if e == toReplace => equiv.toAttribute
84+ }.asInstanceOf [Partitioning ]
85+ }
86+ case other => Seq (other)
8787 }
88- }.distinct
89- if (validPartitionings.size == 1 ) {
90- validPartitionings.head
91- } else {
92- validPartitionings.filterNot(_.isInstanceOf [UnknownPartitioning ]) match {
93- case Seq () => PartitioningCollection (validPartitionings)
94- case Seq (knownPartitioning) => knownPartitioning
95- case knownPartitionings => PartitioningCollection (knownPartitionings)
9688 }
97-
89+ }.distinct
90+ if (validPartitionings.size == 1 ) {
91+ validPartitionings.head
92+ } else {
93+ validPartitionings.filterNot(_.isInstanceOf [UnknownPartitioning ]) match {
94+ case Seq () => PartitioningCollection (validPartitionings)
95+ case Seq (knownPartitioning) => knownPartitioning
96+ case knownPartitionings => PartitioningCollection (knownPartitionings)
9897 }
99- case other => other
100- }
101- } else {
102- child.outputPartitioning
98+
99+ }
100+ case other => other
103101 }
104102 }
105103}
0 commit comments