diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index fc177cdc9037e..ddcae16208d74 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -18,21 +18,69 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} +import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import com.google.common.cache._ import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.apache.kafka.clients.producer.KafkaProducer -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil} -private[kafka010] object CachedKafkaProducer extends Logging { +private[kafka010] case class CachedKafkaProducer( + private val id: String = ju.UUID.randomUUID().toString, + private val inUseCount: AtomicInteger = new AtomicInteger(0), + private val kafkaParams: Seq[(String, Object)]) extends Logging { + + private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava + + lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + val producer = new KafkaProducer[Array[Byte], Array[Byte]](configMap) + if (log.isDebugEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(kafkaParams) + logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq, with Id: $id.") + } + closed = false + producer + } + @GuardedBy("this") + private var closed: Boolean = true + private def close(): Unit = { + try { + this.synchronized { + if (!closed) { + closed = true + if (log.isInfoEnabled()) { + val redactedParamsSeq = KafkaRedactionUtil.redactParams(kafkaParams) + logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") + } + kafkaProducer.close() + } + } + } catch { + case NonFatal(e) => + logWarning(s"Error while closing kafka producer with params: $kafkaParams", e) + } + } + + private def inUse(): Boolean = inUseCount.get() > 0 + + private[kafka010] def getInUseCount: Int = inUseCount.get() - private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + private[kafka010] def getKafkaParams: Seq[(String, Object)] = kafkaParams + + private[kafka010] def flush(): Unit = kafkaProducer.flush() + + private[kafka010] def isClosed: Boolean = closed +} + +private[kafka010] object CachedKafkaProducer extends Logging { private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) @@ -40,89 +88,121 @@ private[kafka010] object CachedKafkaProducer extends Logging { .map(_.conf.get(PRODUCER_CACHE_TIMEOUT)) .getOrElse(defaultCacheExpireTimeout) - private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { - override def load(config: Seq[(String, Object)]): Producer = { - createKafkaProducer(config) + private val cacheLoader = new CacheLoader[Seq[(String, Object)], CachedKafkaProducer] { + override def load(params: Seq[(String, Object)]): 
CachedKafkaProducer = { + CachedKafkaProducer(kafkaParams = params) } } - private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { + private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) = + KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + + private val closeQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]() + + private val removalListener = new RemovalListener[Seq[(String, Object)], CachedKafkaProducer]() { override def onRemoval( - notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { - val paramsSeq: Seq[(String, Object)] = notification.getKey - val producer: Producer = notification.getValue + notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = { + val producer: CachedKafkaProducer = notification.getValue if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) + val redactedParamsSeq = KafkaRedactionUtil.redactParams(producer.kafkaParams) logDebug(s"Evicting kafka producer $producer params: $redactedParamsSeq, " + s"due to ${notification.getCause}") } - close(paramsSeq, producer) + if (producer.inUse()) { + // When `inuse` producer is evicted we wait for it to be released by all the tasks, + // before finally closing it. + closeQueue.add(producer) + } else { + producer.close() + } } } - private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] = + private lazy val guavaCache: LoadingCache[Seq[(String, Object)], CachedKafkaProducer] = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS) .removalListener(removalListener) - .build[Seq[(String, Object)], Producer](cacheLoader) - - private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = { - val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava) - if (log.isDebugEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logDebug(s"Created a new instance of KafkaProducer for $redactedParamsSeq.") - } - kafkaProducer - } + .build[Seq[(String, Object)], CachedKafkaProducer](cacheLoader) /** * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep * one instance per specified kafkaParams. 
*/ - private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = { - val updatedKafkaProducerConfiguration = - KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) - .setAuthenticationConfigIfNeeded() - .build() - val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration) + private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = { + val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap)) try { - guavaCache.get(paramsSeq) + val producer = this.synchronized { + val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq) + cachedKafkaProducer.inUseCount.incrementAndGet() + logDebug(s"Granted producer $cachedKafkaProducer") + cachedKafkaProducer + } + producer } catch { - case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) + case e@(_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError) if e.getCause != null => throw e.getCause } } - private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = { - val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1) + private def paramsToSeq(kafkaParamsMap: ju.Map[String, Object]): Seq[(String, Object)] = { + val paramsSeq: Seq[(String, Object)] = kafkaParamsMap.asScala.toSeq.sortBy(x => x._1) paramsSeq } - /** For explicitly closing kafka producer */ - private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = { - val paramsSeq = paramsToSeq(kafkaParams) - guavaCache.invalidate(paramsSeq) + /* Release a kafka producer back to the kafka cache. We simply decrement it's inuse count. */ + private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = { + this.synchronized { + // It should be ok to call release multiple times on the same producer object. + if (producer.inUse()) { + producer.inUseCount.decrementAndGet() + logDebug(s"Released producer $producer.") + } else { + logWarning(s"Tried to release a not in use producer, $producer.") + } + if (failing) { + // If this producer is failing to write, we remove it from cache. + // So that it is re-created, eventually. + val cachedProducer = guavaCache.getIfPresent(producer.kafkaParams) + if (cachedProducer != null && cachedProducer.id == producer.id) { + logDebug(s"Invalidating a failing producer: $producer.") + guavaCache.invalidate(producer.kafkaParams) + } + } + } + // We need a close queue, so that we can close the producer(s) outside of a synchronized block. + processPendingClose() } - /** Auto close on cache evict */ - private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = { - try { - if (log.isInfoEnabled()) { - val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq) - logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.") + /** Process pending closes. */ + private def processPendingClose(): Unit = { + // Check and close any other producers previously evicted, but pending to be closed. + for (p <- closeQueue.iterator().asScala) { + if (!p.inUse()) { + closeQueue.remove(p) + p.close() } - producer.close() - } catch { - case NonFatal(e) => logWarning("Error while closing kafka producer.", e) } } + // For testing only. 
private[kafka010] def clear(): Unit = { - logInfo("Cleaning up guava cache.") + logInfo("Cleaning up guava cache and force closing all kafka producers.") guavaCache.invalidateAll() + for (p <- closeQueue.iterator().asScala) { + p.close() + } + closeQueue.clear() + } + + // For testing only. + private[kafka010] def evict(params: Seq[(String, Object)]): Unit = { + guavaCache.invalidate(params) } - // Intended for testing purpose only. - private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap() + // For testing only. + private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] = + guavaCache.asMap() } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala index 884773452b2a5..d6f575eb0562b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala @@ -44,7 +44,7 @@ private[kafka010] class KafkaDataWriter( inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams) + protected lazy val producer = CachedKafkaProducer.acquire(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -61,14 +61,17 @@ private[kafka010] class KafkaDataWriter( KafkaDataWriterCommitMessage } - def abort(): Unit = {} + def abort(): Unit = { + close() + } def close(): Unit = { - checkForErrors() - if (producer != null) { + try { + checkForErrors() producer.flush() checkForErrors() - CachedKafkaProducer.close(producerParams) + } finally { + CachedKafkaProducer.release(producer, failedWrite != null) } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index 041fac7717635..2c2665e0c1612 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} @@ -35,13 +35,13 @@ private[kafka010] class KafkaWriteTask( inputSchema: Seq[Attribute], topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { // used to synchronize with Kafka callbacks - private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ + protected val producer: CachedKafkaProducer = + CachedKafkaProducer.acquire(producerConfiguration) /** * Writes key value data out to topics. 
*/ def execute(iterator: Iterator[InternalRow]): Unit = { - producer = CachedKafkaProducer.getOrCreate(producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() sendRow(currentRow, producer) @@ -49,18 +49,21 @@ private[kafka010] class KafkaWriteTask( } def close(): Unit = { - checkForErrors() - if (producer != null) { + try { + checkForErrors() producer.flush() checkForErrors() - producer = null + } finally { + CachedKafkaProducer.release(producer, failedWrite != null) } } + } private[kafka010] abstract class KafkaRowWriter( inputSchema: Seq[Attribute], topic: Option[String]) { + protected val producer: CachedKafkaProducer // used to synchronize with Kafka callbacks @volatile protected var failedWrite: Exception = _ protected val projection = createProjection @@ -79,7 +82,7 @@ private[kafka010] abstract class KafkaRowWriter( * assuming the row is in Kafka. */ protected def sendRow( - row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = { + row: InternalRow, producer: CachedKafkaProducer): Unit = { val projectedRow = projection(row) val topic = projectedRow.getUTF8String(0) val key = projectedRow.getBinary(1) @@ -89,7 +92,7 @@ private[kafka010] abstract class KafkaRowWriter( s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) - producer.send(record, callback) + producer.kafkaProducer.send(record, callback) } protected def checkForErrors(): Unit = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala index 0b3355426df10..0c888e81b92cc 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala @@ -18,60 +18,202 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.concurrent.ConcurrentMap +import java.util.concurrent.{Executors, TimeUnit} -import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.mutable +import scala.util.Random + +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.PrivateMethodTester +import org.apache.spark.{SparkConf, SparkException, TestUtils} +import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils -class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest { - type KP = KafkaProducer[Array[Byte], Array[Byte]] +class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest { protected override def beforeEach(): Unit = { super.beforeEach() CachedKafkaProducer.clear() } - test("Should return the cached instance on calling getOrCreate with same params.") { - val kafkaParams = new ju.HashMap[String, Object]() - kafkaParams.put("acks", "0") - // Here only host should be resolvable, it does not need a running instance of kafka server. 
- kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") - kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) - kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer = CachedKafkaProducer.getOrCreate(kafkaParams) - val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams) - assert(producer == producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) + test("Should return the cached instance on calling acquire with same params.") { + val kafkaParams = generateKafkaParams + val producer = CachedKafkaProducer.acquire(kafkaParams) + val producer2 = CachedKafkaProducer.acquire(kafkaParams) + assert(producer.kafkaProducer == producer2.kafkaProducer) + assert(producer.getInUseCount == 2) + val map = CachedKafkaProducer.getAsMap assert(map.size == 1) } - test("Should close the correct kafka producer for the given kafkaPrams.") { + test("Should return the new instance on calling acquire with different params.") { + val kafkaParams = generateKafkaParams + val producer = CachedKafkaProducer.acquire(kafkaParams) + kafkaParams.remove("acks") // mutate the kafka params. + val producer2 = CachedKafkaProducer.acquire(kafkaParams) + assert(producer.kafkaProducer != producer2.kafkaProducer) + assert(producer.getInUseCount == 1) + assert(producer2.getInUseCount == 1) + val map = CachedKafkaProducer.getAsMap + assert(map.size == 2) + } + + test("Automatically remove a failing kafka producer from cache.") { + import testImplicits._ + val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value") + val ex = intercept[SparkException] { + // This will fail because the service is not reachable. + df.write + .format("kafka") + .option("topic", "topic") + .option("kafka.retries", "1") + .option("kafka.max.block.ms", "2") + .option("kafka.bootstrap.servers", "12.0.0.1:39022") + .save() + } + // Kafka first tries to fetch metadata and reports failures as, " not present in metadata after + // max.block.ms time." + TestUtils.assertExceptionMsg(ex, "org.apache.kafka.common.errors.TimeoutException") + // Since failing kafka producer is released on error and also invalidated, it should not be in + // cache. + val map = CachedKafkaProducer.getAsMap + assert(map.size == 0) + } + + test("Should not close a producer in-use.") { + val kafkaParams = generateKafkaParams + val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams) + producer.kafkaProducer // initializing the producer. + assert(producer.getInUseCount == 1) + // Explicitly cause the producer from guava cache to be evicted. + CachedKafkaProducer.evict(producer.getKafkaParams) + assert(producer.getInUseCount == 1) + assert(!producer.isClosed, "An in-use producer should not be closed.") + } + + private def generateKafkaParams: ju.HashMap[String, Object] = { val kafkaParams = new ju.HashMap[String, Object]() kafkaParams.put("acks", "0") kafkaParams.put("bootstrap.servers", "127.0.0.1:9022") kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName) kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName) - val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - kafkaParams.put("acks", "1") - val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams) - // With updated conf, a new producer instance should be created. 
- assert(producer != producer2) - - val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap) - val map = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map.size == 2) + kafkaParams + } +} + +class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest { + + override val brokerProps = Map("auto.create.topics.enable" -> "false") + + override def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.kafka.producer.cache.timeout", "2ms") + } + + test("concurrent use of CachedKafkaProducer") { + val topic = "topic" + Random.nextInt() + testUtils.createTopic(topic, 1) + val kafkaParams: Map[String, Object] = Map("bootstrap.servers" -> testUtils.brokerAddress, + "key.serializer" -> classOf[ByteArraySerializer].getName, + "value.serializer" -> classOf[ByteArraySerializer].getName) - CachedKafkaProducer.close(kafkaParams) - val map2 = CachedKafkaProducer.invokePrivate(cacheMap()) - assert(map2.size == 1) import scala.collection.JavaConverters._ - val (seq: Seq[(String, Object)], _producer: KP) = map2.asScala.toArray.apply(0) - assert(_producer == producer) + + val numThreads = 100 + val numConcurrentProducers = 500 + + val kafkaParamsUniqueMap = mutable.HashMap.empty[Int, ju.Map[String, Object]] + (1 to numConcurrentProducers).map { + i => kafkaParamsUniqueMap.put(i, kafkaParams.updated("retries", s"$i").asJava) + } + + def acquire(i: Int): CachedKafkaProducer = { + val producer = CachedKafkaProducer.acquire(kafkaParamsUniqueMap(i)) + producer.kafkaProducer // materialize producer for the first time. + assert(!producer.isClosed, "Acquired producer cannot be closed.") + producer + } + + def release(producer: CachedKafkaProducer): Unit = { + if (producer != null) { + CachedKafkaProducer.release(producer, Random.nextBoolean()) + if (producer.getInUseCount > 0) { + assert(!producer.isClosed, "Should not close an inuse producer.") + } + } + } + val data = (1 to 100).map(_.toString) + + val threadPool = Executors.newFixedThreadPool(numThreads) + try { + val futuresAcquire = (1 to 10 * numConcurrentProducers).map { i => + threadPool.submit(new Runnable { + override def run(): Unit = { + val producer = acquire(i % numConcurrentProducers + 1) + data.foreach { d => + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes) + producer.kafkaProducer.send(record) + } + release(producer) + } + }) + } + futuresAcquire.foreach(_.get(1, TimeUnit.MINUTES)) + } finally { + threadPool.shutdown() + CachedKafkaProducer.clear() + } + } + + /* + * The following stress suite will cause frequent eviction of kafka producers from + * the guava cache. Since these producers remain in use, because they are used by + * multiple tasks, they stay in close queue till they are released finally. This test + * will cause new tasks to use fresh instance of kafka producers and as a result it + * simulates a stress situation, where multiple producers are requested from CachedKafkaProducer + * and at the same time there will be multiple releases. It is supposed to catch a race + * condition if any, due to multiple threads requesting and releasing producers. 
+ */ + test("Single source and multiple kafka sink with 2ms cache timeout.") { + + val df = spark.readStream + .format("rate") + .option("numPartitions", "100") + .option("rowsPerSecond", "200") + .load() + .selectExpr("CAST(timestamp AS STRING) key", "CAST(value AS STRING) value") + + val checkpointDir = Utils.createTempDir() + val topic = newTopic() + testUtils.createTopic(topic, 100) + var queries: Seq[StreamingQuery] = Seq.empty[StreamingQuery] + try { + failAfter(streamingTimeout) { + queries = for (i <- 1 to 10) yield { + df.writeStream + .format("kafka") + .option("checkpointLocation", checkpointDir.getCanonicalPath + i) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + // to make it create 5 unique producers. + .option("kafka.max.block.ms", s"100${i % 5}") + .option("topic", topic) + .trigger(Trigger.Continuous(500)) + .queryName(s"kafkaStream$i") + .start() + } + Thread.sleep(15000) + + queries.foreach { q => + assert(q.exception.isEmpty, "None of the queries should fail.") + } + } + } finally { + queries.foreach { q => + q.stop() + } + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index 65adbd6b9887c..2c1e2c9510a4a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -374,7 +374,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { iter.foreach(writeTask.write(_)) writeTask.commit() } finally { - writeTask.close() + writeTask.abort() } }