
Commit 23e7935

hvanhovell authored and markhamstra committed
[SPARK-16406][SQL] Improve performance of LogicalPlan.resolve
`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but it gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches more quickly. It improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):

```scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query = s"""
  |SELECT $columns
  |FROM VALUES ($values) T($columns)
  |WHERE 1=2 AND 1 IN ($columns)
  |GROUP BY $columns
  |ORDER BY $columns
  |""".stripMargin
spark.time(sql(query))
```

Tested with existing tests.

Author: Herman van Hovell <[email protected]>

Closes apache#14083 from hvanhovell/SPARK-16406.
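The gist of the change, as a minimal standalone sketch (the `Attr` case class and helper names below are illustrative stand-ins, not the actual Catalyst `Attribute`/`AttributeSeq` API): instead of scanning every attribute once per name, the attributes are grouped once into a map keyed by lower-cased name, and each subsequent lookup becomes a hash-map access.

```scala
// Hypothetical stand-in for Catalyst's Attribute, for illustration only.
case class Attr(qualifier: Option[String], name: String)

val attrs: Seq[Attr] = (1 to 4000).map(i => Attr(Some("T"), s"column$i"))

// Before: every resolution scans all attributes, O(n) per name.
def resolveLinear(name: String): Seq[Attr] =
  attrs.filter(a => a.name.equalsIgnoreCase(name))

// After: build the index once, then each lookup is roughly O(1).
val byName: Map[String, Seq[Attr]] = attrs.groupBy(_.name.toLowerCase)
def resolveIndexed(name: String): Seq[Attr] =
  byName.getOrElse(name.toLowerCase, Nil)
```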
1 parent 83370ef commit 23e7935

File tree

2 files changed (+93, -101 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

Lines changed: 86 additions & 0 deletions
```diff
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.util.Locale
+
 import com.google.common.collect.Maps
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -131,6 +135,88 @@ package object expressions {
     def indexOf(exprId: ExprId): Int = {
       Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
     }
+
+    private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = {
+      m.mapValues(_.distinct).map(identity)
+    }
+
+    /** Map to use for direct case insensitive attribute lookups. */
+    @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+      unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT)))
+    }
+
+    /** Map to use for qualified case insensitive attribute lookups. */
+    @transient private val qualified: Map[(String, String), Seq[Attribute]] = {
+      val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+        (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT))
+      }
+      unique(grouped)
+    }
+
+    /** Perform attribute resolution given a name and a resolver. */
+    def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = {
+      // Collect matching attributes given a name and a lookup.
+      def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
+        candidates.toSeq.flatMap(_.collect {
+          case a if resolver(a.name, name) => a.withName(name)
+        })
+      }
+
+      // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name,
+      // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of
+      // matched attributes and a list of parts that are to be resolved.
+      //
+      // For example, consider an example where "a" is the table name, "b" is the column name,
+      // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
+      // and the second element will be List("c").
+      val matches = nameParts match {
+        case qualifier +: name +: nestedFields =>
+          val key = (qualifier.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT))
+          val attributes = collectMatches(name, qualified.get(key)).filter { a =>
+            resolver(qualifier, a.qualifier.get)
+          }
+          (attributes, nestedFields)
+        case all =>
+          (Nil, all)
+      }
+
+      // If none of attributes match `table.column` pattern, we try to resolve it as a column.
+      val (candidates, nestedFields) = matches match {
+        case (Seq(), _) =>
+          val name = nameParts.head
+          val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
+          (attributes, nameParts.tail)
+        case _ => matches
+      }
+
+      def name = UnresolvedAttribute(nameParts).name
+      candidates match {
+        case Seq(a) if nestedFields.nonEmpty =>
+          // One match, but we also need to extract the requested nested field.
+          // The foldLeft adds ExtractValues for every remaining parts of the identifier,
+          // and aliased it with the last part of the name.
+          // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
+          // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
+          // expression as "c".
+          val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+            ExtractValue(e, Literal(name), resolver)
+          }
+          Some(Alias(fieldExprs, nestedFields.last)())
+
+        case Seq(a) =>
+          // One match, no nested fields, use it.
+          Some(a)
+
+        case Seq() =>
+          // No matches.
+          None
+
+        case ambiguousReferences =>
+          // More than one match.
+          val referenceNames = ambiguousReferences.map(_.qualifiedName).mkString(", ")
+          throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.")
+      }
+    }
   }
 
   /**
```
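A rough standalone model of the two-step lookup performed by the new `resolve(...)` above (the `Attr` type and `lookup` helper are illustrative stand-ins, not the Catalyst classes): a multi-part name is first tried against the `(qualifier, name)` index, and only if that yields no candidates is its head re-tried against the name-only index, with the remaining parts kept as nested fields to extract later.

```scala
// Hypothetical stand-in type; the real code works on Catalyst Attributes.
case class Attr(qualifier: Option[String], name: String)

// Assumes nameParts is non-empty, as in the real resolve(...).
def lookup(
    nameParts: Seq[String],
    direct: Map[String, Seq[Attr]],
    qualified: Map[(String, String), Seq[Attr]]): (Seq[Attr], Seq[String]) = {
  // Step 1: treat the first part as a qualifier and the second as the column name.
  val asQualified = nameParts match {
    case qualifier +: name +: nestedFields =>
      (qualified.getOrElse((qualifier.toLowerCase, name.toLowerCase), Nil), nestedFields)
    case all => (Nil, all)
  }
  // Step 2: if that found nothing, fall back to an unqualified column lookup.
  asQualified match {
    case (Seq(), _) => (direct.getOrElse(nameParts.head.toLowerCase, Nil), nameParts.tail)
    case found => found
  }
}
```

For example, `lookup(Seq("t", "col", "field"), direct, qualified)` would return whatever is indexed under `("t", "col")` together with `Seq("field")` as the nested fields still to be extracted.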

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 7 additions & 101 deletions
```diff
@@ -160,6 +160,10 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
     }
   }
 
+  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+
+  private[this] lazy val outputAttributes = AttributeSeq(output)
+
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
@@ -168,7 +172,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolveChildren(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, children.flatMap(_.output), resolver)
+    childAttributes.resolve(nameParts, resolver)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
@@ -178,7 +182,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolve(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, output, resolver)
+    outputAttributes.resolve(nameParts, resolver)
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -188,105 +192,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * This assumes `name` has multiple parts, where the 1st part is a qualifier
-   * (i.e. table name, alias, or subquery alias).
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsTableColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    assert(nameParts.length > 1)
-    if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
-      // At least one qualifier matches. See if remaining parts match.
-      val remainingParts = nameParts.tail
-      resolveAsColumn(remainingParts, resolver, attribute)
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
-   *
-   * Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
-   * See the comment above `candidates` variable in resolve() for semantics the returned data.
-   */
-  private def resolveAsColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    if (!attribute.isGenerated && resolver(attribute.name, nameParts.head)) {
-      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
-    } else {
-      None
-    }
-  }
-
-  /** Performs attribute resolution given a name and a sequence of possible attributes. */
-  protected def resolve(
-      nameParts: Seq[String],
-      input: Seq[Attribute],
-      resolver: Resolver): Option[NamedExpression] = {
-
-    // A sequence of possible candidate matches.
-    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
-    // of parts that are to be resolved.
-    // For example, consider an example where "a" is the table name, "b" is the column name,
-    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
-    // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
-        }
-      } else {
-        Seq.empty
-      }
-    }
-
-    // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if (candidates.isEmpty) {
-      candidates = input.flatMap { candidate =>
-        resolveAsColumn(nameParts, resolver, candidate)
-      }
-    }
-
-    def name = UnresolvedAttribute(nameParts).name
-
-    candidates.distinct match {
-      // One match, no nested fields, use it.
-      case Seq((a, Nil)) => Some(a)
-
-      // One match, but we also need to extract the requested nested field.
-      case Seq((a, nestedFields)) =>
-        // The foldLeft adds ExtractValues for every remaining parts of the identifier,
-        // and aliased it with the last part of the name.
-        // For example, consider "a.b.c", where "a" is resolved to an existing attribute.
-        // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final
-        // expression as "c".
-        val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, fieldName) =>
-          ExtractValue(expr, Literal(fieldName), resolver))
-        Some(Alias(fieldExprs, nestedFields.last)())
-
-      // No matches.
-      case Seq() =>
-        logTrace(s"Could not find $name in ${input.mkString(", ")}")
-        None
-
-      // More than one match.
-      case ambiguousReferences =>
-        val referenceNames = ambiguousReferences.map(_._1).mkString(", ")
-        throw new AnalysisException(
-          s"Reference '$name' is ambiguous, could be: $referenceNames.")
-    }
+    outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
   }
 
   /**
```
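Worth noting on the caching side: `childAttributes` and `outputAttributes` are `private[this] lazy val`s, so the `AttributeSeq` wrapper (and the lookup maps it builds) is constructed at most once per plan node and then reused by every later call to `resolveChildren`, `resolve`, and `resolveQuoted` on that node. A tiny sketch of that pattern (the `Node` class is a hypothetical stand-in, not `LogicalPlan`):

```scala
// Hypothetical node that lazily builds and caches an index over its output names.
class Node(val output: Seq[String]) {
  // Built the first time it is needed, then reused by every resolve call on this node.
  private[this] lazy val index: Map[String, Seq[String]] =
    output.groupBy(_.toLowerCase)

  def resolve(name: String): Option[String] =
    index.getOrElse(name.toLowerCase, Nil).headOption
}
```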
