@@ -51,8 +51,6 @@ private[kafka010] case class CachedKafkaProducer(
5151 closed = false
5252 producer
5353 }
54- @ volatile
55- private var isCached : Boolean = true
5654 @ GuardedBy (" this" )
5755 private var closed : Boolean = true
5856 private def close (): Unit = {
@@ -72,8 +70,6 @@ private[kafka010] case class CachedKafkaProducer(
7270
7371 private def inUse (): Boolean = inUseCount.get() > 0
7472
75- private def unCache (): Unit = isCached = false
76-
7773 private [kafka010] def getInUseCount : Int = inUseCount.get()
7874
7975 private [kafka010] def getKafkaParams : Seq [(String , Object )] = kafkaParams
@@ -104,13 +100,13 @@ private[kafka010] object CachedKafkaProducer extends Logging {
104100 override def onRemoval (
105101 notification : RemovalNotification [Seq [(String , Object )], CachedKafkaProducer ]): Unit = {
106102 val producer : CachedKafkaProducer = notification.getValue
103+ logDebug(s " Evicting kafka producer $producer, due to ${notification.getCause}. " )
107104 if (producer.inUse()) {
108- logDebug( s " Evicting kafka producer $ producer, due to ${notification.getCause} . " )
109- // When `inuse` producer is evicted we wait for it to be released before finally closing it.
105+ // When `inuse` producer is evicted we wait for it to be released by all the tasks,
106+ // before finally closing it.
110107 closeQueue.add(producer)
111- producer.unCache()
112108 } else {
113- close(producer )
109+ producer. close()
114110 }
115111 }
116112 }
@@ -168,15 +164,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
168164 }
169165 }
170166 }
171- if (! producer.inUse() && ! producer.isCached) {
172- // it will take care of removing it from close queue as well.
173- close(producer)
174- }
167+ // We need a close queue, so that we can close the producer(s) outside of a synchronized block.
168+ processPendingClose(producer)
175169 }
176170
177- /** Close this producer and process pending closes. */
178- private def close (producer : CachedKafkaProducer ): Unit = {
179- producer.close()
171+ /** Process pending closes. */
172+ private def processPendingClose (producer : CachedKafkaProducer ): Unit = {
180173 // Check and close any other producers previously evicted, but pending to be closed.
181174 for (p <- closeQueue.iterator().asScala) {
182175 if (! p.inUse()) {
0 commit comments