From 8810a79fa40cd777b5d68de1fed62e7ff8f9b9e7 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Sun, 9 Oct 2016 12:56:37 -0500
Subject: [PATCH] [SPARK-17841][STREAMING][KAFKA] drain commitQueue

---
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 13827f68f2cb..40e0180fafbc 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -263,13 +263,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected def commitAll(): Unit = {
     val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
-    val it = commitQueue.iterator()
-    while (it.hasNext) {
-      val osr = it.next
+    var osr = commitQueue.poll()
+    while (null != osr) {
       val tp = osr.topicPartition
       val x = m.get(tp)
       val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
       m.put(tp, new OffsetAndMetadata(offset))
+      osr = commitQueue.poll()
     }
     if (!m.isEmpty) {
       consumer.commitAsync(m, commitCallback.get)
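
The sketch below is a minimal, standalone Scala illustration of the drain-with-poll pattern the patch adopts in commitAll(): ConcurrentLinkedQueue.poll() removes the head element as it returns it, so the queue is emptied as offsets are aggregated, whereas the old iterator()-based loop only read the elements and left them queued for the next commit pass. The simplified OffsetRange case class, the topic names, and the aggregation map here are illustrative stand-ins, not Spark's actual kafka010 classes.

// Standalone sketch of draining a queue with poll() while keeping the
// highest untilOffset per partition, mirroring the Math.max in the patch.
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

object DrainQueueSketch {
  // Hypothetical, simplified stand-in for an offset range:
  // topic-partition plus the exclusive upper offset.
  final case class OffsetRange(topic: String, partition: Int, untilOffset: Long)

  def main(args: Array[String]): Unit = {
    val commitQueue = new ConcurrentLinkedQueue[OffsetRange]()
    commitQueue.add(OffsetRange("events", 0, 100L))
    commitQueue.add(OffsetRange("events", 0, 250L))
    commitQueue.add(OffsetRange("events", 1, 75L))

    // poll() both returns and removes the head, so this loop drains the queue.
    // An iterator() here would leave all three ranges queued after the loop.
    val toCommit = mutable.Map.empty[(String, Int), Long]
    var osr = commitQueue.poll()
    while (osr != null) {
      val key = (osr.topic, osr.partition)
      // Keep the largest untilOffset seen so far for each topic-partition.
      toCommit(key) = math.max(toCommit.getOrElse(key, 0L), osr.untilOffset)
      osr = commitQueue.poll()
    }

    println(s"offsets to commit: $toCommit")                // (events,0) -> 250, (events,1) -> 75
    println(s"queue drained: ${commitQueue.isEmpty}")       // true
  }
}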