
Commit a3443ea

Addressed a few code refactoring nits.
1 parent da71589 commit a3443ea

File tree

2 files changed: +13 -22 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 10 additions & 11 deletions
@@ -39,13 +39,8 @@ private[kafka010] case class CachedKafkaProducer(
 
   private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava
 
-  private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) =
-    KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap)
-      .setAuthenticationConfigIfNeeded()
-      .build()
-
   lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    val producer = new KafkaProducer[Array[Byte], Array[Byte]](updatedAuthConfigIfNeeded(configMap))
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](configMap)
     logDebug(s"Created a new instance of KafkaProducer for " +
       s"$kafkaParams with Id: $id")
     closed = false
@@ -59,7 +54,7 @@ private[kafka010] case class CachedKafkaProducer(
       if (!closed) {
         closed = true
         kafkaProducer.close()
-        logInfo(s"Closed kafka producer: $this")
+        logDebug(s"Closed kafka producer: $this")
       }
     }
   } catch {
@@ -94,6 +89,11 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     }
   }
 
+  private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) =
+    KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
   private val closeQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]()
 
   private val removalListener = new RemovalListener[Seq[(String, Object)], CachedKafkaProducer]() {
@@ -122,7 +122,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
    * one instance per specified kafkaParams.
    */
   private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = {
-    val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap)
+    val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedAuthConfigIfNeeded(kafkaParamsMap))
     try {
       val producer = this.synchronized {
         val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq)
@@ -148,7 +148,6 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     this.synchronized {
       // It should be ok to call release multiple times on the same producer object.
       if (producer.inUse()) {
-        // So that we do not end up with -ve in-use counts.
         producer.inUseCount.decrementAndGet()
         logDebug(s"Released producer $producer.")
       } else {
@@ -165,11 +164,11 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       }
     }
     // We need a close queue, so that we can close the producer(s) outside of a synchronized block.
-    processPendingClose(producer)
+    processPendingClose()
   }
 
   /** Process pending closes. */
-  private def processPendingClose(producer: CachedKafkaProducer): Unit = {
+  private def processPendingClose(): Unit = {
     // Check and close any other producers previously evicted, but pending to be closed.
     for (p <- closeQueue.iterator().asScala) {
       if (!p.inUse()) {
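
Net effect of this file's change: the auth-config update now runs inside acquire(), so the updated params form the cache key itself, instead of being applied after a producer was already keyed on the raw params. A minimal self-contained sketch of that pattern follows; it is illustrative only, the stand-in updatedAuthConfigIfNeeded and the string "producers" are hypothetical, and the real code uses KafkaConfigUpdater plus a Guava cache of CachedKafkaProducer.

    import scala.collection.mutable

    // Sketch: key the producer cache on the params *after* the auth update,
    // mirroring the move of updatedAuthConfigIfNeeded() into acquire().
    object ProducerCacheSketch {
      // Stand-in for KafkaConfigUpdater("executor", ...).setAuthenticationConfigIfNeeded().build();
      // here "auth injection" just rewrites one illustrative key.
      private def updatedAuthConfigIfNeeded(params: Map[String, Object]): Map[String, Object] =
        if (params.contains("needs.auth")) {
          params - "needs.auth" + ("sasl.jaas.config" -> ("<token-based-jaas>": Object))
        } else params

      private val cache = mutable.Map.empty[Seq[(String, Object)], String]

      def acquire(params: Map[String, Object]): String = this.synchronized {
        // The cache key is built from the *updated* params, so callers whose raw
        // params normalize to the same auth-updated config share one producer.
        val key = updatedAuthConfigIfNeeded(params).toSeq.sortBy(_._1)
        cache.getOrElseUpdate(key, s"producer-${cache.size}")
      }
    }

    object ProducerCacheSketchDemo extends App {
      val p = Map[String, Object]("bootstrap.servers" -> "host:9092", "needs.auth" -> "x")
      // Same logical params -> same cached instance, even though auth rewrote the map.
      assert(ProducerCacheSketch.acquire(p) eq ProducerCacheSketch.acquire(p))
    }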

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala

Lines changed: 3 additions & 11 deletions
@@ -40,7 +40,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
   }
 
   test("Should return the cached instance on calling acquire with same params.") {
-    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val kafkaParams = generateKafkaParams
     val producer = CachedKafkaProducer.acquire(kafkaParams)
     val producer2 = CachedKafkaProducer.acquire(kafkaParams)
     assert(producer.kafkaProducer == producer2.kafkaProducer)
@@ -50,7 +50,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
   }
 
   test("Should return the new instance on calling acquire with different params.") {
-    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val kafkaParams = generateKafkaParams
     val producer = CachedKafkaProducer.acquire(kafkaParams)
     kafkaParams.remove("acks") // mutate the kafka params.
     val producer2 = CachedKafkaProducer.acquire(kafkaParams)
@@ -89,7 +89,7 @@ class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
   }
 
   test("Should not close a producer in-use.") {
-    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val kafkaParams = generateKafkaParams
     val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams)
     producer.kafkaProducer // initializing the producer.
     assert(producer.getInUseCount == 1)
@@ -113,14 +113,6 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest
 
   override val brokerProps = Map("auto.create.topics.enable" -> "false")
 
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-    }
-    super.afterAll()
-  }
-
   override def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf.set("spark.kafka.producer.cache.timeout", "2ms")
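
For context on why the stress suite pins spark.kafka.producer.cache.timeout to 2ms: the producer cache is a Guava cache whose removal listener hands evicted producers to closeQueue, and an aggressive expiry makes eviction interleave with acquire/release. Below is a self-contained sketch of that expiry mechanism, assuming Guava on the classpath; the println stands in for enqueueing onto closeQueue, and is not the suite's actual assertion.

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification}

    object ExpiringCacheSketch extends App {
      // Loads a fresh "producer" on a cache miss, like the cache loader in the diff.
      val loader = new CacheLoader[String, String]() {
        override def load(key: String): String = s"producer-for-$key"
      }

      // In the real code the listener defers close() to closeQueue so it runs
      // outside any synchronized block; a println stands in here.
      val listener = new RemovalListener[String, String]() {
        override def onRemoval(n: RemovalNotification[String, String]): Unit =
          println(s"evicting ${n.getValue} (cause: ${n.getCause})")
      }

      val cache = CacheBuilder.newBuilder()
        .expireAfterAccess(2, TimeUnit.MILLISECONDS) // mirrors the suite's 2ms timeout
        .removalListener(listener)
        .build[String, String](loader)

      val first = cache.get("params")
      Thread.sleep(10)   // let the entry expire
      cache.cleanUp()    // Guava evicts lazily; force the maintenance pass
      val second = cache.get("params")
      println(first eq second) // false: expiry forced a reload
    }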
