
Commit f6b8793

koeninger authored and rxin committed

[SPARK-17841][STREAMING][KAFKA] drain commitQueue

## What changes were proposed in this pull request?

Actually drain the commit queue rather than just iterating over it: `iterator()` on a ConcurrentLinkedQueue does not remove items from the queue, while `poll()` does.

## How was this patch tested?

Unit tests

Author: cody koeninger <[email protected]>

Closes #15407 from koeninger/SPARK-17841.

(cherry picked from commit cd106b0)
Signed-off-by: Reynold Xin <[email protected]>

1 parent 6ef9231 commit f6b8793

File tree

1 file changed: +3 −3 lines changed


external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala (3 additions, 3 deletions)

```diff
@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected def commitAll(): Unit = {
     val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
-    val it = commitQueue.iterator()
-    while (it.hasNext) {
-      val osr = it.next
+    var osr = commitQueue.poll()
+    while (null != osr) {
       val tp = osr.topicPartition
       val x = m.get(tp)
       val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
       m.put(tp, new OffsetAndMetadata(offset))
+      osr = commitQueue.poll()
     }
     if (!m.isEmpty) {
       consumer.commitAsync(m, commitCallback.get)
```
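The bug this commit fixes comes down to a property of `java.util.concurrent.ConcurrentLinkedQueue`, which the Scala code above uses directly: its iterator only traverses elements, while `poll()` removes and returns the head (or `null` when empty). A minimal standalone sketch in plain Java (the class and queue contents here are illustrative, not Spark code) shows the difference:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the bug: iterating a ConcurrentLinkedQueue leaves its elements
// in place, while repeated poll() calls actually drain the queue -- the
// pattern the commit switches commitAll() to.
public class DrainDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");

        // iterator() only traverses: the queue still holds all three elements.
        for (String s : queue) { /* visit s; nothing is removed */ }
        System.out.println(queue.size()); // 3

        // poll() removes and returns the head, returning null when empty,
        // so this loop empties the queue.
        String head = queue.poll();
        while (head != null) {
            head = queue.poll();
        }
        System.out.println(queue.size()); // 0
    }
}
```

With the old `iterator()` loop, every previously queued offset range stayed in `commitQueue` and was re-committed on each call; the `poll()` loop guarantees each range is committed once and the queue is left empty.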

0 commit comments