From 25446bac730b51ebbb4ad7add47db52b1a3bc572 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 11 Nov 2019 14:14:06 +0100
Subject: [PATCH 1/2] [SPARK-27042][SS] Invalidate cached Kafka producer in case of task retry

---
 .../sql/kafka010/CachedKafkaProducer.scala    |  6 +++++-
 .../sql/kafka010/KafkaDataConsumer.scala      |  2 ++
 .../kafka010/CachedKafkaProducerSuite.scala   | 21 +++++++++++++++++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 907440ab3731..d7719d840c1e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 
 import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
 import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
@@ -93,6 +93,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       .setAuthenticationConfigIfNeeded()
       .build()
     val key = toCacheKey(updatedKafkaParams)
+    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
+      logDebug(s"Task re-attempt detected, invalidating producers with key $key")
+      producerPool.invalidateKey(key)
+    }
     producerPool.borrowObject(key, updatedKafkaParams)
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index f2dad945719e..8ee1b580f8b8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -628,6 +628,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
       val cacheKey = new CacheKey(topicPartition, kafkaParams)
 
+      logDebug(s"Task re-attempt detected, invalidating consumers with key $cacheKey")
+
       // If this is reattempt at running the task, then invalidate cached consumer if any.
       consumerPool.invalidateKey(cacheKey)
 
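The two hunks above rely on the same idiom: TaskContext.attemptNumber is 0 on a task's first attempt and >= 1 on a retry, so a retry is the cue to drop whatever the pool has cached under the key before borrowing. A minimal, self-contained sketch of that idiom follows; ProducerPool and its two methods here are hypothetical stand-ins for the connector's private pool, not its actual API:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the connector's internal producer pool.
class ProducerPool[K, V] {
  private val cache = TrieMap.empty[K, V]

  // Drop the cached instance for this key, if any.
  def invalidateKey(key: K): Unit = cache.remove(key)

  // Return the cached instance for this key, creating one if absent.
  def borrowObject(key: K, create: () => V): V =
    cache.getOrElseUpdate(key, create())
}

object RetryAwareAcquire {
  // The idiom from the patch: attemptNumber == 0 is a first attempt,
  // >= 1 is a retry, and a retry invalidates the cached entry so the
  // caller gets a fresh instance instead of a possibly broken one.
  def acquire[K, V](pool: ProducerPool[K, V], key: K, attemptNumber: Int)
      (create: () => V): V = {
    if (attemptNumber >= 1) {
      pool.invalidateKey(key)
    }
    pool.borrowObject(key, create)
  }
}
```

The point of invalidating by key, rather than closing the instance in place, is that the retry should simply never be handed the possibly-broken cached object; what happens to the old instance is left to the pool.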
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
index 4506a4029d88..b20635a2cb4c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
@@ -97,6 +97,27 @@ class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTest
     assert(producerPool.size(toCacheKey(kafkaParams2)) === 1)
   }
 
+  test("acquire should return a new instance with Task retry") {
+    try {
+      val kafkaParams = getKafkaParams()
+
+      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context1)
+      val producer1 = CachedKafkaProducer.acquire(kafkaParams)
+      CachedKafkaProducer.release(producer1)
+
+      val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
+      TaskContext.setTaskContext(context2)
+      val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+      CachedKafkaProducer.release(producer2)
+
+      assert(producer1 !== producer2)
+      assert(producerPool.size(toCacheKey(kafkaParams)) === 1)
+    } finally {
+      TaskContext.unset()
+    }
+  }
+
   test("Concurrent use of CachedKafkaProducer") {
     val data = (1 to 1000).map(_.toString)
     testUtils.createTopic(topic, 1)
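The test simulates a retried task by installing a TaskContextImpl whose fifth constructor argument, attemptNumber, is set to 1: the first acquire runs under attemptNumber 0 and populates the cache, the second runs under attemptNumber 1 and must come back with a different instance. A small helper in the same spirit, sketched here rather than part of the patch, wraps exactly the constructor call and TaskContext plumbing the test uses (like the suite, it has to live under the org.apache.spark package tree, since TaskContextImpl is private[spark]):

```scala
import org.apache.spark.{TaskContext, TaskContextImpl}

object TaskRetrySimulator {
  // Runs `body` under a synthetic TaskContext carrying the given attempt
  // number (0 = first attempt, >= 1 = retry), then clears the thread-local
  // context even if `body` throws.
  def withAttempt[T](attemptNumber: Int)(body: => T): T = {
    val ctx = new TaskContextImpl(0, 0, 0, 0, attemptNumber, null, null, null)
    TaskContext.setTaskContext(ctx)
    try body finally TaskContext.unset()
  }
}
```

With such a helper, each phase of the test reduces to a single block, e.g. TaskRetrySimulator.withAttempt(1) { CachedKafkaProducer.acquire(kafkaParams) }.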
From d3843244c467487f255a4895feb9368c2199a8b3 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Wed, 13 Nov 2019 13:32:45 +0100
Subject: [PATCH 2/2] Comment fix

---
 .../org/apache/spark/sql/kafka010/CachedKafkaProducer.scala | 2 +-
 .../org/apache/spark/sql/kafka010/KafkaDataConsumer.scala   | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index d7719d840c1e..5895f9fb5548 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -94,7 +94,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       .build()
     val key = toCacheKey(updatedKafkaParams)
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      logDebug(s"Task re-attempt detected, invalidating producers with key $key")
+      logDebug(s"Invalidating key $key")
       producerPool.invalidateKey(key)
     }
     producerPool.borrowObject(key, updatedKafkaParams)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 8ee1b580f8b8..d07323d34110 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -627,8 +627,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
       kafkaParams: ju.Map[String, Object]): KafkaDataConsumer = {
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
       val cacheKey = new CacheKey(topicPartition, kafkaParams)
-
-      logDebug(s"Task re-attempt detected, invalidating consumers with key $cacheKey")
+      logDebug(s"Invalidating key $cacheKey")
 
       // If this is reattempt at running the task, then invalidate cached consumer if any.
       consumerPool.invalidateKey(cacheKey)
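Taken together, the two commits give the following end-to-end behavior, shown here as a sketch that reuses the hypothetical TaskRetrySimulator.withAttempt helper above and the suite's own getKafkaParams(); as with the test, it only compiles inside the kafka010 package, where CachedKafkaProducer is visible:

```scala
// Sketch of the intended behavior, mirroring the new test.
val kafkaParams = getKafkaParams()

// First attempt (attemptNumber = 0): a producer is created and cached.
val first = TaskRetrySimulator.withAttempt(0) {
  CachedKafkaProducer.acquire(kafkaParams)
}
CachedKafkaProducer.release(first)

// Retry (attemptNumber = 1): the cached entry for this key is invalidated
// before borrowing, so a brand-new producer instance comes back.
val second = TaskRetrySimulator.withAttempt(1) {
  CachedKafkaProducer.acquire(kafkaParams)
}
CachedKafkaProducer.release(second)

assert(first ne second) // the retry did not reuse the stale instance
```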