Skip to content

Commit 1bcf82a

Browse files
committed
review comments and discussion.
1 parent 114e8b4 commit 1bcf82a

File tree

2 files changed

+17 −18 lines changed

2 files changed

+17 −18 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010
2020
import java.{util => ju}
2121
import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit}
2222
import java.util.concurrent.atomic.AtomicInteger
23+
import javax.annotation.concurrent.GuardedBy
2324

2425
import scala.collection.JavaConverters._
2526
import scala.util.control.NonFatal
@@ -52,6 +53,7 @@ private[kafka010] case class CachedKafkaProducer(
5253
}
5354
@volatile
5455
private var isCached: Boolean = true
56+
@GuardedBy("this")
5557
private var closed: Boolean = true
5658
private def close(): Unit = {
5759
try {
@@ -184,7 +186,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
184186
}
185187
}
186188

187-
// Intended for testing purpose only.
189+
// For testing only.
188190
private[kafka010] def clear(): Unit = {
189191
logInfo("Cleaning up guava cache and force closing all kafka producers.")
190192
guavaCache.invalidateAll()
@@ -194,10 +196,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
194196
closeQueue.clear()
195197
}
196198

199+
// For testing only.
197200
private[kafka010] def evict(params: Seq[(String, Object)]): Unit = {
198201
guavaCache.invalidate(params)
199202
}
200203

204+
// For testing only.
201205
private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] =
202206
guavaCache.asMap()
203207
}

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package org.apache.spark.sql.kafka010
1919

2020
import java.{util => ju}
21-
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
21+
import java.util.concurrent.{Executors, TimeUnit}
2222

2323
import scala.collection.mutable
2424
import scala.util.Random
2525

26+
import org.apache.kafka.clients.producer.ProducerRecord
2627
import org.apache.kafka.common.serialization.ByteArraySerializer
2728

2829
import org.apache.spark.{SparkConf, SparkException}
@@ -141,13 +142,12 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest
141142
(1 to numConcurrentProducers).map {
142143
i => kafkaParamsUniqueMap.put(i, kafkaParams.updated("retries", s"$i").asJava)
143144
}
144-
val toBeReleasedQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]()
145145

146-
def acquire(i: Int): Unit = {
146+
def acquire(i: Int): CachedKafkaProducer = {
147147
val producer = CachedKafkaProducer.acquire(kafkaParamsUniqueMap(i))
148148
producer.kafkaProducer // materialize producer for the first time.
149149
assert(!producer.isClosed, "Acquired producer cannot be closed.")
150-
toBeReleasedQueue.add(producer)
150+
producer
151151
}
152152

153153
def release(producer: CachedKafkaProducer): Unit = {
@@ -158,28 +158,23 @@ class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest
158158
}
159159
}
160160
}
161+
val data = (1 to 100).map(_.toString)
162+
161163
val threadPool = Executors.newFixedThreadPool(numThreads)
162164
try {
163165
val futuresAcquire = (1 to 10 * numConcurrentProducers).map { i =>
164166
threadPool.submit(new Runnable {
165167
override def run(): Unit = {
166-
acquire(i % numConcurrentProducers + 1)
168+
val producer = acquire(i % numConcurrentProducers + 1)
169+
data.foreach { d =>
170+
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, d.getBytes)
171+
producer.kafkaProducer.send(record)
172+
}
173+
release(producer)
167174
}
168175
})
169176
}
170-
val futuresRelease = (1 to 10 * numConcurrentProducers).map { i =>
171-
val cachedKafkaProducer = toBeReleasedQueue.poll()
172-
// 2x release should not corrupt the state of cache.
173-
(1 to 2).map { j =>
174-
threadPool.submit(new Runnable {
175-
override def run(): Unit = {
176-
release(cachedKafkaProducer)
177-
}
178-
})
179-
}
180-
}
181177
futuresAcquire.foreach(_.get(1, TimeUnit.MINUTES))
182-
futuresRelease.flatten.foreach(_.get(1, TimeUnit.MINUTES))
183178
} finally {
184179
threadPool.shutdown()
185180
CachedKafkaProducer.clear()

0 commit comments

Comments (0)