core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,13 +35,17 @@ import org.apache.spark.{Partition, TaskContext}
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
* sensitive, it may return totally different result when the input order
* is changed. Mostly stateful functions are order-sensitive.
* @param isPartitionKeyIndeterminate whether or not the partition key is indeterminate.
* If it is, the function may return different results across
* retries even though [[org.apache.spark.Partitioner]] is deterministic.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
isOrderSensitive: Boolean = false,
isPartitionKeyIndeterminate: Boolean = false)
extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -60,7 +64,8 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
isFromBarrier || dependencies.exists(_.rdd.isBarrier())

override protected def getOutputDeterministicLevel = {
if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
if (isPartitionKeyIndeterminate ||
(isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED)) {
DeterministicLevel.INDETERMINATE
} else {
super.getOutputDeterministicLevel
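The new flag short-circuits the deterministic-level computation: if the partition key is indeterminate, the RDD reports INDETERMINATE regardless of the parent's level. A minimal restatement of that rule outside the class, assuming only the public org.apache.spark.rdd.DeterministicLevel enumeration (the helper name and parameters below are illustrative, not part of this patch):

import org.apache.spark.rdd.DeterministicLevel

// Illustrative restatement of the new getOutputDeterministicLevel rule.
// `parentLevel` stands in for prev.outputDeterministicLevel; the real override
// falls back to RDD.getOutputDeterministicLevel instead of returning it directly.
def outputLevel(
    isPartitionKeyIndeterminate: Boolean,
    isOrderSensitive: Boolean,
    parentLevel: DeterministicLevel.Value): DeterministicLevel.Value = {
  if (isPartitionKeyIndeterminate ||
      (isOrderSensitive && parentLevel == DeterministicLevel.UNORDERED)) {
    DeterministicLevel.INDETERMINATE
  } else {
    parentLevel
  }
}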
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -867,16 +867,21 @@ abstract class RDD[T: ClassTag](
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
* sensitive, it may return totally different result when the input order
* is changed. Mostly stateful functions are order-sensitive.
* @param isPartitionKeyIndeterminate whether or not the partition key is indeterminate.
* If it is, the function may return different results across
* retries even though [[org.apache.spark.Partitioner]] is deterministic.
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false): RDD[U] = withScope {
isOrderSensitive: Boolean = false,
isPartitionKeyIndeterminate: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive)
isOrderSensitive = isOrderSensitive,
isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
}

/**
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -29,7 +29,7 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, NamedExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical._
@@ -167,7 +167,7 @@ case class ShuffleExchangeExec(
lazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {
val dep = ShuffleExchangeExec.prepareShuffleDependency(
inputRDD,
child.output,
child,
outputPartitioning,
serializer,
writeMetrics)
@@ -260,11 +260,12 @@ object ShuffleExchangeExec {
*/
def prepareShuffleDependency(
rdd: RDD[InternalRow],
outputAttributes: Seq[Attribute],
child: SparkPlan,
newPartitioning: Partitioning,
serializer: Serializer,
writeMetrics: Map[String, SQLMetric])
: ShuffleDependency[Int, InternalRow, InternalRow] = {
val outputAttributes = child.output
val part: Partitioner = newPartitioning match {
case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
case HashPartitioning(_, n) =>
@@ -373,18 +374,21 @@ }
}

// round-robin function is order sensitive if we don't sort the input.
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
val isOrderSensitive = (isRoundRobin && !SQLConf.get.sortBeforeRepartition)
val isPartitionKeyIndeterminate = isPartitioningIndeterminate(newPartitioning, child)
if (needToCopyObjectsBeforeShuffle(part)) {
newRdd.mapPartitionsWithIndexInternal((_, iter) => {
val getPartitionKey = getPartitionKeyExtractor()
iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
}, isOrderSensitive = isOrderSensitive)
}, isOrderSensitive = isOrderSensitive,
isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
} else {
newRdd.mapPartitionsWithIndexInternal((_, iter) => {
val getPartitionKey = getPartitionKeyExtractor()
val mutablePair = new MutablePair[Int, InternalRow]()
iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}, isOrderSensitive = isOrderSensitive)
}, isOrderSensitive = isOrderSensitive,
isPartitionKeyIndeterminate = isPartitionKeyIndeterminate)
}
}
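As the "round-robin function is order sensitive" comment above notes, round-robin assigns partition ids by a row's position in the iterator, so a different input order on a retried task sends the same rows to different reducers. A simplified sketch of why (illustrative only; the real code starts each task at a random position and reuses a MutablePair):

// Toy model of position-based round-robin assignment (not from the patch).
def roundRobinIds[T](rows: Seq[T], numPartitions: Int, start: Int = 0): Seq[(Int, T)] =
  rows.zipWithIndex.map { case (row, i) => ((start + i) % numPartitions, row) }

roundRobinIds(Seq("a", "b", "c"), numPartitions = 2)  // Seq((0,a), (1,b), (0,c))
roundRobinIds(Seq("c", "a", "b"), numPartitions = 2)  // Seq((0,c), (1,a), (0,b)) -- same rows, different buckets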

@@ -401,6 +405,29 @@
dependency
}

/**
* Checks if the shuffle partitioning contains indeterminate expression/reference.
*/
private def isPartitioningIndeterminate(partitioning: Partitioning, plan: SparkPlan): Boolean = {
Review thread on this function:

Contributor: I think we need to build a framework to properly propagate the column-level nondeterministic information. This function looks quite fragile.

Contributor: For example, Filter(rand_cond, Project(a, b, c, ...)): I think all the columns are nondeterministic after the Filter, even though attributes a, b and c are deterministic.

Author: You mean the QueryPlan's deterministic? #34470

Contributor: That's plan level, not column level. We need something more fine-grained.

Author: OK, I'll first try to add column-level nondeterministic information before this PR.

(A concrete sketch of the Filter(rand_cond, ...) case follows this function.)

val indeterminateAttrs = plan.flatMap(_.expressions).collect {
case e: NamedExpression if !e.deterministic => e.exprId
}.toSet

def hasIndeterminateReference(e: Expression): Boolean = {
indeterminateAttrs.size > 0 &&
e.find {
case a: AttributeReference if indeterminateAttrs.contains(a.exprId) => true
case _ => false
}.nonEmpty
}

partitioning match {
case HashPartitioning(exprs, _)
if exprs.exists(e => !e.deterministic || hasIndeterminateReference(e)) => true
case _ => false
}
}
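To make the reviewer's Filter(rand_cond, ...) example above concrete, here is a hedged sketch (not part of this patch; the session setup, column names, and query are illustrative). The nondeterminism lives in a Filter condition rather than in a NamedExpression, so no exprId is collected into indeterminateAttrs, and the hash-partitioning columns themselves are deterministic; as far as the logic above goes, the exchange would still be classified as deterministic even though the rows feeding it can change on task retry.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("indeterminate-filter-sketch")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1") // force a shuffled join
  .getOrCreate()
import spark.implicits._

// The join key `a` is deterministic, but the rows surviving the rand() filter
// are not, so the shuffle input is effectively indeterminate.
val filtered = spark.range(100).select($"id".as("a")).where(rand() < 0.5)
val other = spark.range(100).select($"id".as("a"))
val joined = filtered.join(other, "a") // hash-partitions both sides on `a`
joined.collect()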

/**
* Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter
* with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]].
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -64,7 +64,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
locallyLimited,
child.output,
child,
SinglePartition,
serializer,
writeMetrics),
@@ -233,7 +233,7 @@ case class TakeOrderedAndProjectExec(
new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
localTopK,
child.output,
child,
SinglePartition,
serializer,
writeMetrics),
24 changes: 24 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
import org.mockito.Mockito._

import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
@@ -1440,4 +1441,27 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
}
}

test("Join by rand should generate indeterminate mapPartitionRDD") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
"""
|SELECT
| t1.key, t2.randkey, t2.a
|FROM
| testData t1
|LEFT JOIN
| (select rand() as randkey, a from testData2) t2
|ON
| t1.key = t2.randkey
""".stripMargin)

val shuffleDeps = collect(df.queryExecution.executedPlan) {
case s: ShuffleExchangeExec => s.shuffleDependency
}
assert(shuffleDeps.size == 2)
assert(shuffleDeps.filter(
_.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE).size == 1)
}
}
}