@@ -3039,3 +3039,27 @@ case class SplitPart (
partNum = newChildren.apply(2))
}
}

/**
* An internal function that converts the empty string to null for partition values.
* This function should only be used in V1Writes.
*/
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v

override def nullable: Boolean = true

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
s"""if ($c.numBytes() == 0) {
| ${ev.isNull} = true;
| ${ev.value} = null;
|} else {
| ${ev.value} = $c;
|}""".stripMargin
})
}

override protected def withNewChildInternal(newChild: Expression): Empty2Null =
copy(child = newChild)
}
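
For illustration, a minimal framework-free sketch of the conversion this expression performs (plain Scala; the real class plugs into Spark's UnaryExpression and codegen machinery instead):

def empty2Null(v: String): String =
  if (v != null && v.isEmpty) null else v

assert(empty2Null("") == null)       // empty partition value becomes null
assert(empty2Null("2024") == "2024") // non-empty values pass through
assert(empty2Null(null) == null)     // nulls stay null (the null-safe path)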
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, ExpressionSet, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf

/**
* A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends SQLConfHelper {
private val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
private var _hasAlias = false
protected def outputExpressions: Seq[NamedExpression]

/**
* This method is used to strip an expression of parts that do not affect the result, for
* example: parts that are ordering-agnostic when computing the output ordering.
*/
protected def strip(expr: Expression): Expression = expr

protected lazy val aliasMap: Map[Expression, ArrayBuffer[Attribute]] = {
if (aliasCandidateLimit < 1) {
Map.empty
} else {
val outputExpressionSet = AttributeSet(outputExpressions.map(_.toAttribute))
val exprWithAliasMap = new mutable.HashMap[Expression, ArrayBuffer[Attribute]]()

def updateAttrWithAliasMap(key: Expression, target: Attribute): Unit = {
val aliasArray = exprWithAliasMap.getOrElseUpdate(
strip(key).canonicalized, new ArrayBuffer[Attribute]())
// pre-filter: stop collecting aliases once the candidate limit is reached
if (aliasArray.size < aliasCandidateLimit) {
aliasArray.append(target)
}
}

outputExpressions.foreach {
case a @ Alias(child, _) =>
_hasAlias = true
updateAttrWithAliasMap(child, a.toAttribute)
case a: Attribute if outputExpressionSet.contains(a) =>
updateAttrWithAliasMap(a, a)
case _ =>
}
exprWithAliasMap.toMap
}
}

protected def hasAlias: Boolean = {
// Force `aliasMap` first: it is the lazy val whose construction sets
// `_hasAlias` when an Alias is found among the output expressions.
aliasMap
_hasAlias
}

/**
* Returns a sequence of expressions in which the original expression is normalized to its aliased versions.
*/
protected def normalizeExpression(expr: Expression): Seq[Expression] = {
val normalizedCandidates = expr.multiTransformDown {
case e: Expression if aliasMap.contains(e.canonicalized) =>
val candidates = aliasMap(e.canonicalized)
(candidates :+ e).toStream
}.take(aliasCandidateLimit)

if (normalizedCandidates.isEmpty) {
expr :: Nil
} else {
normalizedCandidates.toSeq
}
}
}
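
To make the mechanics concrete, a hedged, framework-free sketch of the single-level candidate expansion that aliasMap and normalizeExpression implement, with strings standing in for canonicalized expressions (the real code uses multiTransformDown, which also expands nested sub-expressions):

// For SELECT a AS x, a AS y: alias targets grouped by canonicalized child.
val aliasMap = Map("a" -> Seq("x", "y"))
val aliasCandidateLimit = 100

def normalize(expr: String): Seq[String] =
  aliasMap.get(expr)
    .map(aliases => (aliases :+ expr).take(aliasCandidateLimit)) // original kept last
    .getOrElse(Seq(expr))

assert(normalize("a") == Seq("x", "y", "a")) // every alias, then the original
assert(normalize("b") == Seq("b"))           // no alias: returned unchanged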

/**
* A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
* satisfies ordering requirements.
*/
trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
extends AliasAwareOutputExpression { self: QueryPlan[T] =>
protected def orderingExpressions: Seq[SortOrder]

override protected def strip(expr: Expression): Expression = expr match {
case e: Empty2Null => strip(e.child)
case _ => expr
}

override final def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.map { sortOrder =>
val normalized = normalizeExpression(sortOrder)
assert(normalized.forall(_.isInstanceOf[SortOrder]))
val pruned = ExpressionSet(normalized.flatMap {
case s: SortOrder => s.children.filter(_.references.subsetOf(outputSet))
})
if (pruned.isEmpty) {
sortOrder
} else {
// All pruned expressions are semantically equal, so just use the head to build a new
// SortOrder and the tail as the sameOrderExpressions.
SortOrder(pruned.head, sortOrder.direction, sortOrder.nullOrdering, pruned.tail.toSeq)
}
}
} else {
orderingExpressions
}
}
}
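
As a usage-level illustration of what this trait buys (hedged: whether the second Sort is actually removed depends on the optimizer rules of the running Spark version):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

// Sort by `a`, alias it to `x`, sort by `x` again: with alias-aware
// outputOrdering the second Sort can be recognized as redundant.
val df = Seq(3, 1, 2).toDF("a")
df.sort($"a").select($"a".as("x")).sort($"x").explain()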
@@ -53,6 +53,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
@transient
lazy val outputSet: AttributeSet = AttributeSet(output)

/**
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil

// Override `treePatternBits` to propagate bits for its expressions.
override lazy val treePatternBits: BitSet = {
val bits: BitSet = getDefaultTreePatternBits
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -141,11 +141,6 @@ abstract class LogicalPlan
*/
def refresh(): Unit = children.foreach(_.refresh())

/**
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil

/**
* Returns true iff `other`'s output is semantically the same, i.e.:
* - it contains the same number of `Attribute`s;
@@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
*/
trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]

abstract class OrderPreservingUnaryNode extends UnaryNode {
override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
trait OrderPreservingUnaryNode extends UnaryNode
with AliasAwareQueryOutputOrdering[LogicalPlan] {
override protected def outputExpressions: Seq[NamedExpression] = child.output
[Review thread]

Contributor: this means we don't do any alias replacement?

Contributor Author: No, it just supplies its outputExpressions to AliasAwareOutputExpression so that AliasAwareOutputExpression can build the alias map.

Contributor: but child.output has no aliases at all, right?

Contributor Author: Project has overridden it:

override protected def outputExpressions: Seq[NamedExpression] = projectList

Contributor: which subclass uses the default implementation?

Contributor Author: Filter/Limit, etc.

override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
}
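
To summarize the thread above, a toy analogue of the default-vs-override split (simplified, hypothetical types; not Spark's real classes):

// Filter/Limit-style nodes keep the default child.output (no aliases),
// while Project overrides outputExpressions with its projectList.
trait ToyPlan { def output: Seq[String]; def outputOrdering: Seq[String] }

trait ToyOrderPreserving extends ToyPlan {
  def child: ToyPlan
  def outputExpressions: Seq[String] = child.output          // Filter, Limit, ...
  def orderingExpressions: Seq[String] = child.outputOrdering
}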

object LogicalPlanIntegrity {
@@ -69,6 +69,7 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override protected def outputExpressions: Seq[NamedExpression] = projectList
override def maxRows: Option[Long] = child.maxRows
override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition

@@ -435,6 +435,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
.doc("The maximum number of the candidate of out put expressions whose alias are replaced." +
" It can preserve the output partitioning and ordering." +
" Negative value means disable this optimization.")
[Review comment on lines +440 to +442 (Contributor)]

Suggested change:
- .doc("The maximum number of the candidate of out put expressions whose alias are replaced." +
-   " It can preserve the output partitioning and ordering." +
-   " Negative value means disable this optimization.")
+ .doc("The maximum number of candidates for output expressions whose aliases are replaced." +
+   " This can preserve the output partitioning and ordering." +
+   " A negative value means to disable this optimization.")
.version("3.4.0")
.intConf
.createWithDefault(100)
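
For reference, a hedged usage sketch of the new knob (key and default as added above; the values are examples, assuming an active SparkSession named spark):

spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "10") // cap candidates
spark.conf.set("spark.sql.optimizer.expressionProjectionCandidateLimit", "-1") // disable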

val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
"column based on statistics of the data.")
@@ -16,48 +16,41 @@
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}

/**
* A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]

private lazy val aliasMap = outputExpressions.collect {
case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
}.toMap

protected def hasAlias: Boolean = aliasMap.nonEmpty

protected def normalizeExpression(exp: Expression): Expression = {
exp.transformDown {
case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
}
}
}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

/**
* A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
* satisfies distribution requirements.
*/
trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
trait AliasAwareOutputPartitioning extends UnaryExecNode
with AliasAwareOutputExpression {

final override def outputPartitioning: Partitioning = {
val normalizedOutputPartitioning = if (hasAlias) {
child.outputPartitioning match {
case e: Expression =>
normalizeExpression(e).asInstanceOf[Partitioning]
val normalized = normalizeExpression(e)
if (normalized.isEmpty) {
UnknownPartitioning(child.outputPartitioning.numPartitions)
} else if (normalized.size == 1) {
normalized.head.asInstanceOf[Partitioning]
} else {
PartitioningCollection(normalized.asInstanceOf[Seq[Partitioning]])
}
case other => other
}
} else {
child.outputPartitioning
}

flattenPartitioning(normalizedOutputPartitioning).filter {
case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet)
val (partitionWithExpr, other) = flattenPartitioning(normalizedOutputPartitioning).filter {
case e: Expression => e.references.subsetOf(outputSet)
[Contributor Author: to also handle cases such as RangePartitioning]
case _ => true
} match {
}.partition(_.isInstanceOf[Expression])
val pruned = ExpressionSet(partitionWithExpr.asInstanceOf[Seq[Expression]])
(pruned.toSeq.asInstanceOf[Seq[Partitioning]] ++ other) match {
case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
case Seq(singlePartitioning) => singlePartitioning
case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings)
@@ -74,18 +67,4 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
}
}

/**
* A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
* satisfies ordering requirements.
*/
trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
protected def orderingExpressions: Seq[SortOrder]

final override def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
} else {
orderingExpressions
}
}
}
trait AliasAwareOutputOrdering extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]
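
A hedged illustration of the partitioning side (reusing the SparkSession from the earlier sketch; whether the extra Exchange is avoided depends on the planner of the running version):

// Hash-partition by `a`, then alias it to `x`: the reported
// outputPartitioning can now be expressed over `x`, so a downstream
// requirement on `x` may be satisfied without another shuffle.
val df2 = Seq((1, "a"), (2, "b")).toDF("a", "b")
df2.repartition($"a").select($"a".as("x"), $"b").groupBy($"x").count().explain()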
@@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildDistribution: Seq[Distribution] =
Seq.fill(children.size)(UnspecifiedDistribution)

/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil

/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

@@ -158,7 +158,7 @@ object FileFormatWriter extends Logging {
// Use the output ordering from the original plan before adding the empty2null projection.
val actualOrdering = writeFilesOpt.map(_.child)
.getOrElse(materializeAdaptiveSparkPlan(plan))
.outputOrdering.map(_.child)
.outputOrdering
val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

SQLExecution.checkSQLExecutionId(sparkSession)
@@ -20,16 +20,14 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

trait V1WriteCommand extends DataWritingCommand {
/**
@@ -121,26 +119,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
}

object V1WritesUtils {

/** A function that converts the empty string to null for partition values. */
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
override def nullable: Boolean = true
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
s"""if ($c.numBytes() == 0) {
| ${ev.isNull} = true;
| ${ev.value} = null;
|} else {
| ${ev.value} = $c;
|}""".stripMargin
})
}

override protected def withNewChildInternal(newChild: Expression): Empty2Null =
copy(child = newChild)
}

def getWriterBucketSpec(
bucketSpec: Option[BucketSpec],
dataColumns: Seq[Attribute],
@@ -230,12 +208,14 @@

def isOrderingMatched(
requiredOrdering: Seq[Expression],
outputOrdering: Seq[Expression]): Boolean = {
outputOrdering: Seq[SortOrder]): Boolean = {
if (requiredOrdering.length > outputOrdering.length) {
false
} else {
requiredOrdering.zip(outputOrdering).forall {
case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder)
case (requiredOrder, outputOrder) =>
// Follow `SortOrder.satisfies`, which respects `SortOrder.sameOrderExpressions`
outputOrder.children.exists(_.semanticEquals(requiredOrder))
}
}
}
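
A framework-free sketch of the relaxed matching above, with strings standing in for expressions and each output SortOrder modeled as its children (the child plus the sameOrderExpressions):

def isOrderingMatchedSketch(
    requiredOrdering: Seq[String],
    outputOrdering: Seq[Seq[String]]): Boolean = {
  if (requiredOrdering.length > outputOrdering.length) {
    false
  } else {
    requiredOrdering.zip(outputOrdering).forall {
      case (required, sortOrderChildren) =>
        sortOrderChildren.contains(required) // stand-in for semanticEquals
    }
  }
}

assert(isOrderingMatchedSketch(Seq("a"), Seq(Seq("x", "a"))))  // matched via sameOrderExpressions
assert(!isOrderingMatchedSketch(Seq("a", "b"), Seq(Seq("a")))) // required longer than output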