Skip to content

Commit 25446ba

Browse files
committed
[SPARK-27042][SS] Invalidate cached Kafka producer in case of task retry
1 parent 4de7131 commit 25446ba

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
2525

2626
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}
2727

28-
import org.apache.spark.SparkEnv
28+
import org.apache.spark.{SparkEnv, TaskContext}
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
3131
import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
@@ -93,6 +93,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
9393
.setAuthenticationConfigIfNeeded()
9494
.build()
9595
val key = toCacheKey(updatedKafkaParams)
96+
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
97+
logDebug(s"Task re-attempt detected, invalidating producers with key $key")
98+
producerPool.invalidateKey(key)
99+
}
96100
producerPool.borrowObject(key, updatedKafkaParams)
97101
}
98102

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {
628628
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
629629
val cacheKey = new CacheKey(topicPartition, kafkaParams)
630630

631+
logDebug(s"Task re-attempt detected, invalidating consumers with key $cacheKey")
632+
631633
// If this is a reattempt at running the task, invalidate the cached consumer, if any.
632634
consumerPool.invalidateKey(cacheKey)
633635

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,27 @@ class CachedKafkaProducerSuite extends SharedSparkSession with PrivateMethodTest
9797
assert(producerPool.size(toCacheKey(kafkaParams2)) === 1)
9898
}
9999

100+
test("acquire should return a new instance with Task retry") {
101+
try {
102+
val kafkaParams = getKafkaParams()
103+
104+
val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
105+
TaskContext.setTaskContext(context1)
106+
val producer1 = CachedKafkaProducer.acquire(kafkaParams)
107+
CachedKafkaProducer.release(producer1)
108+
109+
val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
110+
TaskContext.setTaskContext(context2)
111+
val producer2 = CachedKafkaProducer.acquire(kafkaParams)
112+
CachedKafkaProducer.release(producer2)
113+
114+
assert(producer1 !== producer2)
115+
assert(producerPool.size(toCacheKey(kafkaParams)) === 1)
116+
} finally {
117+
TaskContext.unset()
118+
}
119+
}
120+
100121
test("Concurrent use of CachedKafkaProducer") {
101122
val data = (1 to 1000).map(_.toString)
102123
testUtils.createTopic(topic, 1)

0 commit comments

Comments
 (0)