@@ -44,6 +44,9 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private var consumer = createConsumer
 
+  /** indicates whether this consumer is in use or not */
+  private var inuse = true
+
   /** Iterator to the already fetched data */
   private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
   private var nextOffsetInFetchedData = UNKNOWN_OFFSET
@@ -57,6 +60,20 @@ private[kafka010] case class CachedKafkaConsumer private(
     c
   }
 
+  case class AvailableOffsetRange(earliest: Long, latest: Long)
+
+  /**
+   * Return the available offset range of the current partition. It's a pair of the earliest offset
+   * and the latest offset.
+   */
+  def getAvailableOffsetRange(): AvailableOffsetRange = {
+    consumer.seekToBeginning(Set(topicPartition).asJava)
+    val earliestOffset = consumer.position(topicPartition)
+    consumer.seekToEnd(Set(topicPartition).asJava)
+    val latestOffset = consumer.position(topicPartition)
+    AvailableOffsetRange(earliestOffset, latestOffset)
+  }
+
   /**
    * Get the record for the given offset if available. Otherwise it will either throw error
    * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
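
The tuple-returning private helper is replaced by a public `getAvailableOffsetRange()` that returns a named `AvailableOffsetRange`, so call sites can write `range.earliest`/`range.latest` instead of destructuring a `(Long, Long)`. As a sketch of how an external caller might use the new API, here is a hypothetical helper (not part of this change) that clamps a requested offset range to what the broker currently holds:

```scala
// Hypothetical helper, not part of this diff: clamp a requested offset range
// to the broker's currently available range; None means no overlap at all.
def clampToAvailable(
    fromOffset: Long,
    untilOffset: Long,
    range: AvailableOffsetRange): Option[(Long, Long)] = {
  val from = math.max(fromOffset, range.earliest)
  val until = math.min(untilOffset, range.latest)
  if (from < until) Some((from, until)) else None
}
```
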
@@ -107,9 +124,9 @@ private[kafka010] case class CachedKafkaConsumer private(
    * `UNKNOWN_OFFSET`.
    */
   private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
-    val (earliestOffset, latestOffset) = getAvailableOffsetRange()
-    logWarning(s"Some data may be lost. Recovering from the earliest offset: $earliestOffset")
-    if (offset >= latestOffset || earliestOffset >= untilOffset) {
+    val range = getAvailableOffsetRange()
+    logWarning(s"Some data may be lost. Recovering from the earliest offset: ${range.earliest}")
+    if (offset >= range.latest || range.earliest >= untilOffset) {
       // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
       // either
       // --------------------------------------------------------
@@ -124,13 +141,13 @@ private[kafka010] case class CachedKafkaConsumer private(
       //   offset   untilOffset   earliestOffset   latestOffset
       val warningMessage =
         s"""
-          |The current available offset range is [$earliestOffset, $latestOffset).
+          |The current available offset range is $range.
           |Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
           |skipped ${additionalMessage(failOnDataLoss = false)}
         """.stripMargin
       logWarning(warningMessage)
       UNKNOWN_OFFSET
-    } else if (offset >= earliestOffset) {
+    } else if (offset >= range.earliest) {
       // -----------------------------------------------------------------------------
       //      ^                  ^                       ^                       ^
       //      |                  |                       |                       |
@@ -149,12 +166,12 @@ private[kafka010] case class CachedKafkaConsumer private(
       //   offset   earliestOffset   min(untilOffset,latestOffset)   max(untilOffset, latestOffset)
       val warningMessage =
         s"""
-          |The current available offset range is [$earliestOffset, $latestOffset).
-          |Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be
+          |The current available offset range is $range.
+          |Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
           |skipped ${additionalMessage(failOnDataLoss = false)}
         """.stripMargin
       logWarning(warningMessage)
-      earliestOffset
+      range.earliest
     }
   }
 
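
The branch structure above encodes three recovery outcomes, depending on how `[offset, untilOffset)` overlaps the available range. Restated as a standalone function for readability (the types and names here are illustrative, not from the patch):

```scala
// Illustrative restatement of the three recovery cases above; not part of the diff.
sealed trait Recovery
case object NoOverlap extends Recovery                 // skip [offset, untilOffset) entirely
case object OffsetAvailable extends Recovery           // offset itself is still readable
case class ResumeFrom(offset: Long) extends Recovery   // jump ahead to the earliest offset

def recover(offset: Long, untilOffset: Long, range: AvailableOffsetRange): Recovery =
  if (offset >= range.latest || range.earliest >= untilOffset) NoOverlap
  else if (offset >= range.earliest) OffsetAvailable
  else ResumeFrom(range.earliest)
```
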
@@ -183,8 +200,8 @@ private[kafka010] case class CachedKafkaConsumer private(
       // - `offset` is out of range so that Kafka returns nothing. Just throw
       //   `OffsetOutOfRangeException` to let the caller handle it.
       // - Cannot fetch any data before timeout. TimeoutException will be thrown.
-      val (earliestOffset, latestOffset) = getAvailableOffsetRange()
-      if (offset < earliestOffset || offset >= latestOffset) {
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
         throw new OffsetOutOfRangeException(
           Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
       } else {
@@ -284,18 +301,6 @@ private[kafka010] case class CachedKafkaConsumer private(
     logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
     fetchedData = r.iterator
   }
-
-  /**
-   * Return the available offset range of the current partition. It's a pair of the earliest offset
-   * and the latest offset.
-   */
-  private def getAvailableOffsetRange(): (Long, Long) = {
-    consumer.seekToBeginning(Set(topicPartition).asJava)
-    val earliestOffset = consumer.position(topicPartition)
-    consumer.seekToEnd(Set(topicPartition).asJava)
-    val latestOffset = consumer.position(topicPartition)
-    (earliestOffset, latestOffset)
-  }
 }
 
 private[kafka010] object CachedKafkaConsumer extends Logging {
@@ -310,7 +315,7 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
       override def removeEldestEntry(
           entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
-        if (this.size > capacity) {
+        if (entry.getValue.inuse == false && this.size > capacity) {
           logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
             s"removing consumer for ${entry.getKey}")
           try {
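
With this guard the eldest entry is evicted only when it is idle, which means the cache can temporarily exceed `capacity` while every consumer is held by a running task. The pattern in isolation, as a minimal sketch with illustrative names:

```scala
import java.{util => ju}

// Guarded LRU eviction: removeEldestEntry is consulted after every put, and
// returning false keeps the eldest entry even when the map is over capacity.
class GuardedLruCache[K, V](capacity: Int, inUse: V => Boolean)
  extends ju.LinkedHashMap[K, V](capacity, 0.75f, /* accessOrder = */ true) {
  override def removeEldestEntry(eldest: ju.Map.Entry[K, V]): Boolean =
    !inUse(eldest.getValue) && size > capacity
}
```

Note that `removeEldestEntry` only ever inspects the single eldest entry, so an in-use eldest consumer also shields younger idle entries from eviction until it is released.
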
@@ -327,6 +332,43 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }
 
+  def releaseKafkaConsumer(
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): Unit = {
+    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+    val topicPartition = new TopicPartition(topic, partition)
+    val key = CacheKey(groupId, topicPartition)
+
+    synchronized {
+      val consumer = cache.get(key)
+      if (consumer != null) {
+        consumer.inuse = false
+      } else {
+        logWarning(s"Attempting to release consumer that does not exist")
+      }
+    }
+  }
+
+  /**
+   * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
+   */
+  def removeKafkaConsumer(
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): Unit = {
+    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+    val topicPartition = new TopicPartition(topic, partition)
+    val key = CacheKey(groupId, topicPartition)
+
+    synchronized {
+      val removedConsumer = cache.remove(key)
+      if (removedConsumer != null) {
+        removedConsumer.close()
+      }
+    }
+  }
+
   /**
    * Get a cached consumer for groupId, assigned to topic and partition.
    * If matching consumer doesn't already exist, will be created using kafkaParams.
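
`releaseKafkaConsumer` deliberately closes nothing; it only clears `inuse` so the eviction guard above may reclaim the consumer later. Each `getOrCreate` therefore needs a matching release once the task is done. One plausible call-site shape (a sketch, not taken from this patch) is a task completion listener:

```scala
// Hypothetical call site, not part of this diff: hold the consumer for the
// lifetime of a task, then hand it back to the cache when the task finishes.
// Assumes this runs inside a task, so TaskContext.get is non-null.
val consumer = CachedKafkaConsumer.getOrCreate(topic, partition, kafkaParams)
TaskContext.get.addTaskCompletionListener { _ =>
  CachedKafkaConsumer.releaseKafkaConsumer(topic, partition, kafkaParams)
}
```
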
@@ -342,16 +384,18 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     // If this is a reattempt at running the task, then invalidate the cache and start with
     // a new consumer
     if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
-      val removedConsumer = cache.remove(key)
-      if (removedConsumer != null) {
-        removedConsumer.close()
-      }
-      new CachedKafkaConsumer(topicPartition, kafkaParams)
+      removeKafkaConsumer(topic, partition, kafkaParams)
+      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
+      consumer.inuse = true
+      cache.put(key, consumer)
+      consumer
     } else {
       if (!cache.containsKey(key)) {
         cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
       }
-      cache.get(key)
+      val consumer = cache.get(key)
+      consumer.inuse = true
+      consumer
     }
   }
 }
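
Taken together: `getOrCreate` marks a consumer in use (and, on a task reattempt, replaces it outright through `removeKafkaConsumer`), `releaseKafkaConsumer` hands it back, and the guard in `removeEldestEntry` leaves in-use consumers alone. A condensed lifecycle sketch under those assumed signatures, reusing the file's `ju` and `ConsumerConfig` imports:

```scala
// Hypothetical end-to-end usage; assumes the signatures added in this patch.
val topic = "events"                      // illustrative topic name
val partition = 0
val kafkaParams = ju.Collections.singletonMap[String, Object](
  ConsumerConfig.GROUP_ID_CONFIG, "spark-group")

val c1 = CachedKafkaConsumer.getOrCreate(topic, partition, kafkaParams)   // inuse = true
// ... fetch records through c1 ...
CachedKafkaConsumer.releaseKafkaConsumer(topic, partition, kafkaParams)   // inuse = false, stays cached

val c2 = CachedKafkaConsumer.getOrCreate(topic, partition, kafkaParams)   // same cached instance
assert(c1 eq c2)

// Drop and close the cached consumer explicitly, e.g. between test runs:
CachedKafkaConsumer.removeKafkaConsumer(topic, partition, kafkaParams)
```
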