Skip to content

Commit da8eea4

Browse files
committed
[SPARK-3688][SQL] More inline comments for LogicalPlan.
As a follow-up to #4524
1 parent 44b2311 commit da8eea4

File tree

5 files changed

+115
-42
lines changed

5 files changed

+115
-42
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ package org.apache.spark.sql.catalyst
2525
package object analysis {
2626

2727
/**
28-
* Responsible for resolving which identifiers refer to the same entity. For example, by using
29-
* case insensitive equality.
28+
* Resolver should return true if the first string refers to the same entity as the second string.
29+
* For example, by using case insensitive equality.
3030
*/
3131
type Resolver = (String, String) => Boolean
3232

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,17 @@ abstract class NamedExpression extends Expression {
4040

4141
def name: String
4242
def exprId: ExprId
43+
44+
/**
45+
* All possible qualifiers for the expression.
46+
*
47+
* For now, since we do not allow using original table name to qualify a column name once the
48+
* table is aliased, this can only be:
49+
*
50+
* 1. Empty Seq: when an attribute doesn't have a qualifier,
51+
* e.g. top level attributes aliased in the SELECT clause, or column from a LocalRelation.
52+
* 2. Single element: either the table name or the alias name of the table.
53+
*/
4354
def qualifiers: Seq[String]
4455

4556
def toAttribute: Attribute

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
152152
/** Prints out the schema in the tree format */
153153
def printSchema(): Unit = println(schemaString)
154154

155+
/**
156+
* A prefix string used when printing the plan.
157+
*
158+
* We use "!" to indicate an invalid plan, and "'" to indicate an unresolved plan.
159+
*/
155160
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
156161

157162
override def simpleString = statePrefix + super.simpleString

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,29 @@
1818
package org.apache.spark.sql.catalyst.plans.logical
1919

2020
import org.apache.spark.Logging
21+
import org.apache.spark.sql.AnalysisException
2122
import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
22-
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.plans.QueryPlan
2525
import org.apache.spark.sql.catalyst.trees.TreeNode
26-
import org.apache.spark.sql.types.StructType
2726
import org.apache.spark.sql.catalyst.trees
2827

29-
/**
30-
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
31-
* corresponding statistic produced by the children. To override this behavior, override
32-
* `statistics` and assign it an overridden version of `Statistics`.
33-
*
34-
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
35-
* performance of the implementations. The reason is that estimations might get triggered in
36-
* performance-critical processes, such as query plan planning.
37-
*
38-
* Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
39-
* cardinality estimation (e.g. cartesian joins).
40-
*
41-
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
42-
* defaults to the product of children's `sizeInBytes`.
43-
*/
44-
private[sql] case class Statistics(sizeInBytes: BigInt)
4528

4629
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
4730
self: Product =>
4831

32+
/**
33+
* Computes [[Statistics]] for this plan. The default implementation assumes the output
34+
* cardinality is the product of all child plans' cardinalities, i.e. applies in the case
35+
* of cartesian joins.
36+
*
37+
* [[LeafNode]]s must override this.
38+
*/
4939
def statistics: Statistics = {
5040
if (children.size == 0) {
5141
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
5242
}
53-
54-
Statistics(
55-
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
43+
Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
5644
}
5745

5846
/**
@@ -128,26 +116,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
128116
def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
129117
resolve(name, output, resolver)
130118

131-
def resolveAsTableColumn(
119+
/**
120+
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
121+
*
122+
* This assumes `name` has multiple parts, where the 1st part is a qualifier
123+
* (i.e. table name, alias, or subquery alias).
124+
* See the comment above the `candidates` variable in resolve() for the semantics of the returned data.
125+
*/
126+
private def resolveAsTableColumn(
132127
nameParts: Array[String],
133128
resolver: Resolver,
134-
attribute: Attribute): List[(Attribute, List[String])] = {
135-
if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
136-
val remainingParts = nameParts.drop(1)
129+
attribute: Attribute): Option[(Attribute, List[String])] = {
130+
assert(nameParts.length > 1)
131+
if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty) {
132+
// At least one qualifier matches. See if remaining parts match.
133+
val remainingParts = nameParts.tail
137134
resolveAsColumn(remainingParts, resolver, attribute)
138135
} else {
139-
Nil
136+
None
140137
}
141138
}
142139

143-
def resolveAsColumn(
140+
/**
141+
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
142+
*
143+
* Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
144+
* See the comment above the `candidates` variable in resolve() for the semantics of the returned data.
145+
*/
146+
private def resolveAsColumn(
144147
nameParts: Array[String],
145148
resolver: Resolver,
146-
attribute: Attribute): List[(Attribute, List[String])] = {
149+
attribute: Attribute): Option[(Attribute, List[String])] = {
147150
if (resolver(attribute.name, nameParts.head)) {
148-
(attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
151+
Option((attribute.withName(nameParts.head), nameParts.tail.toList))
149152
} else {
150-
Nil
153+
None
151154
}
152155
}
153156

@@ -159,25 +162,44 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
159162

160163
val parts = name.split("\\.")
161164

162-
// We will try to resolve this name as `table.column` pattern first.
163-
var options = input.flatMap { option =>
164-
resolveAsTableColumn(parts, resolver, option)
165+
// A sequence of possible candidate matches.
166+
// Each candidate is a tuple. The first element is a resolved attribute, followed by a list
167+
// of parts that are to be resolved.
168+
// For example, consider the case where "a" is the table name, "b" is the column name,
169+
// and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
170+
// and the second element will be List("c").
171+
var candidates: Seq[(Attribute, List[String])] = {
172+
// If the name has 2 or more parts, try to resolve it as `table.column` first.
173+
if (parts.length > 1) {
174+
input.flatMap { option =>
175+
resolveAsTableColumn(parts, resolver, option)
176+
}
177+
} else {
178+
Seq.empty
179+
}
165180
}
166181

167182
// If none of attributes match `table.column` pattern, we try to resolve it as a column.
168-
if(options.isEmpty) {
169-
options = input.flatMap { option =>
170-
resolveAsColumn(parts, resolver, option)
183+
if (candidates.isEmpty) {
184+
candidates = input.flatMap { candidate =>
185+
resolveAsColumn(parts, resolver, candidate)
171186
}
172187
}
173188

174-
options.distinct match {
189+
candidates.distinct match {
175190
// One match, no nested fields, use it.
176191
case Seq((a, Nil)) => Some(a)
177192

178193
// One match, but we also need to extract the requested nested field.
179194
case Seq((a, nestedFields)) =>
180-
Some(Alias(nestedFields.foldLeft(a: Expression)(UnresolvedGetField), nestedFields.last)())
195+
// The foldLeft adds UnresolvedGetField for every remaining part of the name,
196+
// and aliases the result with the last part of the name.
197+
// For example, consider name "a.b.c", where "a" is resolved to an existing attribute.
198+
// Then this will add UnresolvedGetField("b") and UnresolvedGetField("c"), and alias
199+
// the final expression as "c".
200+
val fieldExprs = nestedFields.foldLeft(a: Expression)(UnresolvedGetField)
201+
val aliasName = nestedFields.last
202+
Some(Alias(fieldExprs, aliasName)())
181203

182204
// No matches.
183205
case Seq() =>
@@ -186,8 +208,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
186208

187209
// More than one match.
188210
case ambiguousReferences =>
189-
throw new TreeNodeException(
190-
this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
211+
throw new AnalysisException(
212+
s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
191213
}
192214
}
193215
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical
19+
20+
/**
21+
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
22+
* corresponding statistic produced by the children. To override this behavior, override
23+
* `statistics` and assign it an overridden version of `Statistics`.
24+
*
25+
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
26+
* performance of the implementations. The reason is that estimations might get triggered in
27+
* performance-critical processes, such as query plan planning.
28+
*
29+
* Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
30+
* cardinality estimation (e.g. cartesian joins).
31+
*
32+
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
33+
* defaults to the product of children's `sizeInBytes`.
34+
*/
35+
private[sql] case class Statistics(sizeInBytes: BigInt)

0 commit comments

Comments
 (0)