2 files changed: +22 −0

core/src/main/scala/org/apache/spark/rdd/RDD.scala

@@ -787,6 +787,20 @@ abstract class RDD[T: ClassTag](
     sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
+  /**
+   * Applies a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def foreachPartitionWithIndex(
+      f: (Int, Iterator[T]) => Unit) {
+    val func = (index: Int, iter: Iterator[T]) => {
+      f(index, iter)
+      Iterator.empty
+    }
+    sc.runJob(
+      mapPartitionsWithIndex(func, true), (iter: Iterator[T]) => ())
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
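
For readers of the diff, here is a minimal usage sketch of the new method (the data, partition count, and `sc` as a live SparkContext are assumptions for illustration, not part of the patch). foreachPartitionWithIndex runs a side-effecting function once per partition, passing the partition index alongside the partition's iterator; internally it piggybacks on mapPartitionsWithIndex with an empty result iterator so runJob has a job to drive.

// Hypothetical usage sketch, not part of this patch:
// report how many elements each partition holds, tagged by its index.
val rdd = sc.parallelize(1 to 100, 4)
rdd.foreachPartitionWithIndex { (index, iter) =>
  // iter is consumed on the executor; the method returns Unit
  println(s"partition $index contains ${iter.size} elements")
}
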
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

@@ -128,11 +128,19 @@ class KafkaRDD[
         .dropWhile(_.offset < requestOffset)
     }
     if (!iter.hasNext) {
+      assert(requestOffset == part.untilOffset,
+        s"ran out of messages before reaching ending offset ${part.untilOffset} " +
+        s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+        " This should not happen, and indicates that messages may have been lost")
       finished = true
       null.asInstanceOf[R]
     } else {
       val item = iter.next
       if (item.offset >= part.untilOffset) {
+        assert(item.offset == part.untilOffset,
+          s"got ${item.offset} > ending offset ${part.untilOffset} " +
+          s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+          " This should not happen, and indicates a message may have been skipped")
         finished = true
         null.asInstanceOf[R]
       } else {
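
Taken together, the two new asserts pin down the contract of a KafkaRDD partition: consuming it must yield exactly the offsets in [fromOffset, untilOffset). Below is a minimal standalone sketch of that invariant, assuming consecutive offsets within the range; the names here are hypothetical and do not appear in the patch.

// Sketch only: given the offsets actually consumed for a requested
// range [fromOffset, untilOffset), fail on the two cases the asserts
// above guard against. Assumes offsets in the range are consecutive.
def checkConsumedOffsets(consumed: Seq[Long], fromOffset: Long, untilOffset: Long): Unit = {
  val reached = if (consumed.isEmpty) fromOffset else consumed.last + 1
  // ran out of messages early: some messages may have been lost
  assert(reached >= untilOffset,
    s"ran out of messages at $reached before ending offset $untilOffset")
  // emitted an offset at or past untilOffset: a message may have been skipped
  assert(consumed.forall(_ < untilOffset),
    s"got offset ${consumed.max} >= ending offset $untilOffset")
}

// e.g. checkConsumedOffsets(Seq(5L, 6L, 7L), 5L, 8L) passes;
// checkConsumedOffsets(Seq(5L, 6L), 5L, 8L) fails (lost message).
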