@@ -438,7 +438,7 @@ case class CatalogRelation(
case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
})

-  override def computeStats: Statistics = {
+  override def computeStats(): Statistics = {
Review comment (Member): +1. Need to add () to each computeStats, because it has side effects.

// For data source tables, we will create a `LogicalRelation` and won't call this method, for
// hive serde tables, we will always generate a statistics.
// TODO: unify the table stats generation.
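The reviewer's rationale is the standard Scala convention: a nullary method declared with an empty parameter list advertises that calling it has side effects, while a parameterless method reads like a pure field access. Since computeStats populates a cache, the parentheses are warranted. A minimal sketch of the convention (names are illustrative, not from the PR):

class Counter {
  private var hits = 0

  // Side-effecting: declared with (), so call sites write counter.increment().
  def increment(): Int = { hits += 1; hits }

  // Pure accessor: no parens, reads like a field and promises no effects.
  def current: Int = hits
}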
@@ -66,9 +66,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
}
}

-  override def computeStats: Statistics =
-    Statistics(sizeInBytes =
-      output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
+  override def computeStats(): Statistics =
+    Statistics(sizeInBytes = output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)

def toSQL(inlineTableName: String): String = {
require(data.nonEmpty)
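To make the LocalRelation formula concrete, a hedged, test-style sketch (it assumes catalyst's AttributeReference and InternalRow factories; defaultSize is 4 bytes for IntegerType and 8 for LongType):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, LongType}

// Two fixed-width columns over ten rows: (4 + 8) * 10 = 120 bytes.
val rel = LocalRelation(
  output = Seq(AttributeReference("a", IntegerType)(), AttributeReference("b", LongType)()),
  data = Seq.fill(10)(InternalRow(1, 1L)))
assert(rel.computeStats().sizeInBytes == BigInt(120))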
@@ -22,11 +22,16 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StructType


-abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstraints with Logging {
+abstract class LogicalPlan
+  extends QueryPlan[LogicalPlan]
+  with LogicalPlanStats
+  with QueryPlanConstraints
+  with Logging {

private var _analyzed: Boolean = false

@@ -80,40 +85,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstrai
}
}

-  /** A cache for the estimated statistics, such that it will only be computed once. */
-  private var statsCache: Option[Statistics] = None
-
-  /**
-   * Returns the estimated statistics for the current logical plan node. Under the hood, this
-   * method caches the return value, which is computed based on the configuration passed in the
-   * first time. If the configuration changes, the cache can be invalidated by calling
-   * [[invalidateStatsCache()]].
-   */
-  final def stats: Statistics = statsCache.getOrElse {
-    statsCache = Some(computeStats)
-    statsCache.get
-  }
-
-  /** Invalidates the stats cache. See [[stats]] for more information. */
-  final def invalidateStatsCache(): Unit = {
-    statsCache = None
-    children.foreach(_.invalidateStatsCache())
-  }
-
-  /**
-   * Computes [[Statistics]] for this plan. The default implementation assumes the output
-   * cardinality is the product of all child plan's cardinality, i.e. applies in the case
-   * of cartesian joins.
-   *
-   * [[LeafNode]]s must override this.
-   */
-  protected def computeStats: Statistics = {
-    if (children.isEmpty) {
-      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
-    }
-    Statistics(sizeInBytes = children.map(_.stats.sizeInBytes).product)
-  }
-
override def verboseStringWithSuffix: String = {
super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
}
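The caching machinery deleted above is not lost; it moves into the LogicalPlanStats mixin now imported from the statsEstimation package. A hedged sketch of what that trait plausibly looks like; the delegation target SizeInBytesOnlyStatsPlanVisitor is an assumption about the companion visitor, which this diff does not show:

trait LogicalPlanStats { self: LogicalPlan =>

  /** A cache for the estimated statistics, so they are computed at most once. */
  protected var statsCache: Option[Statistics] = None

  def stats: Statistics = statsCache.getOrElse {
    // Assumed: estimation is delegated to a LogicalPlanVisitor[Statistics]
    // implementation instead of a computeStats method on every operator.
    statsCache = Some(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    statsCache.get
  }

  /** Invalidates this node's cache and, recursively, its children's. */
  def invalidateStatsCache(): Unit = {
    statsCache = None
    children.foreach(_.invalidateStatsCache())
  }
}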
@@ -300,6 +271,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstrai
abstract class LeafNode extends LogicalPlan {
override final def children: Seq[LogicalPlan] = Nil
override def producedAttributes: AttributeSet = outputSet

+  /** Leaf nodes that can survive analysis must define their own statistics. */
+  def computeStats(): Statistics = throw new UnsupportedOperationException
}

@@ -331,23 +305,6 @@ abstract class UnaryNode extends LogicalPlan {
}

override protected def validConstraints: Set[Expression] = child.constraints
-
-  override def computeStats: Statistics = {
-    // There should be some overhead in Row object, the size should not be zero when there is
-    // no columns, this help to prevent divide-by-zero error.
-    val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
-    val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
-    // Assume there will be the same number of rows as child has.
-    var sizeInBytes = (child.stats.sizeInBytes * outputRowSize) / childRowSize
-    if (sizeInBytes == 0) {
-      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
-      // (product of children).
-      sizeInBytes = 1
-    }
-
-    // Don't propagate rowCount and attributeStats, since they are not estimated here.
-    Statistics(sizeInBytes = sizeInBytes, hints = child.stats.hints)
-  }
}
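Likewise, the size-only heuristic removed from UnaryNode presumably reappears as the visitor's default handling of unary operators. The same logic in visitor form (method name and placement are assumed):

def visitUnaryNode(p: UnaryNode): Statistics = {
  // Add 8 bytes of per-row overhead so a zero-column schema cannot cause a
  // divide-by-zero below.
  val childRowSize = p.child.output.map(_.dataType.defaultSize).sum + 8
  val outputRowSize = p.output.map(_.dataType.defaultSize).sum + 8
  // Assume the same row count as the child and scale the byte size by the
  // ratio of row widths; clamp to 1 so a BinaryNode's product over children
  // can never collapse to zero.
  val sizeInBytes = ((p.child.stats.sizeInBytes * outputRowSize) / childRowSize).max(1)
  // rowCount and attributeStats are not estimated here, so don't propagate them.
  Statistics(sizeInBytes = sizeInBytes, hints = p.child.stats.hints)
}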

@@ -0,0 +1,87 @@ LogicalPlanVisitor.scala (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

/**
* A visitor pattern for traversing a [[LogicalPlan]] tree and computing some properties.
*/
trait LogicalPlanVisitor[T] {

def visit(p: LogicalPlan): T = p match {
case p: Aggregate => visitAggregate(p)
case p: Distinct => visitDistinct(p)
case p: Except => visitExcept(p)
case p: Expand => visitExpand(p)
case p: Filter => visitFilter(p)
case p: Generate => visitGenerate(p)
case p: GlobalLimit => visitGlobalLimit(p)
case p: Intersect => visitIntersect(p)
case p: Join => visitJoin(p)
case p: LocalLimit => visitLocalLimit(p)
case p: Pivot => visitPivot(p)
case p: Project => visitProject(p)
case p: Range => visitRange(p)
case p: Repartition => visitRepartition(p)
case p: RepartitionByExpression => visitRepartitionByExpr(p)
case p: Sample => visitSample(p)
case p: ScriptTransformation => visitScriptTransform(p)
case p: Union => visitUnion(p)
case p: ResolvedHint => visitHint(p)
Review comment (Member): It sounds like they are sorted by the names of the logical operators. We can adjust the order later.

case p: LogicalPlan => default(p)
Review comment (Member): Since LogicalPlan already covers all the other cases, it is fine to handle only this limited set of operators at the current stage.

}

def default(p: LogicalPlan): T

def visitAggregate(p: Aggregate): T

def visitDistinct(p: Distinct): T

def visitExcept(p: Except): T

def visitExpand(p: Expand): T

def visitFilter(p: Filter): T

def visitGenerate(p: Generate): T

def visitGlobalLimit(p: GlobalLimit): T

def visitHint(p: ResolvedHint): T

def visitIntersect(p: Intersect): T

def visitJoin(p: Join): T

def visitLocalLimit(p: LocalLimit): T

def visitPivot(p: Pivot): T

def visitProject(p: Project): T

def visitRange(p: Range): T

def visitRepartition(p: Repartition): T

def visitRepartitionByExpr(p: RepartitionByExpression): T

def visitSample(p: Sample): T

def visitScriptTransform(p: ScriptTransformation): T

def visitUnion(p: Union): T
}
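To illustrate the contract, here is a toy concrete visitor (illustrative only, not part of the PR). visit dispatches on the operator's type, and every operator without special handling funnels into default; a statistics implementation would instantiate T = Statistics and give each case a real estimation rule.

// Computes the height of a plan tree. Every operator simply delegates to
// default, which recurses into the children through visit.
object PlanHeightVisitor extends LogicalPlanVisitor[Int] {
  override def default(p: LogicalPlan): Int =
    1 + (if (p.children.isEmpty) 0 else p.children.map(visit).max)

  override def visitAggregate(p: Aggregate): Int = default(p)
  override def visitDistinct(p: Distinct): Int = default(p)
  override def visitExcept(p: Except): Int = default(p)
  override def visitExpand(p: Expand): Int = default(p)
  override def visitFilter(p: Filter): Int = default(p)
  override def visitGenerate(p: Generate): Int = default(p)
  override def visitGlobalLimit(p: GlobalLimit): Int = default(p)
  override def visitHint(p: ResolvedHint): Int = default(p)
  override def visitIntersect(p: Intersect): Int = default(p)
  override def visitJoin(p: Join): Int = default(p)
  override def visitLocalLimit(p: LocalLimit): Int = default(p)
  override def visitPivot(p: Pivot): Int = default(p)
  override def visitProject(p: Project): Int = default(p)
  override def visitRange(p: Range): Int = default(p)
  override def visitRepartition(p: Repartition): Int = default(p)
  override def visitRepartitionByExpr(p: RepartitionByExpression): Int = default(p)
  override def visitSample(p: Sample): Int = default(p)
  override def visitScriptTransform(p: ScriptTransformation): Int = default(p)
  override def visitUnion(p: Union): Int = default(p)
}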