Skip to content

Commit 8810a79

Browse files
committed
[SPARK-17841][STREAMING][KAFKA] drain commitQueue
1 parent 8a6bbe0 commit 8810a79

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
263263

264264
protected def commitAll(): Unit = {
265265
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
266-
val it = commitQueue.iterator()
267-
while (it.hasNext) {
268-
val osr = it.next
266+
var osr = commitQueue.poll()
267+
while (null != osr) {
269268
val tp = osr.topicPartition
270269
val x = m.get(tp)
271270
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
272271
m.put(tp, new OffsetAndMetadata(offset))
272+
osr = commitQueue.poll()
273273
}
274274
if (!m.isEmpty) {
275275
consumer.commitAsync(m, commitCallback.get)

0 commit comments

Comments
 (0)