From 4cdce48e585e28c2ea6feae66f3b035687c6b517 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 8 Sep 2017 17:49:16 +0530 Subject: [PATCH 01/18] [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it. --- .../sql/kafka010/CachedKafkaProducer.scala | 95 +++++++++++++------ .../spark/sql/kafka010/KafkaWriteTask.scala | 14 +-- .../spark/sql/kafka010/KafkaWriter.scala | 2 +- .../kafka010/CachedKafkaProducerSuite.scala | 56 ++++++----- 4 files changed, 104 insertions(+), 63 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index f24001f4ae3a..057092450187 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} +import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import com.google.common.cache._ import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} @@ -29,9 +30,23 @@ import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -private[kafka010] object CachedKafkaProducer extends Logging { +private[kafka010] case class CachedKafkaProducer(id: String, inUseCount: AtomicInteger, + kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]]) extends Logging { + private var closed: Boolean = false + private def close(): Unit = this.synchronized { + if (!closed) { + closed = true + kafkaProducer.close() + logInfo(s"Closed kafka producer: $kafkaProducer") + } + } + private[kafka010] def flush(): Unit = { + kafkaProducer.flush() + } + private[kafka010] def isClosed: Boolean = closed +} - private type Producer = KafkaProducer[Array[Byte], Array[Byte]] +private[kafka010] object CachedKafkaProducer extends Logging { private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) @@ -40,37 +55,45 @@ private[kafka010] object CachedKafkaProducer extends Logging { "spark.kafka.producer.cache.timeout", s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) - private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { - override def load(config: Seq[(String, Object)]): Producer = { + private val cacheLoader = new CacheLoader[Seq[(String, Object)], CachedKafkaProducer] { + override def load(config: Seq[(String, Object)]): CachedKafkaProducer = { val configMap = config.map(x => x._1 -> x._2).toMap.asJava createKafkaProducer(configMap) } } - private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { + private val closeQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() + + private val removalListener = new RemovalListener[Seq[(String, Object)], CachedKafkaProducer]() { override def onRemoval( - notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { - val paramsSeq: Seq[(String, Object)] = notification.getKey - val producer: Producer = notification.getValue - logDebug( - s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}") - close(paramsSeq, producer) + notification: RemovalNotification[Seq[(String, Object)], 
CachedKafkaProducer]): Unit = { + val producer: CachedKafkaProducer = notification.getValue + logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}") + if (producer.inUseCount.intValue() > 0) { + // When a inuse producer is evicted we wait for it to be released before finally closing it. + closeQueue.add(producer) + } else { + close(producer) + } } } - private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] = + private lazy val guavaCache: LoadingCache[Seq[(String, Object)], CachedKafkaProducer] = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) .removalListener(removalListener) - .build[Seq[(String, Object)], Producer](cacheLoader) + .build[Seq[(String, Object)], CachedKafkaProducer](cacheLoader) - private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = { + private def createKafkaProducer(producerConfig: ju.Map[String, Object]): CachedKafkaProducer = { val updatedKafkaProducerConfiguration = - KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap) + KafkaConfigUpdater("executor", producerConfig.asScala.toMap) .setAuthenticationConfigIfNeeded() .build() - val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration) - logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.") - kafkaProducer + val kafkaProducer = + new KafkaProducer[Array[Byte], Array[Byte]](updatedKafkaProducerConfiguration) + val id: String = ju.UUID.randomUUID().toString + logDebug(s"Created a new instance of KafkaProducer for " + + s"$updatedKafkaProducerConfiguration with Id: $id") + CachedKafkaProducer(id, new AtomicInteger(0), kafkaProducer) } /** @@ -78,10 +101,13 @@ private[kafka010] object CachedKafkaProducer extends Logging { * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = { + private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams) try { - guavaCache.get(paramsSeq) + val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) + val useCount: Int = cachedKafkaProducer.inUseCount.incrementAndGet() + logDebug(s"Granted producer $cachedKafkaProducer, inuse-count: $useCount") + cachedKafkaProducer } catch { case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) if e.getCause != null => @@ -94,27 +120,38 @@ private[kafka010] object CachedKafkaProducer extends Logging { paramsSeq } - /** For explicitly closing kafka producer */ + /** For explicitly closing kafka producer, will not close an inuse producer until released. */ private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = { val paramsSeq = paramsToSeq(kafkaParams) guavaCache.invalidate(paramsSeq) } - /** Auto close on cache evict */ - private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = { + /** Close this producer and process pending closes. */ + private def close(producer: CachedKafkaProducer): Unit = synchronized { try { - logInfo(s"Closing the KafkaProducer with params: ${paramsSeq.mkString("\n")}.") producer.close() + // Check and close any producers evicted, and pending to be closed. 
+ for (p <- closeQueue.iterator().asScala) { + if (p.inUseCount.intValue() <= 0) { + producer.close() + closeQueue.remove(p) + } + } } catch { - case NonFatal(e) => logWarning("Error while closing kafka producer.", e) + case NonFatal(e) => logWarning(s"Error while closing kafka producer: $producer", e) } } + // Intended for testing purpose only. private[kafka010] def clear(): Unit = { - logInfo("Cleaning up guava cache.") + logInfo("Cleaning up guava cache and force closing all kafka producer.") guavaCache.invalidateAll() + for (p <- closeQueue.iterator().asScala) { + p.close() + } + closeQueue.clear() } - // Intended for testing purpose only. - private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap() + private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] = + guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index 041fac771763..a92e1d86fa30 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -35,13 +35,12 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ + private val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(producerConfiguration) /** * Writes key value data out to topics. */ def execute(iterator: Iterator[InternalRow]): Unit = { - producer = CachedKafkaProducer.getOrCreate(producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() sendRow(currentRow, producer) @@ -49,12 +48,9 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { + producer.inUseCount.decrementAndGet() + producer.kafkaProducer.flush() checkForErrors() - if (producer != null) { - producer.flush() - checkForErrors() - producer = null - } } } @@ -79,7 +75,7 @@ private[kafka010] abstract class KafkaRowWriter( * assuming the row is in Kafka. 
*/ protected def sendRow( - row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = { + row: InternalRow, producer: CachedKafkaProducer): Unit = { val projectedRow = projection(row) val topic = projectedRow.getUTF8String(0) val key = projectedRow.getBinary(1) @@ -89,7 +85,7 @@ private[kafka010] abstract class KafkaRowWriter( s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) - producer.send(record, callback) + producer.kafkaProducer.send(record, callback) } protected def checkForErrors(): Unit = { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index e1a9191cc5a8..1317509214e5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -22,7 +22,7 @@ import java.{util => ju} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.types.{BinaryType, StringType} import org.apache.spark.util.Utils diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 0b3355426df1..0dc93bd1435b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.ConcurrentMap import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.serialization.ByteArraySerializer @@ -36,42 +35,51 @@ class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester } test("Should return the cached instance on calling getOrCreate with same params.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") - // Here only host should be resolvable, it does not need a running instance of kafka server. 
- kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) + val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer = CachedKafkaProducer.getOrCreate(kafkaParams) val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams) - assert(producer == producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) + assert(producer.kafkaProducer == producer2.kafkaProducer) + assert(producer.inUseCount.intValue() == 2) + val map = CachedKafkaProducer.getAsMap assert(map.size == 1) } test("Should close the correct kafka producer for the given kafkaPrams.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") - kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams) + val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams + val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) kafkaParams.put("acks", "1") - val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams) + val producer2: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) // With updated conf, a new producer instance should be created. - assert(producer != producer2) + assert(producer.kafkaProducer != producer2.kafkaProducer) - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) + val map = CachedKafkaProducer.getAsMap assert(map.size == 2) + producer2.inUseCount.decrementAndGet() CachedKafkaProducer.close(kafkaParams) - val map2 = CachedKafkaProducer.invokePrivate(cacheMap()) + assert(producer2.isClosed) + val map2 = CachedKafkaProducer.getAsMap assert(map2.size == 1) import scala.collection.JavaConverters._ - val (seq: Seq[(String, Object)], _producer: KP) = map2.asScala.toArray.apply(0) - assert(_producer == producer) + val (seq: Seq[(String, Object)], _producer: CachedKafkaProducer) = map2.asScala.toArray.apply(0) + assert(_producer.kafkaProducer == producer.kafkaProducer) + } + + test("Should not close a producer in-use.") { + val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams + val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) + assert(producer.inUseCount.intValue() > 0) + CachedKafkaProducer.close(kafkaParams) + assert(producer.inUseCount.intValue() > 0) + assert(!producer.isClosed, "An in-use producer should not be closed.") + } + + private def generateKafkaParams: ju.HashMap[String, Object] = { + val kafkaParams = new ju.HashMap[String, Object]() + kafkaParams.put("acks", "0") + kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") + kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) + kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) + kafkaParams } } From 78a5422b80471148045ef6f24d6bc837e05ca2da Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 17 Jan 2018 16:12:37 +0530 Subject: [PATCH 02/18] Merged with upstream changes. 
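
For illustration only, and not part of this patch: a minimal sketch of the
acquire/use/release cycle the series introduces, assuming the package-private
CachedKafkaProducer API from PATCH 01 (getOrCreate, kafkaProducer, flush,
inUseCount). writeBatch is a hypothetical helper, not Spark code.

    import java.{util => ju}
    import org.apache.kafka.clients.producer.ProducerRecord

    // Hypothetical helper showing how a sink task is expected to use the cache.
    def writeBatch(
        kafkaParams: ju.Map[String, Object],
        records: Iterator[ProducerRecord[Array[Byte], Array[Byte]]]): Unit = {
      // getOrCreate bumps the producer's in-use count, so a concurrent cache
      // eviction only queues the producer for closing instead of closing it
      // while the task is still writing.
      val cached = CachedKafkaProducer.getOrCreate(kafkaParams)
      try {
        records.foreach(cached.kafkaProducer.send(_))
        cached.flush()
      } finally {
        // Dropping the reference lets an already-evicted producer sitting in
        // the close queue be closed once its in-use count reaches zero.
        cached.inUseCount.decrementAndGet()
      }
    }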
--- .../org/apache/spark/sql/kafka010/CachedKafkaProducer.scala | 2 ++ .../org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala | 1 + .../scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 057092450187..600670e40c81 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -40,9 +40,11 @@ private[kafka010] case class CachedKafkaProducer(id: String, inUseCount: AtomicI logInfo(s"Closed kafka producer: $kafkaProducer") } } + private[kafka010] def flush(): Unit = { kafkaProducer.flush() } + private[kafka010] def isClosed: Boolean = closed } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index e3101e157208..204ec990f707 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -115,6 +115,7 @@ class KafkaStreamDataWriter( checkForErrors() if (producer != null) { producer.flush() + producer.inUseCount.decrementAndGet() checkForErrors() CachedKafkaProducer.close(producerParams) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index a92e1d86fa30..97f048948e0f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -49,7 +49,7 @@ private[kafka010] class KafkaWriteTask( def close(): Unit = { producer.inUseCount.decrementAndGet() - producer.kafkaProducer.flush() + producer.flush() checkForErrors() } } From 4d57a820d55756fe5d97362e7e5449cc37226ce6 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 7 Mar 2019 18:56:46 +0530 Subject: [PATCH 03/18] Added more tests and improved overall stability. 
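
For illustration only, and not part of this patch: a minimal sketch of the
acquire/release pattern as reworked below, including the offending/failing
flag that invalidates a misbehaving producer so it gets recreated. runTask is
a hypothetical helper and the topic name is a placeholder.

    import java.{util => ju}
    import org.apache.kafka.clients.producer.ProducerRecord

    // Hypothetical helper: acquire a shared producer, release it on completion.
    def runTask(kafkaParams: ju.Map[String, Object],
                values: Iterator[Array[Byte]]): Unit = {
      val producer = CachedKafkaProducer.acquire(kafkaParams)  // in-use count +1
      var failed = false
      try {
        values.foreach { v =>
          producer.kafkaProducer.send(
            new ProducerRecord[Array[Byte], Array[Byte]]("some-topic", v))
        }
        producer.flush()
      } catch {
        case e: Exception =>
          failed = true
          throw e
      } finally {
        // A failing producer is also invalidated from the cache so that
        // later tasks get a freshly created instance.
        CachedKafkaProducer.release(producer, failed)
      }
    }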
--- .../sql/kafka010/CachedKafkaProducer.scala | 134 +++++++++++------- .../sql/kafka010/KafkaStreamingWrite.scala | 11 +- .../spark/sql/kafka010/KafkaWriteTask.scala | 14 +- .../kafka010/CachedKafkaProducerSuite.scala | 74 ++++++---- 4 files changed, 148 insertions(+), 85 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 600670e40c81..cfc21623dbb3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -21,26 +21,57 @@ import java.{util => ju} import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + import com.google.common.cache._ import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.kafka.clients.producer.KafkaProducer -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -private[kafka010] case class CachedKafkaProducer(id: String, inUseCount: AtomicInteger, - kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]]) extends Logging { - private var closed: Boolean = false - private def close(): Unit = this.synchronized { - if (!closed) { - closed = true - kafkaProducer.close() - logInfo(s"Closed kafka producer: $kafkaProducer") +private[kafka010] case class CachedKafkaProducer( + private val id: String = ju.UUID.randomUUID().toString, + private val inUseCount: AtomicInteger = new AtomicInteger(0), + private val kafkaParams: Seq[(String, Object)]) extends Logging { + + private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava + + lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + val producer = new KafkaProducer[Array[Byte], Array[Byte]](configMap) + logDebug(s"Created a new instance of KafkaProducer for " + + s"$kafkaParams with Id: $id") + closed = false + producer + } + private var isCached: Boolean = true + private var closed: Boolean = true + private def close(): Unit = { + try { + this.synchronized { + if (!closed) { + closed = true + kafkaProducer.close() + logInfo(s"Closed kafka producer: $kafkaProducer") + } + } + } catch { + case NonFatal(e) => + logWarning(s"Error while closing kafka producer with params: $kafkaParams", e) } } + private def inUse(): Boolean = { + inUseCount.get() > 0 + } + private def unCache(): Unit = { + isCached = false + } + private[kafka010] def getInUseCount: Int = inUseCount.get() + + private[kafka010] def getKafkaParams: Seq[(String, Object)] = kafkaParams + private[kafka010] def flush(): Unit = { kafkaProducer.flush() } @@ -54,13 +85,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get).map(_.conf.getTimeAsMs( - "spark.kafka.producer.cache.timeout", - s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) + key = "spark.kafka.producer.cache.timeout", + defaultValue = s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) private val cacheLoader = new CacheLoader[Seq[(String, Object)], CachedKafkaProducer] { - override def load(config: Seq[(String, Object)]): 
CachedKafkaProducer = { - val configMap = config.map(x => x._1 -> x._2).toMap.asJava - createKafkaProducer(configMap) + override def load(params: Seq[(String, Object)]): CachedKafkaProducer = { + CachedKafkaProducer(kafkaParams = params) } } @@ -70,10 +100,11 @@ private[kafka010] object CachedKafkaProducer extends Logging { override def onRemoval( notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = { val producer: CachedKafkaProducer = notification.getValue - logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}") - if (producer.inUseCount.intValue() > 0) { - // When a inuse producer is evicted we wait for it to be released before finally closing it. + if (producer.inUse()) { + logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}") + // When `inuse` producer is evicted we wait for it to be released before finally closing it. closeQueue.add(producer) + producer.unCache() } else { close(producer) } @@ -85,26 +116,18 @@ private[kafka010] object CachedKafkaProducer extends Logging { .removalListener(removalListener) .build[Seq[(String, Object)], CachedKafkaProducer](cacheLoader) - private def createKafkaProducer(producerConfig: ju.Map[String, Object]): CachedKafkaProducer = { - val updatedKafkaProducerConfiguration = - KafkaConfigUpdater("executor", producerConfig.asScala.toMap) - .setAuthenticationConfigIfNeeded() - .build() - val kafkaProducer = - new KafkaProducer[Array[Byte], Array[Byte]](updatedKafkaProducerConfiguration) - val id: String = ju.UUID.randomUUID().toString - logDebug(s"Created a new instance of KafkaProducer for " + - s"$updatedKafkaProducerConfiguration with Id: $id") - CachedKafkaProducer(id, new AtomicInteger(0), kafkaProducer) - } + private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = + KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() /** * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = { - val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams) + private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = { + val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap)) try { val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) val useCount: Int = cachedKafkaProducer.inUseCount.incrementAndGet() @@ -117,30 +140,41 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } - private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = { - val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1) + private def paramsToSeq(kafkaParamsMap: ju.Map[String, Object]): Seq[(String, Object)] = { + val paramsSeq: Seq[(String, Object)] = kafkaParamsMap.asScala.toSeq.sortBy(x => x._1) paramsSeq } - /** For explicitly closing kafka producer, will not close an inuse producer until released. */ - private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = { - val paramsSeq = paramsToSeq(kafkaParams) - guavaCache.invalidate(paramsSeq) + /* Release a kafka producer back to the kafka cache. We simple decrement it's inuse count. 
*/ + private[kafka010] def release(producer: CachedKafkaProducer, offending: Boolean): Boolean = { + if (producer != null) { + producer.inUseCount.decrementAndGet() + logDebug(s"Released producer $producer, updated inuse count: ${producer.getInUseCount}") + if (offending) { + // If this producer is failing to write(i.e. offending), we send it to close queue. + // So that it is re-created, eventually. + logDebug(s"Invalidated an offending producer:$producer.") + guavaCache.invalidate(producer.kafkaParams) + } + if (!producer.inUse() && !producer.isCached) { + // it will take care of removing it from close queue as well. + close(producer) + } + true + } else { + false + } } /** Close this producer and process pending closes. */ private def close(producer: CachedKafkaProducer): Unit = synchronized { - try { - producer.close() - // Check and close any producers evicted, and pending to be closed. - for (p <- closeQueue.iterator().asScala) { - if (p.inUseCount.intValue() <= 0) { - producer.close() - closeQueue.remove(p) - } + producer.close() + // Check and close any other producers previously evicted, but pending to be closed. + for (p <- closeQueue.iterator().asScala) { + if (!p.inUse()) { + closeQueue.remove(p) + p.close() } - } catch { - case NonFatal(e) => logWarning(s"Error while closing kafka producer: $producer", e) } } @@ -154,6 +188,10 @@ private[kafka010] object CachedKafkaProducer extends Logging { closeQueue.clear() } + private[kafka010] def evict(params: Seq[(String, Object)]): Unit = { + guavaCache.invalidate(params) + } + private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] = guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 204ec990f707..4b9ff3b4dadf 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -92,7 +92,7 @@ class KafkaStreamDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams) + protected lazy val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -112,12 +112,9 @@ class KafkaStreamDataWriter( def abort(): Unit = {} def close(): Unit = { + CachedKafkaProducer.release(producer, failedWrite != null) + checkForErrors() + producer.flush() checkForErrors() - if (producer != null) { - producer.flush() - producer.inUseCount.decrementAndGet() - checkForErrors() - CachedKafkaProducer.close(producerParams) - } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index 97f048948e0f..f68045deac8d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata} +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types.{BinaryType, StringType} @@ -35,7 +36,7 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - private val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(producerConfiguration) + protected val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerConfiguration) /** * Writes key value data out to topics. @@ -48,15 +49,17 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { - producer.inUseCount.decrementAndGet() + checkForErrors() producer.flush() checkForErrors() + CachedKafkaProducer.release(producer, offending = failedWrite != null) } } private[kafka010] abstract class KafkaRowWriter( - inputSchema: Seq[Attribute], topic: Option[String]) { + inputSchema: Seq[Attribute], topic: Option[String]) extends Logging { + protected val producer: CachedKafkaProducer // used to synchronize with Kafka callbacks @volatile protected var failedWrite: Exception = _ protected val projection = createProjection @@ -90,6 +93,9 @@ private[kafka010] abstract class KafkaRowWriter( protected def checkForErrors(): Unit = { if (failedWrite != null) { + // Before throwing exception, we should mark this acquired producer as not in use, + // for this particular task. Otherwise it will linger on, assuming that it is in use. + CachedKafkaProducer.release(producer, offending = true) throw failedWrite } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 0dc93bd1435b..b59e35308f0f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -19,13 +19,15 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import scala.collection.JavaConverters._ + import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.PrivateMethodTester +import org.apache.spark.SparkException import org.apache.spark.sql.test.SharedSQLContext -class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest { +class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { type KP = KafkaProducer[Array[Byte], Array[Byte]] @@ -34,50 +36,70 @@ class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester CachedKafkaProducer.clear() } - test("Should return the cached instance on calling getOrCreate with same params.") { + test("Should return the cached instance on calling acquire with same params.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams - val producer = CachedKafkaProducer.getOrCreate(kafkaParams) - val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams) + val producer = CachedKafkaProducer.acquire(kafkaParams) + val producer2 = CachedKafkaProducer.acquire(kafkaParams) assert(producer.kafkaProducer == producer2.kafkaProducer) - assert(producer.inUseCount.intValue() == 2) + assert(producer.getInUseCount == 2) val map = 
CachedKafkaProducer.getAsMap assert(map.size == 1) } - test("Should close the correct kafka producer for the given kafkaPrams.") { + test("Should return the new instance on calling acquire with different params.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams - val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) - kafkaParams.put("acks", "1") - val producer2: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) - // With updated conf, a new producer instance should be created. + val producer = CachedKafkaProducer.acquire(kafkaParams) + kafkaParams.remove("ack") // mutate the kafka params. + val producer2 = CachedKafkaProducer.acquire(kafkaParams) assert(producer.kafkaProducer != producer2.kafkaProducer) - + assert(producer.getInUseCount == 1) + assert(producer2.getInUseCount == 1) val map = CachedKafkaProducer.getAsMap assert(map.size == 2) - producer2.inUseCount.decrementAndGet() + } - CachedKafkaProducer.close(kafkaParams) - assert(producer2.isClosed) - val map2 = CachedKafkaProducer.getAsMap - assert(map2.size == 1) - import scala.collection.JavaConverters._ - val (seq: Seq[(String, Object)], _producer: CachedKafkaProducer) = map2.asScala.toArray.apply(0) - assert(_producer.kafkaProducer == producer.kafkaProducer) - } + test("Should return the cached instance, even if auth tokens are set up.") { + // TODO. + // Question: What happens when a delegation token value is changed for a given producer? + // we would need to recreate a kafka producer. + } + + test("Remove an offending kafka producer from cache.") { + import testImplicits._ + val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value") + intercept[SparkException] { + // This will fail because the service is not reachable. + df.write + .format("kafka") + .option("topic", "topic") + .option("retries", "1") + .option("max.block.ms", "2") + .option("request.timeout.ms", "2") + .option("linger.ms", "2") + .option("kafka.bootstrap.servers", "12.0.0.1:39022") + .save() + } + // Since offending kafka producer is released on error and also invalidated, it should not be in + // cache. + val map = CachedKafkaProducer.getAsMap + assert(map.size == 0) + } test("Should not close a producer in-use.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams - val producer: CachedKafkaProducer = CachedKafkaProducer.getOrCreate(kafkaParams) - assert(producer.inUseCount.intValue() > 0) - CachedKafkaProducer.close(kafkaParams) - assert(producer.inUseCount.intValue() > 0) + val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams) + producer.kafkaProducer // initializing the producer. + assert(producer.getInUseCount.intValue() > 0) + // Explicitly cause the producer from guava cache to be evicted. 
+ CachedKafkaProducer.evict(producer.getKafkaParams) + assert(producer.getInUseCount.intValue() > 0) assert(!producer.isClosed, "An in-use producer should not be closed.") } private def generateKafkaParams: ju.HashMap[String, Object] = { val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") + kafkaParams.put("ack", "1") kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) kafkaParams From 0655e6bd907dd1814145b52e7bc5fd028fc01992 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 13 Mar 2019 13:56:50 +0530 Subject: [PATCH 04/18] Fixed the possibility of race condition using coarse grain locks. Ensured the close on each task is called, on task completion. --- .../sql/kafka010/CachedKafkaProducer.scala | 65 +++++++++---------- .../sql/kafka010/KafkaStreamingWrite.scala | 10 +-- .../spark/sql/kafka010/KafkaWriteTask.scala | 7 +- .../kafka010/CachedKafkaProducerSuite.scala | 22 ++----- .../kafka010/KafkaContinuousSinkSuite.scala | 18 ++--- 5 files changed, 55 insertions(+), 67 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index cfc21623dbb3..d094825d4b69 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -38,13 +38,19 @@ private[kafka010] case class CachedKafkaProducer( private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava + private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = + KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = { - val producer = new KafkaProducer[Array[Byte], Array[Byte]](configMap) + val producer = new KafkaProducer[Array[Byte], Array[Byte]](updatedAuthConfigIfNeeded(configMap)) logDebug(s"Created a new instance of KafkaProducer for " + s"$kafkaParams with Id: $id") closed = false producer } + @volatile private var isCached: Boolean = true private var closed: Boolean = true private def close(): Unit = { @@ -53,7 +59,7 @@ private[kafka010] case class CachedKafkaProducer( if (!closed) { closed = true kafkaProducer.close() - logInfo(s"Closed kafka producer: $kafkaProducer") + logInfo(s"Closed kafka producer: $this") } } } catch { @@ -85,8 +91,8 @@ private[kafka010] object CachedKafkaProducer extends Logging { private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get).map(_.conf.getTimeAsMs( - key = "spark.kafka.producer.cache.timeout", - defaultValue = s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) + "spark.kafka.producer.cache.timeout", + s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) private val cacheLoader = new CacheLoader[Seq[(String, Object)], CachedKafkaProducer] { override def load(params: Seq[(String, Object)]): CachedKafkaProducer = { @@ -101,7 +107,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = { val producer: CachedKafkaProducer = notification.getValue if (producer.inUse()) { - logDebug(s"Evicting kafka producer $producer, due to 
${notification.getCause}") + logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}.") // When `inuse` producer is evicted we wait for it to be released before finally closing it. closeQueue.add(producer) producer.unCache() @@ -116,29 +122,25 @@ private[kafka010] object CachedKafkaProducer extends Logging { .removalListener(removalListener) .build[Seq[(String, Object)], CachedKafkaProducer](cacheLoader) - private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = - KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) - .setAuthenticationConfigIfNeeded() - .build() - /** * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = { - val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap)) - try { - val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) - val useCount: Int = cachedKafkaProducer.inUseCount.incrementAndGet() - logDebug(s"Granted producer $cachedKafkaProducer, inuse-count: $useCount") - cachedKafkaProducer - } catch { - case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) - if e.getCause != null => - throw e.getCause + private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = + this.synchronized { + val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap) + try { + val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) + val useCount = cachedKafkaProducer.inUseCount.incrementAndGet() + logDebug(s"Granted producer $cachedKafkaProducer, inuse-count: $useCount") + cachedKafkaProducer + } catch { + case e@(_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) + if e.getCause != null => + throw e.getCause + } } - } private def paramsToSeq(kafkaParamsMap: ju.Map[String, Object]): Seq[(String, Object)] = { val paramsSeq: Seq[(String, Object)] = kafkaParamsMap.asScala.toSeq.sortBy(x => x._1) @@ -146,28 +148,25 @@ private[kafka010] object CachedKafkaProducer extends Logging { } /* Release a kafka producer back to the kafka cache. We simple decrement it's inuse count. */ - private[kafka010] def release(producer: CachedKafkaProducer, offending: Boolean): Boolean = { - if (producer != null) { - producer.inUseCount.decrementAndGet() - logDebug(s"Released producer $producer, updated inuse count: ${producer.getInUseCount}") - if (offending) { - // If this producer is failing to write(i.e. offending), we send it to close queue. + private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = { + this.synchronized { + val inUseCount = producer.inUseCount.decrementAndGet() + logDebug(s"Released producer $producer, updated inuse count: $inUseCount") + if (failing) { + // If this producer is failing to write, we remove it from cache. // So that it is re-created, eventually. - logDebug(s"Invalidated an offending producer:$producer.") + logDebug(s"Invalidated a failing producer: $producer.") guavaCache.invalidate(producer.kafkaParams) } if (!producer.inUse() && !producer.isCached) { // it will take care of removing it from close queue as well. close(producer) } - true - } else { - false } } /** Close this producer and process pending closes. 
*/ - private def close(producer: CachedKafkaProducer): Unit = synchronized { + private def close(producer: CachedKafkaProducer): Unit = { producer.close() // Check and close any other producers previously evicted, but pending to be closed. for (p <- closeQueue.iterator().asScala) { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 4b9ff3b4dadf..093bb1e9cd97 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -92,7 +92,7 @@ class KafkaStreamDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - protected lazy val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerParams) + protected val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -106,15 +106,15 @@ class KafkaStreamDataWriter( checkForErrors() producer.flush() checkForErrors() + close() KafkaWriterCommitMessage } - def abort(): Unit = {} + def abort(): Unit = { + close() + } def close(): Unit = { CachedKafkaProducer.release(producer, failedWrite != null) - checkForErrors() - producer.flush() - checkForErrors() } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index f68045deac8d..b0f0432a161b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -36,7 +36,8 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - protected val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerConfiguration) + protected val producer: CachedKafkaProducer = + CachedKafkaProducer.acquire(producerConfiguration) /** * Writes key value data out to topics. @@ -52,7 +53,7 @@ private[kafka010] class KafkaWriteTask( checkForErrors() producer.flush() checkForErrors() - CachedKafkaProducer.release(producer, offending = failedWrite != null) + CachedKafkaProducer.release(producer, failedWrite != null) } } @@ -95,7 +96,7 @@ private[kafka010] abstract class KafkaRowWriter( if (failedWrite != null) { // Before throwing exception, we should mark this acquired producer as not in use, // for this particular task. Otherwise it will linger on, assuming that it is in use. 
- CachedKafkaProducer.release(producer, offending = true) + CachedKafkaProducer.release(producer, failing = true) throw failedWrite } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index b59e35308f0f..72ff6d6ba71c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import scala.collection.JavaConverters._ - import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.serialization.ByteArraySerializer @@ -58,28 +56,22 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { assert(map.size == 2) } - test("Should return the cached instance, even if auth tokens are set up.") { - // TODO. - // Question: What happens when a delegation token value is changed for a given producer? - // we would need to recreate a kafka producer. - } - - test("Remove an offending kafka producer from cache.") { + test("Automatically remove a failing kafka producer from cache.") { import testImplicits._ val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value") - intercept[SparkException] { + val ex = intercept[SparkException] { // This will fail because the service is not reachable. df.write .format("kafka") .option("topic", "topic") - .option("retries", "1") - .option("max.block.ms", "2") - .option("request.timeout.ms", "2") - .option("linger.ms", "2") + .option("kafka.retries", "1") + .option("kafka.max.block.ms", "2") .option("kafka.bootstrap.servers", "12.0.0.1:39022") .save() } - // Since offending kafka producer is released on error and also invalidated, it should not be in + assert(ex.getMessage.contains("org.apache.kafka.common.errors.TimeoutException"), + "Spark command should fail due to service not reachable.") + // Since failing kafka producer is released on error and also invalidated, it should not be in // cache. 
val map = CachedKafkaProducer.getAsMap assert(map.size == 0) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index b21037b1340c..c871d1c61d1c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -418,17 +418,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { val inputSchema = Seq(AttributeReference("value", BinaryType)()) val data = new Array[Byte](15000) // large value val writeTask = new KafkaStreamDataWriter(Some(topic), options, inputSchema) - try { - val fieldTypes: Array[DataType] = Array(BinaryType) - val converter = UnsafeProjection.create(fieldTypes) - val row = new SpecificInternalRow(fieldTypes) - row.update(0, data) - val iter = Seq.fill(1000)(converter.apply(row)).iterator - iter.foreach(writeTask.write(_)) - writeTask.commit() - } finally { - writeTask.close() - } + val fieldTypes: Array[DataType] = Array(BinaryType) + val converter = UnsafeProjection.create(fieldTypes) + val row = new SpecificInternalRow(fieldTypes) + row.update(0, data) + val iter = Seq.fill(1000)(converter.apply(row)).iterator + iter.foreach(writeTask.write(_)) + writeTask.commit() } private def createKafkaReader(topic: String): DataFrame = { From 7a02de912d27106cb06dddd7a57717098571b9af Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 13 Mar 2019 14:07:32 +0530 Subject: [PATCH 05/18] Refactored the code, and incorporated review feedback. --- .../apache/spark/sql/kafka010/KafkaStreamingWrite.scala | 8 ++++---- .../spark/sql/kafka010/CachedKafkaProducerSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 093bb1e9cd97..09be6377d9fd 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -103,9 +103,6 @@ class KafkaStreamDataWriter( // Send is asynchronous, but we can't commit until all rows are actually in Kafka. // This requires flushing and then checking that no callbacks produced errors. // We also check for errors before to fail as soon as possible - the check is cheap. 
- checkForErrors() - producer.flush() - checkForErrors() close() KafkaWriterCommitMessage } @@ -114,7 +111,10 @@ class KafkaStreamDataWriter( close() } - def close(): Unit = { + private def close(): Unit = { + checkForErrors() + producer.flush() + checkForErrors() CachedKafkaProducer.release(producer, failedWrite != null) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 72ff6d6ba71c..72e5c5d9f99a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -81,10 +81,10 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams) producer.kafkaProducer // initializing the producer. - assert(producer.getInUseCount.intValue() > 0) + assert(producer.getInUseCount.intValue() == 1) // Explicitly cause the producer from guava cache to be evicted. CachedKafkaProducer.evict(producer.getKafkaParams) - assert(producer.getInUseCount.intValue() > 0) + assert(producer.getInUseCount.intValue() == 1) assert(!producer.isClosed, "An in-use producer should not be closed.") } From c0f2ab3e205a88d6054aef8c75846e43260f31f3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 13 Mar 2019 16:44:54 +0530 Subject: [PATCH 06/18] Added a stress suite. --- .../sql/streaming/KafkaWriteTest.scala | 53 ++++++++++++++ .../kafka010/CachedKafkaProducerSuite.scala | 72 +++++++++++++++---- 2 files changed, 112 insertions(+), 13 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala new file mode 100644 index 000000000000..d3ede13dba8f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.sql.streaming + +import java.util.UUID + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.streaming.Trigger + + +object KafkaWriteTest { + def main(args: Array[String]): Unit = { + + val spark = SparkSession + .builder + .appName("KafkaWriteTest") + .getOrCreate() + import spark.implicits._ + + // Create DataSet representing the stream of input lines from kafka + val lines = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", "localhost:9092") + .option("subscribe", "test") + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + + val q = lines.writeStream.format("kafka") + .option("kafka.bootstrap.servers", "localhost:9092") + .option("topic", "test1").trigger(Trigger.ProcessingTime("2 second")) + .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString) + .start() + q.awaitTermination() + } + +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 72e5c5d9f99a..b14f4b83cc64 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,22 +18,16 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.util.concurrent.TimeUnit -import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.test.SharedSQLContext class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { - type KP = KafkaProducer[Array[Byte], Array[Byte]] - - protected override def beforeEach(): Unit = { - super.beforeEach() - CachedKafkaProducer.clear() - } - test("Should return the cached instance on calling acquire with same params.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer = CachedKafkaProducer.acquire(kafkaParams) @@ -81,19 +75,71 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams) producer.kafkaProducer // initializing the producer. - assert(producer.getInUseCount.intValue() == 1) + assert(producer.getInUseCount == 1) // Explicitly cause the producer from guava cache to be evicted. 
CachedKafkaProducer.evict(producer.getKafkaParams) - assert(producer.getInUseCount.intValue() == 1) + assert(producer.getInUseCount == 1) assert(!producer.isClosed, "An in-use producer should not be closed.") } - private def generateKafkaParams: ju.HashMap[String, Object] = { + private def generateKafkaParams: ju.HashMap[String, Object] = { val kafkaParams = new ju.HashMap[String, Object]() + kafkaParams.put("ack", "0") kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("ack", "1") kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) kafkaParams } } + +class KafkaSinkStressSuite extends KafkaContinuousTest { + + import testImplicits._ + + override val streamingTimeout = 30.seconds + + override val brokerProps = Map("auto.create.topics.enable" -> "false") + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + } + super.afterAll() + } + + override def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.kafka.producer.cache.timeout", "2ms") + } + + /* + * The following stress suite will cause frequent eviction of kafka producers from + * the guava cache. Since these producers remain in use, because they are used by + * multiple tasks, they stay in close queue till they are released finally. This test + * will cause new tasks to use fresh instance of kafka producers and as a result it + * simulates a stress situation, where multiple producers are requested from CachedKafkaProducer + * and at the same time there will be multiple releases. It is supposed to catch a race + * condition if any, due to multiple threads requesting and releasing producers. + */ + test("Single source and multiple kafka sink with 2ms cache timeout.") { + + val query = spark.readStream + .format("rate") + .option("numPartitions", "100") + .option("rowsPerSecond", "200") + .load() + .selectExpr("CAST(timestamp AS STRING) key", "CAST(value AS STRING) value") + val queries = for (i <- 1 to 10) yield { + val topic = newTopic() + testUtils.createTopic(topic, 100) + query.writeStream.format("kafka") + .option("broker.address", testUtils.brokerAddress) + .option("topic", topic).start() + } + queries.foreach{ q => + q.processAllAvailable() + q.awaitTermination(TimeUnit.MINUTES.toMillis(2)) + } + } +} From f6ad7ec93a94a77c476ba36bb00ce6616554303a Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 20 Mar 2019 12:28:02 +0530 Subject: [PATCH 07/18] Refactored code per the feedback. --- .../sql/kafka010/CachedKafkaProducer.scala | 49 +++++++++++-------- .../sql/kafka010/KafkaStreamingWrite.scala | 8 +-- .../spark/sql/kafka010/KafkaWriteTask.scala | 11 +++-- .../spark/sql/kafka010/KafkaWriter.scala | 2 +- .../kafka010/CachedKafkaProducerSuite.scala | 45 +++++++++++------ 5 files changed, 72 insertions(+), 43 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index d094825d4b69..5ed0adf445b6 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -127,42 +127,51 @@ private[kafka010] object CachedKafkaProducer extends Logging { * exist, a new KafkaProducer will be created. 
KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. */ - private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = - this.synchronized { - val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap) - try { + private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = { + val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap) + try { + val producer = this.synchronized { val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) - val useCount = cachedKafkaProducer.inUseCount.incrementAndGet() - logDebug(s"Granted producer $cachedKafkaProducer, inuse-count: $useCount") + cachedKafkaProducer.inUseCount.incrementAndGet() + logDebug(s"Granted producer $cachedKafkaProducer") cachedKafkaProducer - } catch { - case e@(_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) - if e.getCause != null => - throw e.getCause } + producer + } catch { + case e@(_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) + if e.getCause != null => + throw e.getCause } + } private def paramsToSeq(kafkaParamsMap: ju.Map[String, Object]): Seq[(String, Object)] = { val paramsSeq: Seq[(String, Object)] = kafkaParamsMap.asScala.toSeq.sortBy(x => x._1) paramsSeq } - /* Release a kafka producer back to the kafka cache. We simple decrement it's inuse count. */ + /* Release a kafka producer back to the kafka cache. We simply decrement it's inuse count. */ private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = { this.synchronized { - val inUseCount = producer.inUseCount.decrementAndGet() - logDebug(s"Released producer $producer, updated inuse count: $inUseCount") + // It should be ok to call release multiple times on the same producer object. + if (producer.inUse()) { + // So that we do not end up with -ve in-use counts. + producer.inUseCount.decrementAndGet() + logDebug(s"Released producer $producer.") + } if (failing) { // If this producer is failing to write, we remove it from cache. // So that it is re-created, eventually. - logDebug(s"Invalidated a failing producer: $producer.") - guavaCache.invalidate(producer.kafkaParams) - } - if (!producer.inUse() && !producer.isCached) { - // it will take care of removing it from close queue as well. - close(producer) + val cachedProducer = guavaCache.getIfPresent(producer.kafkaParams) + if (cachedProducer != null && cachedProducer.id == producer.id) { + logDebug(s"Invalidating a failing producer: $producer.") + guavaCache.invalidate(producer.kafkaParams) + } } } + if (!producer.inUse() && !producer.isCached) { + // it will take care of removing it from close queue as well. + close(producer) + } } /** Close this producer and process pending closes. */ @@ -179,7 +188,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { // Intended for testing purpose only. 
private[kafka010] def clear(): Unit = { - logInfo("Cleaning up guava cache and force closing all kafka producer.") + logInfo("Cleaning up guava cache and force closing all kafka producers.") guavaCache.invalidateAll() for (p <- closeQueue.iterator().asScala) { p.close() diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 09be6377d9fd..6d5ce1655e4b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -113,8 +113,10 @@ class KafkaStreamDataWriter( private def close(): Unit = { checkForErrors() - producer.flush() - checkForErrors() - CachedKafkaProducer.release(producer, failedWrite != null) + try { + producer.flush() + } finally { + CachedKafkaProducer.release(producer, failedWrite != null) + } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index b0f0432a161b..6667d118bbee 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -51,14 +51,17 @@ private[kafka010] class KafkaWriteTask( def close(): Unit = { checkForErrors() - producer.flush() - checkForErrors() - CachedKafkaProducer.release(producer, failedWrite != null) + try { + producer.flush() + } finally { + CachedKafkaProducer.release(producer, failedWrite != null) + } } + } private[kafka010] abstract class KafkaRowWriter( - inputSchema: Seq[Attribute], topic: Option[String]) extends Logging { + inputSchema: Seq[Attribute], topic: Option[String]) { protected val producer: CachedKafkaProducer // used to synchronize with Kafka callbacks diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index 1317509214e5..e1a9191cc5a8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -22,7 +22,7 @@ import java.{util => ju} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.types.{BinaryType, StringType} import org.apache.spark.util.Utils diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index b14f4b83cc64..53a64e348d3d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,16 +18,22 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.TimeUnit import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest.time.SpanSugar._ import 
org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { + protected override def beforeEach(): Unit = { + super.beforeEach() + CachedKafkaProducer.clear() + } + test("Should return the cached instance on calling acquire with same params.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer = CachedKafkaProducer.acquire(kafkaParams) @@ -92,9 +98,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { } } -class KafkaSinkStressSuite extends KafkaContinuousTest { - - import testImplicits._ +class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest { override val streamingTimeout = 30.seconds @@ -124,22 +128,33 @@ class KafkaSinkStressSuite extends KafkaContinuousTest { */ test("Single source and multiple kafka sink with 2ms cache timeout.") { - val query = spark.readStream + val df = spark.readStream .format("rate") - .option("numPartitions", "100") - .option("rowsPerSecond", "200") + .option("numPartitions", "10") + .option("rowsPerSecond", "100") .load() .selectExpr("CAST(timestamp AS STRING) key", "CAST(value AS STRING) value") - val queries = for (i <- 1 to 10) yield { - val topic = newTopic() - testUtils.createTopic(topic, 100) - query.writeStream.format("kafka") - .option("broker.address", testUtils.brokerAddress) - .option("topic", topic).start() + + val checkpointDir = Utils.createTempDir() + val topic = newTopic() + testUtils.createTopic(topic, 100) + val queries = for (i <- 1 to 5) yield { + df.writeStream + .format("kafka") + .option("checkpointLocation", checkpointDir.getCanonicalPath + i) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.max.block.ms", "1000") + .option("topic", topic) + .trigger(Trigger.Continuous(1000)) + .queryName(s"kafkaStream$i") + .start() } + Thread.sleep(30000) + queries.foreach{ q => - q.processAllAvailable() - q.awaitTermination(TimeUnit.MINUTES.toMillis(2)) + assert(q.exception.isEmpty, "None of the queries should fail.") + q.stop() } + } } From 71742171be6c854302dbdb2c7b4cdc05be46358d Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 20 Mar 2019 16:03:15 +0530 Subject: [PATCH 08/18] Delete KafkaWriteTest.scala Stray change. --- .../sql/streaming/KafkaWriteTest.scala | 53 ------------------- 1 file changed, 53 deletions(-) delete mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala deleted file mode 100644 index d3ede13dba8f..000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/KafkaWriteTest.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.sql.streaming - -import java.util.UUID - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.streaming.Trigger - - -object KafkaWriteTest { - def main(args: Array[String]): Unit = { - - val spark = SparkSession - .builder - .appName("KafkaWriteTest") - .getOrCreate() - import spark.implicits._ - - // Create DataSet representing the stream of input lines from kafka - val lines = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", "localhost:9092") - .option("subscribe", "test") - .load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - - val q = lines.writeStream.format("kafka") - .option("kafka.bootstrap.servers", "localhost:9092") - .option("topic", "test1").trigger(Trigger.ProcessingTime("2 second")) - .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString) - .start() - q.awaitTermination() - } - -} From f287d791a8561ce8f73b403653ac8efbc4aba8e5 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 26 Mar 2019 16:35:41 +0530 Subject: [PATCH 09/18] Added a concurrent test and corrected a possibility of double release. --- .../sql/kafka010/CachedKafkaProducer.scala | 2 + .../sql/kafka010/KafkaStreamingWrite.scala | 3 +- .../spark/sql/kafka010/KafkaWriteTask.scala | 7 +- .../kafka010/CachedKafkaProducerSuite.scala | 64 +++++++++++++++++-- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 5ed0adf445b6..965d52900d1b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -157,6 +157,8 @@ private[kafka010] object CachedKafkaProducer extends Logging { // So that we do not end up with -ve in-use counts. producer.inUseCount.decrementAndGet() logDebug(s"Released producer $producer.") + } else { + logWarning(s"Tried to release a not in use producer, $producer.") } if (failing) { // If this producer is failing to write, we remove it from cache. 
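
The release path above is the subtle part of this change: a task may end up calling release more than once, and a failing producer must only be dropped from the cache while it is still the cached instance. A standalone sketch of those two rules, with invented names (illustration only, not code from this patch):

    import java.util.concurrent.atomic.AtomicInteger
    import scala.collection.mutable

    // Stand-in for CachedKafkaProducer: only identity and the in-use count matter here.
    case class Handle(id: String, inUseCount: AtomicInteger = new AtomicInteger(0))

    object RefCountedCache {
      private val cache = mutable.Map.empty[String, Handle]

      def acquire(key: String): Handle = synchronized {
        val h = cache.getOrElseUpdate(key, Handle(java.util.UUID.randomUUID().toString))
        h.inUseCount.incrementAndGet()
        h
      }

      def release(key: String, handle: Handle, failing: Boolean): Unit = synchronized {
        // Tolerate double release: never drive the in-use count negative.
        if (handle.inUseCount.get() > 0) handle.inUseCount.decrementAndGet()
        // Invalidate only while the failing handle is still the cached instance; a fresh
        // replacement created in the meantime is left untouched.
        if (failing && cache.get(key).exists(_.id == handle.id)) cache.remove(key)
      }
    }
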
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala index 6d5ce1655e4b..a6e5d257d4e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala @@ -112,9 +112,10 @@ class KafkaStreamDataWriter( } private def close(): Unit = { - checkForErrors() try { + checkForErrors() producer.flush() + checkForErrors() } finally { CachedKafkaProducer.release(producer, failedWrite != null) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index 6667d118bbee..2c2665e0c161 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -21,7 +21,6 @@ import java.{util => ju} import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} import org.apache.spark.sql.types.{BinaryType, StringType} @@ -50,9 +49,10 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { - checkForErrors() try { + checkForErrors() producer.flush() + checkForErrors() } finally { CachedKafkaProducer.release(producer, failedWrite != null) } @@ -97,9 +97,6 @@ private[kafka010] abstract class KafkaRowWriter( protected def checkForErrors(): Unit = { if (failedWrite != null) { - // Before throwing exception, we should mark this acquired producer as not in use, - // for this particular task. Otherwise it will linger on, assuming that it is in use. 
- CachedKafkaProducer.release(producer, failing = true) throw failedWrite } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 53a64e348d3d..972203ea2f90 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,15 +18,19 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit} + +import scala.collection.mutable +import scala.util.Random import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils + class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { protected override def beforeEach(): Unit = { @@ -100,8 +104,6 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest { - override val streamingTimeout = 30.seconds - override val brokerProps = Map("auto.create.topics.enable" -> "false") override def afterAll(): Unit = { @@ -117,6 +119,60 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest conf.set("spark.kafka.producer.cache.timeout", "2ms") } + test("concurrent use of CachedKafkaProducer") { + val topic = "topic" + Random.nextInt() + val data = (1 to 10).map(_.toString) + testUtils.createTopic(topic, 1) + val kafkaParams: Map[String, Object] = Map("bootstrap.servers" -> testUtils.brokerAddress, + "key.serializer" -> classOf[ByteArraySerializer].getName, + "value.serializer" -> classOf[ByteArraySerializer].getName) + + import scala.collection.JavaConverters._ + + val numThreads: Int = 100 + val numConcurrentProducers: Int = 500 + + val kafkaParamsUniqueMap = mutable.HashMap.empty[Int, ju.Map[String, Object]] + ( 1 to numConcurrentProducers).map { + i => kafkaParamsUniqueMap.put(i, kafkaParams.updated("retries", s"$i").asJava) + } + val toBeReleasedQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() + + def acquire(i: Int): Unit = { + val producer = CachedKafkaProducer.acquire(kafkaParamsUniqueMap(i)) + producer.kafkaProducer // materialize producer. + assert(!producer.isClosed) + toBeReleasedQueue.add(producer) + } + + def release(producer: CachedKafkaProducer): Unit = { + if (producer != null) { + CachedKafkaProducer.release(producer, true) + } + } + val threadPool = Executors.newFixedThreadPool(numThreads) + try { + val futuresAcquire = (1 to numConcurrentProducers).map { i => + threadPool.submit(new Runnable { + override def run(): Unit = { + acquire(i) + } + }) + } + val futuresRelease = (1 to numConcurrentProducers).map { i => + threadPool.submit(new Runnable { + override def run(): Unit = { + release(toBeReleasedQueue.poll()) + } + }) + } + futuresAcquire.foreach(_.get(1, TimeUnit.MINUTES)) + futuresRelease.foreach(_.get(1, TimeUnit.MINUTES)) + } finally { + threadPool.shutdown() + } + + } /* * The following stress suite will cause frequent eviction of kafka producers from * the guava cache. 
Since these producers remain in use, because they are used by @@ -149,7 +205,7 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest .queryName(s"kafkaStream$i") .start() } - Thread.sleep(30000) + Thread.sleep(15000) queries.foreach{ q => assert(q.exception.isEmpty, "None of the queries should fail.") From f5406bb605433f7b3d278ff68a590de52ac5f462 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 26 Mar 2019 16:44:09 +0530 Subject: [PATCH 10/18] increasing the concurrency of the stress tests --- .../kafka010/CachedKafkaProducerSuite.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 972203ea2f90..0839f2b182ed 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -130,7 +130,7 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest import scala.collection.JavaConverters._ val numThreads: Int = 100 - val numConcurrentProducers: Int = 500 + val numConcurrentProducers: Int = 1000 val kafkaParamsUniqueMap = mutable.HashMap.empty[Int, ju.Map[String, Object]] ( 1 to numConcurrentProducers).map { @@ -140,8 +140,8 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest def acquire(i: Int): Unit = { val producer = CachedKafkaProducer.acquire(kafkaParamsUniqueMap(i)) - producer.kafkaProducer // materialize producer. - assert(!producer.isClosed) + producer.kafkaProducer // materialize producer for the first time. + assert(!producer.isClosed, "Acquired producer cannot be closed.") toBeReleasedQueue.add(producer) } @@ -152,17 +152,23 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest } val threadPool = Executors.newFixedThreadPool(numThreads) try { - val futuresAcquire = (1 to numConcurrentProducers).map { i => + val futuresAcquire = (1 to 10 * numConcurrentProducers).map { i => threadPool.submit(new Runnable { override def run(): Unit = { - acquire(i) + acquire(i % numConcurrentProducers + 1) } }) } - val futuresRelease = (1 to numConcurrentProducers).map { i => + val futuresRelease = (1 to 10 * numConcurrentProducers).map { i => threadPool.submit(new Runnable { override def run(): Unit = { - release(toBeReleasedQueue.poll()) + // 2x release should not corrupt the state of cache. + val cachedKafkaProducer = toBeReleasedQueue.poll() + release(cachedKafkaProducer) + release(cachedKafkaProducer) + if (cachedKafkaProducer.getInUseCount > 0) { + assert(!cachedKafkaProducer.isClosed, "Should not close an inuse producer.") + } } }) } @@ -171,8 +177,8 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest } finally { threadPool.shutdown() } - } + /* * The following stress suite will cause frequent eviction of kafka producers from * the guava cache. 
Since these producers remain in use, because they are used by @@ -186,22 +192,23 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest val df = spark.readStream .format("rate") - .option("numPartitions", "10") - .option("rowsPerSecond", "100") + .option("numPartitions", "100") + .option("rowsPerSecond", "200") .load() .selectExpr("CAST(timestamp AS STRING) key", "CAST(value AS STRING) value") val checkpointDir = Utils.createTempDir() val topic = newTopic() testUtils.createTopic(topic, 100) - val queries = for (i <- 1 to 5) yield { + val queries = for (i <- 1 to 20) yield { df.writeStream .format("kafka") .option("checkpointLocation", checkpointDir.getCanonicalPath + i) .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.max.block.ms", "1000") + // to make it create 5 unique producers. + .option("kafka.max.block.ms", s"100${i % 5}") .option("topic", topic) - .trigger(Trigger.Continuous(1000)) + .trigger(Trigger.Continuous(500)) .queryName(s"kafkaStream$i") .start() } From a1b431112b3e7fe1127baada32c028846cc7ca66 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 26 Mar 2019 17:39:16 +0530 Subject: [PATCH 11/18] 2x release, simulation. --- .../kafka010/CachedKafkaProducerSuite.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 0839f2b182ed..5954dc073cc6 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -148,6 +148,9 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest def release(producer: CachedKafkaProducer): Unit = { if (producer != null) { CachedKafkaProducer.release(producer, true) + if (producer.getInUseCount > 0) { + assert(!producer.isClosed, "Should not close an inuse producer.") + } } } val threadPool = Executors.newFixedThreadPool(numThreads) @@ -160,20 +163,18 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest }) } val futuresRelease = (1 to 10 * numConcurrentProducers).map { i => - threadPool.submit(new Runnable { - override def run(): Unit = { - // 2x release should not corrupt the state of cache. - val cachedKafkaProducer = toBeReleasedQueue.poll() - release(cachedKafkaProducer) - release(cachedKafkaProducer) - if (cachedKafkaProducer.getInUseCount > 0) { - assert(!cachedKafkaProducer.isClosed, "Should not close an inuse producer.") + val cachedKafkaProducer = toBeReleasedQueue.poll() + // 2x release should not corrupt the state of cache. + (1 to 2).map { j => + threadPool.submit(new Runnable { + override def run(): Unit = { + release(cachedKafkaProducer) } - } - }) + }) + } } futuresAcquire.foreach(_.get(1, TimeUnit.MINUTES)) - futuresRelease.foreach(_.get(1, TimeUnit.MINUTES)) + futuresRelease.flatten.foreach(_.get(1, TimeUnit.MINUTES)) } finally { threadPool.shutdown() } From ac3af2f0ac0eba41e60d21e1ae24a600cd1b0d81 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 27 Mar 2019 12:16:21 +0530 Subject: [PATCH 12/18] inlined functions. 
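
The clean-up is largely stylistic: helper members whose body is a single expression drop their braces. A minimal standalone illustration of the pattern (not the real class):

    object InlineStyle {
      private val inUseCount = new java.util.concurrent.atomic.AtomicInteger(0)

      // Before: def inUse(): Boolean = { inUseCount.get() > 0 }
      // After: a single-expression member is written without braces.
      def inUse(): Boolean = inUseCount.get() > 0
    }
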
--- .../spark/sql/kafka010/CachedKafkaProducer.scala | 14 +++++--------- .../sql/kafka010/CachedKafkaProducerSuite.scala | 1 - 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 965d52900d1b..bf702f401ad2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -68,19 +68,15 @@ private[kafka010] case class CachedKafkaProducer( } } - private def inUse(): Boolean = { - inUseCount.get() > 0 - } - private def unCache(): Unit = { - isCached = false - } + private def inUse(): Boolean = inUseCount.get() > 0 + + private def unCache(): Unit = isCached = false + private[kafka010] def getInUseCount: Int = inUseCount.get() private[kafka010] def getKafkaParams: Seq[(String, Object)] = kafkaParams - private[kafka010] def flush(): Unit = { - kafkaProducer.flush() - } + private[kafka010] def flush(): Unit = kafkaProducer.flush() private[kafka010] def isClosed: Boolean = closed } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 5954dc073cc6..20e8515f7d08 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -219,6 +219,5 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest assert(q.exception.isEmpty, "None of the queries should fail.") q.stop() } - } } From 83dd0a43193aaec39407a5cc5d1ae12d633e0bad Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 28 Mar 2019 15:04:28 +0530 Subject: [PATCH 13/18] code review --- .../kafka010/CachedKafkaProducerSuite.scala | 48 ++++++++++--------- .../kafka010/KafkaContinuousSinkSuite.scala | 18 ++++--- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 20e8515f7d08..f801ffc37c7d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -51,7 +51,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { test("Should return the new instance on calling acquire with different params.") { val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams val producer = CachedKafkaProducer.acquire(kafkaParams) - kafkaParams.remove("ack") // mutate the kafka params. + kafkaParams.remove("acks") // mutate the kafka params. 
val producer2 = CachedKafkaProducer.acquire(kafkaParams) assert(producer.kafkaProducer != producer2.kafkaProducer) assert(producer.getInUseCount == 1) @@ -94,7 +94,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { private def generateKafkaParams: ju.HashMap[String, Object] = { val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("ack", "0") + kafkaParams.put("acks", "0") kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) @@ -121,7 +121,6 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest test("concurrent use of CachedKafkaProducer") { val topic = "topic" + Random.nextInt() - val data = (1 to 10).map(_.toString) testUtils.createTopic(topic, 1) val kafkaParams: Map[String, Object] = Map("bootstrap.servers" -> testUtils.brokerAddress, "key.serializer" -> classOf[ByteArraySerializer].getName, @@ -129,11 +128,11 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest import scala.collection.JavaConverters._ - val numThreads: Int = 100 - val numConcurrentProducers: Int = 1000 + val numThreads = 100 + val numConcurrentProducers = 1000 val kafkaParamsUniqueMap = mutable.HashMap.empty[Int, ju.Map[String, Object]] - ( 1 to numConcurrentProducers).map { + (1 to numConcurrentProducers).map { i => kafkaParamsUniqueMap.put(i, kafkaParams.updated("retries", s"$i").asJava) } val toBeReleasedQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() @@ -147,7 +146,7 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest def release(producer: CachedKafkaProducer): Unit = { if (producer != null) { - CachedKafkaProducer.release(producer, true) + CachedKafkaProducer.release(producer, Random.nextBoolean()) if (producer.getInUseCount > 0) { assert(!producer.isClosed, "Should not close an inuse producer.") } @@ -177,6 +176,7 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest futuresRelease.flatten.foreach(_.get(1, TimeUnit.MINUTES)) } finally { threadPool.shutdown() + CachedKafkaProducer.clear() } } @@ -201,23 +201,25 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest val checkpointDir = Utils.createTempDir() val topic = newTopic() testUtils.createTopic(topic, 100) - val queries = for (i <- 1 to 20) yield { - df.writeStream - .format("kafka") - .option("checkpointLocation", checkpointDir.getCanonicalPath + i) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - // to make it create 5 unique producers. - .option("kafka.max.block.ms", s"100${i % 5}") - .option("topic", topic) - .trigger(Trigger.Continuous(500)) - .queryName(s"kafkaStream$i") - .start() - } - Thread.sleep(15000) + failAfter(streamingTimeout) { + val queries = for (i <- 1 to 10) yield { + df.writeStream + .format("kafka") + .option("checkpointLocation", checkpointDir.getCanonicalPath + i) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + // to make it create 5 unique producers. 
+ .option("kafka.max.block.ms", s"100${i % 5}") + .option("topic", topic) + .trigger(Trigger.Continuous(500)) + .queryName(s"kafkaStream$i") + .start() + } + Thread.sleep(15000) - queries.foreach{ q => - assert(q.exception.isEmpty, "None of the queries should fail.") - q.stop() + queries.foreach { q => + assert(q.exception.isEmpty, "None of the queries should fail.") + q.stop() + } } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index c871d1c61d1c..ff06152f04fb 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -418,13 +418,17 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { val inputSchema = Seq(AttributeReference("value", BinaryType)()) val data = new Array[Byte](15000) // large value val writeTask = new KafkaStreamDataWriter(Some(topic), options, inputSchema) - val fieldTypes: Array[DataType] = Array(BinaryType) - val converter = UnsafeProjection.create(fieldTypes) - val row = new SpecificInternalRow(fieldTypes) - row.update(0, data) - val iter = Seq.fill(1000)(converter.apply(row)).iterator - iter.foreach(writeTask.write(_)) - writeTask.commit() + try { + val fieldTypes: Array[DataType] = Array(BinaryType) + val converter = UnsafeProjection.create(fieldTypes) + val row = new SpecificInternalRow(fieldTypes) + row.update(0, data) + val iter = Seq.fill(1000)(converter.apply(row)).iterator + iter.foreach(writeTask.write(_)) + writeTask.commit() + } finally { + writeTask.abort() + } } private def createKafkaReader(topic: String): DataFrame = { From 114e8b4739003cd95d7c38a85028d00debca56e3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 29 Mar 2019 12:44:45 +0530 Subject: [PATCH 14/18] more review feedback. --- .../kafka010/CachedKafkaProducerSuite.scala | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index f801ffc37c7d..530a6d4230eb 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -26,7 +26,7 @@ import scala.util.Random import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -73,8 +73,14 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { .option("kafka.bootstrap.servers", "12.0.0.1:39022") .save() } - assert(ex.getMessage.contains("org.apache.kafka.common.errors.TimeoutException"), + assert(ex.getMessage.contains("TimeoutException"), "Spark command should fail due to service not reachable.") + // Kafka first tries to fetch metadata and reports failures as, " not present in metadata after + // max.block.ms time." 
+ assert(ex.getMessage.toLowerCase(ju.Locale.ROOT) + .contains("not present in metadata after 2 ms."), + "Spark command should fail due to service not reachable.") + // Since failing kafka producer is released on error and also invalidated, it should not be in // cache. val map = CachedKafkaProducer.getAsMap @@ -129,7 +135,7 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest import scala.collection.JavaConverters._ val numThreads = 100 - val numConcurrentProducers = 1000 + val numConcurrentProducers = 500 val kafkaParamsUniqueMap = mutable.HashMap.empty[Int, ju.Map[String, Object]] (1 to numConcurrentProducers).map { @@ -201,23 +207,29 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest val checkpointDir = Utils.createTempDir() val topic = newTopic() testUtils.createTopic(topic, 100) - failAfter(streamingTimeout) { - val queries = for (i <- 1 to 10) yield { - df.writeStream - .format("kafka") - .option("checkpointLocation", checkpointDir.getCanonicalPath + i) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - // to make it create 5 unique producers. - .option("kafka.max.block.ms", s"100${i % 5}") - .option("topic", topic) - .trigger(Trigger.Continuous(500)) - .queryName(s"kafkaStream$i") - .start() - } - Thread.sleep(15000) + var queries: Seq[StreamingQuery] = Seq.empty[StreamingQuery] + try { + failAfter(streamingTimeout) { + queries = for (i <- 1 to 10) yield { + df.writeStream + .format("kafka") + .option("checkpointLocation", checkpointDir.getCanonicalPath + i) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + // to make it create 5 unique producers. + .option("kafka.max.block.ms", s"100${i % 5}") + .option("topic", topic) + .trigger(Trigger.Continuous(500)) + .queryName(s"kafkaStream$i") + .start() + } + Thread.sleep(15000) + queries.foreach { q => + assert(q.exception.isEmpty, "None of the queries should fail.") + } + } + } finally { queries.foreach { q => - assert(q.exception.isEmpty, "None of the queries should fail.") q.stop() } } From 1bcf82a8946c5e5463c9d58006dd828d100d41e7 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 5 Apr 2019 16:57:30 +0530 Subject: [PATCH 15/18] review comments and discussion. 
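
The concurrency test now exercises the intended per-task life cycle end to end: every worker acquires a producer, writes through it, and releases it on the same thread, so an in-use producer should never be observed as closed. A standalone sketch of that life cycle with a trivial stand-in cache (placeholder names, not the real API):

    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicInteger

    object PerTaskLifeCycle extends App {
      // Trivial stand-in for the cached wrapper: only the in-use counter matters here.
      final class Handle { val inUseCount = new AtomicInteger(0) }

      object Cache {
        private val shared = new Handle // single cached entry
        def acquire(): Handle = { shared.inUseCount.incrementAndGet(); shared }
        def release(h: Handle): Unit = h.inUseCount.decrementAndGet()
      }

      val pool = Executors.newFixedThreadPool(8)
      (1 to 100).foreach { _ =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            val h = Cache.acquire()
            try {
              // send records through the shared producer here
            } finally {
              Cache.release(h) // always release on the same thread that acquired
            }
          }
        })
      }
      pool.shutdown()
    }
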
--- .../sql/kafka010/CachedKafkaProducer.scala | 6 +++- .../kafka010/CachedKafkaProducerSuite.scala | 29 ++++++++----------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index bf702f401ad2..9c02cb8c37c2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -52,6 +53,7 @@ private[kafka010] case class CachedKafkaProducer( } @volatile private var isCached: Boolean = true + @GuardedBy("this") private var closed: Boolean = true private def close(): Unit = { try { @@ -184,7 +186,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } - // Intended for testing purpose only. + // For testing only. private[kafka010] def clear(): Unit = { logInfo("Cleaning up guava cache and force closing all kafka producers.") guavaCache.invalidateAll() @@ -194,10 +196,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { closeQueue.clear() } + // For testing only. private[kafka010] def evict(params: Seq[(String, Object)]): Unit = { guavaCache.invalidate(params) } + // For testing only. private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] = guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 530a6d4230eb..d371eca84098 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit} +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.mutable import scala.util.Random +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.spark.{SparkConf, SparkException} @@ -141,13 +142,12 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest (1 to numConcurrentProducers).map { i => kafkaParamsUniqueMap.put(i, kafkaParams.updated("retries", s"$i").asJava) } - val toBeReleasedQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() - def acquire(i: Int): Unit = { + def acquire(i: Int): CachedKafkaProducer = { val producer = CachedKafkaProducer.acquire(kafkaParamsUniqueMap(i)) producer.kafkaProducer // materialize producer for the first time. 
assert(!producer.isClosed, "Acquired producer cannot be closed.") - toBeReleasedQueue.add(producer) + producer } def release(producer: CachedKafkaProducer): Unit = { @@ -158,28 +158,23 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest } } } + val data = (1 to 100).map(_.toString) + val threadPool = Executors.newFixedThreadPool(numThreads) try { val futuresAcquire = (1 to 10 * numConcurrentProducers).map { i => threadPool.submit(new Runnable { override def run(): Unit = { - acquire(i % numConcurrentProducers + 1) + val producer = acquire(i % numConcurrentProducers + 1) + data.foreach { d => + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes) + producer.kafkaProducer.send(record) + } + release(producer) } }) } - val futuresRelease = (1 to 10 * numConcurrentProducers).map { i => - val cachedKafkaProducer = toBeReleasedQueue.poll() - // 2x release should not corrupt the state of cache. - (1 to 2).map { j => - threadPool.submit(new Runnable { - override def run(): Unit = { - release(cachedKafkaProducer) - } - }) - } - } futuresAcquire.foreach(_.get(1, TimeUnit.MINUTES)) - futuresRelease.flatten.foreach(_.get(1, TimeUnit.MINUTES)) } finally { threadPool.shutdown() CachedKafkaProducer.clear() From da715896ebf56f078cb7faf88687ad09520cfdd3 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 10 Apr 2019 12:27:03 +0530 Subject: [PATCH 16/18] One flag less, but we scan closequeue more often than before. --- .../sql/kafka010/CachedKafkaProducer.scala | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 9c02cb8c37c2..4e3729bc09d5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -51,8 +51,6 @@ private[kafka010] case class CachedKafkaProducer( closed = false producer } - @volatile - private var isCached: Boolean = true @GuardedBy("this") private var closed: Boolean = true private def close(): Unit = { @@ -72,8 +70,6 @@ private[kafka010] case class CachedKafkaProducer( private def inUse(): Boolean = inUseCount.get() > 0 - private def unCache(): Unit = isCached = false - private[kafka010] def getInUseCount: Int = inUseCount.get() private[kafka010] def getKafkaParams: Seq[(String, Object)] = kafkaParams @@ -104,13 +100,13 @@ private[kafka010] object CachedKafkaProducer extends Logging { override def onRemoval( notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = { val producer: CachedKafkaProducer = notification.getValue + logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}.") if (producer.inUse()) { - logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}.") - // When `inuse` producer is evicted we wait for it to be released before finally closing it. + // When `inuse` producer is evicted we wait for it to be released by all the tasks, + // before finally closing it. closeQueue.add(producer) - producer.unCache() } else { - close(producer) + producer.close() } } } @@ -168,15 +164,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } } - if (!producer.inUse() && !producer.isCached) { - // it will take care of removing it from close queue as well. 
- close(producer) - } + // We need a close queue, so that we can close the producer(s) outside of a synchronized block. + processPendingClose(producer) } - /** Close this producer and process pending closes. */ - private def close(producer: CachedKafkaProducer): Unit = { - producer.close() + /** Process pending closes. */ + private def processPendingClose(producer: CachedKafkaProducer): Unit = { // Check and close any other producers previously evicted, but pending to be closed. for (p <- closeQueue.iterator().asScala) { if (!p.inUse()) { From a3443eaef3db7cc7c06daed95a08641fb4ab7d94 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 12 Apr 2019 11:43:45 +0530 Subject: [PATCH 17/18] Addressed few code refactoring nits. --- .../sql/kafka010/CachedKafkaProducer.scala | 21 +++++++++---------- .../kafka010/CachedKafkaProducerSuite.scala | 14 +++---------- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 4e3729bc09d5..6f40e69f71bb 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -39,13 +39,8 @@ private[kafka010] case class CachedKafkaProducer( private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava - private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = - KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) - .setAuthenticationConfigIfNeeded() - .build() - lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = { - val producer = new KafkaProducer[Array[Byte], Array[Byte]](updatedAuthConfigIfNeeded(configMap)) + val producer = new KafkaProducer[Array[Byte], Array[Byte]](configMap) logDebug(s"Created a new instance of KafkaProducer for " + s"$kafkaParams with Id: $id") closed = false @@ -59,7 +54,7 @@ private[kafka010] case class CachedKafkaProducer( if (!closed) { closed = true kafkaProducer.close() - logInfo(s"Closed kafka producer: $this") + logDebug(s"Closed kafka producer: $this") } } } catch { @@ -94,6 +89,11 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } + private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = + KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + private val closeQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() private val removalListener = new RemovalListener[Seq[(String, Object)], CachedKafkaProducer]() { @@ -122,7 +122,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { * one instance per specified kafkaParams. */ private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = { - val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap) + val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap)) try { val producer = this.synchronized { val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) @@ -148,7 +148,6 @@ private[kafka010] object CachedKafkaProducer extends Logging { this.synchronized { // It should be ok to call release multiple times on the same producer object. if (producer.inUse()) { - // So that we do not end up with -ve in-use counts. 
producer.inUseCount.decrementAndGet() logDebug(s"Released producer $producer.") } else { @@ -165,11 +164,11 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } // We need a close queue, so that we can close the producer(s) outside of a synchronized block. - processPendingClose(producer) + processPendingClose() } /** Process pending closes. */ - private def processPendingClose(producer: CachedKafkaProducer): Unit = { + private def processPendingClose(): Unit = { // Check and close any other producers previously evicted, but pending to be closed. for (p <- closeQueue.iterator().asScala) { if (!p.inUse()) { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index d371eca84098..233346fc9cb7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -40,7 +40,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { } test("Should return the cached instance on calling acquire with same params.") { - val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams + val kafkaParams = generateKafkaParams val producer = CachedKafkaProducer.acquire(kafkaParams) val producer2 = CachedKafkaProducer.acquire(kafkaParams) assert(producer.kafkaProducer == producer2.kafkaProducer) @@ -50,7 +50,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { } test("Should return the new instance on calling acquire with different params.") { - val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams + val kafkaParams = generateKafkaParams val producer = CachedKafkaProducer.acquire(kafkaParams) kafkaParams.remove("acks") // mutate the kafka params. val producer2 = CachedKafkaProducer.acquire(kafkaParams) @@ -89,7 +89,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { } test("Should not close a producer in-use.") { - val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams + val kafkaParams = generateKafkaParams val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams) producer.kafkaProducer // initializing the producer. assert(producer.getInUseCount == 1) @@ -113,14 +113,6 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest override val brokerProps = Map("auto.create.topics.enable" -> "false") - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - super.afterAll() - } - override def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.kafka.producer.cache.timeout", "2ms") From 8a0906e11eab36cb92c159915915b77334d34312 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 11 Jul 2019 15:42:22 +0530 Subject: [PATCH 18/18] Fixed the failing test. 
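
The previous assertions pinned the check to Kafka's exact error wording; the test now only verifies that a TimeoutException surfaces, via TestUtils.assertExceptionMsg. A standalone sketch of what such a check amounts to, i.e. searching the cause chain for a message fragment (illustrative helper, not Spark's implementation):

    object ExceptionMsgCheck {
      // Succeeds if the exception, or anything in its cause chain, mentions `fragment`.
      def assertMsg(e: Throwable, fragment: String): Unit = {
        def found(t: Throwable): Boolean =
          t != null && ((t.getMessage != null && t.getMessage.contains(fragment)) || found(t.getCause))
        assert(found(e), s"Expected a message containing '$fragment' in: $e")
      }
    }
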
--- .../spark/sql/kafka010/CachedKafkaProducerSuite.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 233346fc9cb7..0c888e81b92c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -26,7 +26,7 @@ import scala.util.Random import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, TestUtils} import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -74,14 +74,9 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { .option("kafka.bootstrap.servers", "12.0.0.1:39022") .save() } - assert(ex.getMessage.contains("TimeoutException"), - "Spark command should fail due to service not reachable.") // Kafka first tries to fetch metadata and reports failures as, " not present in metadata after // max.block.ms time." - assert(ex.getMessage.toLowerCase(ju.Locale.ROOT) - .contains("not present in metadata after 2 ms."), - "Spark command should fail due to service not reachable.") - + TestUtils.assertExceptionMsg(ex, "org.apache.kafka.common.errors.TimeoutException") // Since failing kafka producer is released on error and also invalidated, it should not be in // cache. val map = CachedKafkaProducer.getAsMap