@@ -420,6 +420,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   * The ordering of elements within each group is not guaranteed, and may even differ
+   * each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
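The Scaladoc above steers aggregation work toward `aggregateByKey` rather than `groupByKey`. As a hedged sketch (not part of this commit; `sc` is assumed to be an existing `SparkContext` with the pair-RDD implicits in scope), a per-key sum can be written either way, and the `aggregateByKey` form folds values map-side so only partial sums cross the shuffle:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey materializes every value for a key before summing it.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// aggregateByKey builds a per-partition sum first, then merges the partial sums,
// so far less data is shuffled for the same result.
val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)
```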
@@ -439,7 +441,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with into `numPartitions` partitions.
+   * resulting RDD with into `numPartitions` partitions. The ordering of elements within
+   * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -535,7 +538,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
+   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
+   * within each group is not guaranteed, and may even differ each time the resulting RDD is
+   * evaluated.
    *
    * Note: This operation may be very expensive. If you are grouping in order to perform an
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -951,9 +956,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt
       /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.getPartitionId,
         attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outfmt.newInstance
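The comment in this hunk about Hadoop's 32-bit task attempt IDs is implemented by the `% Int.MaxValue` on the line below it. A standalone sketch of that roll-around (plain Scala, no Spark required; the value is an illustrative assumption) shows how an ID past the 32-bit range wraps to a small non-negative number:

```scala
// A 64-bit attempt ID just past Int.MaxValue...
val longAttemptId: Long = Int.MaxValue.toLong + 42L

// ...rolls around to a small 32-bit value: (2147483647 + 42) % 2147483647 == 42.
val attemptNumber: Int = (longAttemptId % Int.MaxValue).toInt
```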
@@ -1022,9 +1027,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt
 
-      writer.setup(context.stageId, context.partitionId, attemptNumber)
+      writer.setup(context.getStageId, context.getPartitionId, attemptNumber)
       writer.open()
       try {
         var count = 0
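Both write closures above now read the task metadata through the `TaskContext` getter methods (`getAttemptId`, `getStageId`, `getPartitionId`) rather than field access. A minimal sketch of a closure written against that getter-style API, where `doWrite` is a hypothetical placeholder standing in for the writer plumbing (not a Spark or Hadoop method):

```scala
import org.apache.spark.TaskContext

// Hypothetical helper used only to make the sketch self-contained.
def doWrite(stageId: Int, partitionId: Int, attempt: Int, iter: Iterator[(String, Int)]): Unit = ()

// Mirrors the pattern above using the getter-based TaskContext API.
val writeToFile = (context: TaskContext, iter: Iterator[(String, Int)]) => {
  val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt
  doWrite(context.getStageId, context.getPartitionId, attemptNumber, iter)
}
```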