Commit c610de6
[SPARK-29042][CORE] Sampling-based RDD with unordered input should be INDETERMINATE
### What changes were proposed in this pull request?

We have already found and fixed correctness issues when RDD output is INDETERMINATE. One missing part is sampling-based RDDs. This kind of RDD is order-sensitive to its input: a sampling-based RDD with unordered input should be INDETERMINATE.

### Why are the changes needed?

A sampling-based RDD with unordered input behaves just like a MapPartitionsRDD with the isOrderSensitive parameter set to true: its output can differ after a rerun. This is a problem for ML applications, where sample is used to prepare training data and the algorithm fits a model on the sampled data. If rerun tasks of sample produce different output during model fitting, the ML results become unreliable and potentially buggy. Each sample is random, but once drawn, the output should stay determinate across reruns.

### Does this PR introduce any user-facing change?

Previously, a sampling-based RDD could produce different output after a rerun. After this patch, a sampling-based RDD with unordered input is INDETERMINATE. For an INDETERMINATE map stage, the Spark scheduler currently retries all tasks of the failed stage.

### How was this patch tested?

Added a test.

Closes #25751 from viirya/sample-order-sensitive.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
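To make the order sensitivity concrete, here is a minimal, self-contained Scala sketch (an editorial illustration, not part of the patch; `bernoulliSample` is a hypothetical helper) of why per-element Bernoulli sampling depends on input order: the same elements, visited in a different order with the same seed, generally yield a different sample, which is exactly what happens when an UNORDERED parent partition is recomputed.

```scala
import scala.util.Random

// One RNG draw per element, so the outcome depends on the order in which
// elements are visited, not just on which elements exist.
def bernoulliSample[T](input: Seq[T], fraction: Double, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  input.filter(_ => rng.nextDouble() < fraction)
}

val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)
val firstRun = bernoulliSample(data, 0.5, seed = 42L)
val rerun    = bernoulliSample(data.reverse, 0.5, seed = 42L)
// firstRun and rerun usually differ, even though the input sets are equal.
println(s"first run: $firstRun, rerun: $rerun")
```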
1 parent: e63098b

File tree: 5 files changed, +56 and -12 lines

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -261,7 +261,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }

   /**
@@ -291,7 +291,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } else {
       StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
     }
-    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
   }

   /**
```
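These two hunks update sampleByKey and sampleByKeyExact, the stratified-sampling entry points for pair RDDs. A minimal usage sketch of the API being touched (editorial, not from this patch; assumes a local SparkContext):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("sampleByKey-demo"))

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)))
// Per-stratum sampling fractions, keyed by the pair RDD's keys.
val fractions = Map("a" -> 0.5, "b" -> 0.25)

// After this patch the sampling function is registered as order-sensitive,
// so rerunning it over unordered shuffle output marks the stage INDETERMINATE.
val sampled = pairs.sampleByKey(false, fractions, seed = 17L)
println(sampled.collect().toSeq)
sc.stop()
```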

core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
Lines changed: 8 additions & 0 deletions

```diff
@@ -67,4 +67,12 @@ private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     thisSampler.setSeed(split.seed)
     thisSampler.sample(firstParent[T].iterator(split.prev, context))
   }
+
+  override protected def getOutputDeterministicLevel = {
+    if (prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }
```
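For context, Spark distinguishes three output-determinism levels on an RDD: DETERMINATE (same output in the same order on every rerun), UNORDERED (same set of elements, possibly in a different order, e.g. after a shuffle), and INDETERMINATE (possibly different elements entirely). A standalone sketch of the escalation rule the override above encodes, assuming, as holds for a single narrow dependency, that the default level is simply inherited from the parent:

```scala
// Editorial mirror of the three levels and the sampling escalation rule;
// not Spark's actual DeterministicLevel, just an illustration of its logic.
object Level extends Enumeration {
  val DETERMINATE, UNORDERED, INDETERMINATE = Value
}

// Sampling consumes elements in arrival order, so an UNORDERED parent makes
// the sampled output INDETERMINATE; other levels pass through unchanged.
def sampledOutputLevel(parentLevel: Level.Value): Level.Value =
  if (parentLevel == Level.UNORDERED) Level.INDETERMINATE
  else parentLevel

assert(sampledOutputLevel(Level.DETERMINATE) == Level.DETERMINATE)
assert(sampledOutputLevel(Level.UNORDERED) == Level.INDETERMINATE)
```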

core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines changed: 24 additions & 1 deletion

```diff
@@ -555,7 +555,7 @@ abstract class RDD[T: ClassTag](
       val sampler = new BernoulliCellSampler[T](lb, ub)
       sampler.setSeed(seed + index)
       sampler.sample(partition)
-    }, preservesPartitioning = true)
+    }, isOrderSensitive = true, preservesPartitioning = true)
   }

   /**
@@ -868,6 +868,29 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }

+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the
+   * index of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   *
+   * `isOrderSensitive` indicates whether the function is order-sensitive. If it is order
+   * sensitive, it may return totally different result when the input order
+   * is changed. Mostly stateful functions are order-sensitive.
+   */
+  private[spark] def mapPartitionsWithIndex[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean,
+      isOrderSensitive: Boolean): RDD[U] = withScope {
+    val cleanedF = sc.clean(f)
+    new MapPartitionsRDD(
+      this,
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
```
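The new overload is private[spark], so only Spark-internal code can pass isOrderSensitive explicitly. As a hedged illustration (hypothetical use, not from this patch; assumes an existing `rdd: RDD[String]` inside an org.apache.spark package), per-partition sequence numbering is a classic order-sensitive transformation, since the indices it assigns depend entirely on the order in which elements arrive:

```scala
// Number the elements of each partition in arrival order.
val numbered = rdd.mapPartitionsWithIndex(
  (pid: Int, iter: Iterator[String]) =>
    iter.zipWithIndex.map { case (elem, i) => (pid, i, elem) },
  preservesPartitioning = false,
  isOrderSensitive = true) // a rerun over an UNORDERED parent may renumber everything
```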

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Lines changed: 12 additions & 0 deletions

```diff
@@ -2783,6 +2783,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits {
       .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }

+  test("SPARK-29042: Sampled RDD with unordered input should be indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = false)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
+
+    assert(shuffleMapRdd2.outputDeterministicLevel == DeterministicLevel.UNORDERED)
+
+    val sampledRdd = shuffleMapRdd2.sample(true, 0.3, 1000L)
+    assert(sampledRdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+  }
+
   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
     val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
```

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
Lines changed: 10 additions & 9 deletions

This file only compensates for the new overload: with two `mapPartitionsWithIndex` signatures now visible to Spark-internal code, the compiler can no longer infer the closure's parameter types, so they are written out explicitly. Behavior is unchanged.

```diff
@@ -103,15 +103,16 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       (part, (e.srcId, e.dstId, e.attr))
     }
       .partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex( { (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
-        iter.foreach { message =>
-          val data = message._2
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true)).cache()
+      .mapPartitionsWithIndex(
+        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
+          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
+          iter.foreach { message =>
+            val data = message._2
+            builder.add(data._1, data._2, data._3)
+          }
+          val edgePartition = builder.toEdgePartition
+          Iterator((pid, edgePartition))
+        }, preservesPartitioning = true)).cache()
     GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
   }
```
