@@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -64,8 +63,6 @@ private[kafka010] class KafkaMicroBatchReader(
     failOnDataLoss: Boolean)
   extends MicroBatchReader with SupportsScanUnsafeRow with Logging {
 
-  type PartitionOffsetMap = Map[TopicPartition, Long]
-
   private var startPartitionOffsets: PartitionOffsetMap = _
   private var endPartitionOffsets: PartitionOffsetMap = _
 
@@ -76,6 +73,7 @@ private[kafka010] class KafkaMicroBatchReader(
   private val maxOffsetsPerTrigger =
     Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
 
+  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
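
The new `rangeCalculator` field hands offset-range planning to `KafkaOffsetRangeCalculator`, whose definition is not part of this diff. A rough sketch of how its companion `apply(options)` could be written, mirroring the `maxOffsetsPerTrigger` parsing just above; the `minPartitions` option name, the sketch class name, and the constructor shape are assumptions, not confirmed by these hunks:

```scala
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Sketch only: parse an optional "minPartitions" hint the same way this reader
// parses "maxOffsetsPerTrigger". The real calculator is defined in another file.
class KafkaOffsetRangeCalculatorSketch(val minPartitions: Option[Int])

object KafkaOffsetRangeCalculatorSketch {
  def apply(options: DataSourceOptions): KafkaOffsetRangeCalculatorSketch = {
    val minPartitions = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
    new KafkaOffsetRangeCalculatorSketch(minPartitions)
  }
}
```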
@@ -106,15 +104,15 @@ private[kafka010] class KafkaMicroBatchReader(
   override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionOffsets.keySet != newPartitions) {
+    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
         s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
     }
-    logInfo(s"Partitions added: $newPartitionOffsets")
-    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
         s"Added partition $p starts from $o instead of 0. Some data may have been missed")
     }
@@ -125,46 +123,28 @@ private[kafka010] class KafkaMicroBatchReader(
       reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
     }
 
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
+    // Use the end partitions to calculate offset ranges to ignore partitions that have
     // been deleted
     val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
       // Ignore partitions that we don't know the from offsets.
-      newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
-    val sortedExecutors = getSortedExecutorList()
-    val numExecutors = sortedExecutors.length
-    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
-
     // Calculate offset ranges
-    val factories = topicPartitions.flatMap { tp =>
-      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
-        newPartitionOffsets.getOrElse(
-          tp, {
-            // This should not happen since newPartitionOffsets contains all partitions not in
-            // fromPartitionOffsets
-            throw new IllegalStateException(s"$tp doesn't have a from offset")
-          })
-      }
-      val untilOffset = endPartitionOffsets(tp)
-
-      if (untilOffset >= fromOffset) {
-        // This allows cached KafkaConsumers in the executors to be re-used to read the same
-        // partition in every batch.
-        val preferredLoc = if (numExecutors > 0) {
-          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
-        } else None
-        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
-        Some(
-          new KafkaMicroBatchDataReaderFactory(
-            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
-      } else {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed")
-        None
-      }
+    val offsetRanges = rangeCalculator.getRanges(
+      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+      untilOffsets = endPartitionOffsets,
+      executorLocations = getSortedExecutorList())
+
+    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
+    // that is, concurrent tasks will not read the same TopicPartitions.
+    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
+
+    // Generate factories based on the offset ranges
+    val factories = offsetRanges.map { range =>
+      new KafkaMicroBatchDataReaderFactory(
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
     factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
   }
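
The inlined planning loop above is replaced by `rangeCalculator.getRanges(...)`, which lives outside this file. For review context, here is a minimal, self-contained sketch of the behavior the removed code implemented; the real `KafkaOffsetRangeCalculator` is not shown in this diff, and anything here beyond what the removed lines did (including the stand-in type names) is an assumption:

```scala
import org.apache.kafka.common.TopicPartition

// Stand-in for the relocated KafkaOffsetRange; the four fields match the call
// sites visible in this diff (topicPartition, fromOffset, untilOffset, preferredLoc).
case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

object OffsetRangePlanningSketch {
  // Sketch of the planning step the old flatMap performed: pair from/until offsets per
  // partition and pin each partition to a stable executor so its cached consumer can be
  // reused across batches. Like the removed data-loss branch, ranges whose offsets moved
  // backwards are skipped.
  def getRanges(
      fromOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, Long],
      executorLocations: Seq[String]): Seq[KafkaOffsetRange] = {
    untilOffsets.keySet.intersect(fromOffsets.keySet).toSeq.flatMap { tp =>
      val from = fromOffsets(tp)
      val until = untilOffsets(tp)
      if (until >= from) {
        val loc = if (executorLocations.nonEmpty) {
          Some(executorLocations(Math.floorMod(tp.hashCode, executorLocations.length)))
        } else None
        Some(KafkaOffsetRange(tp, from, until, loc))
      } else {
        None
      }
    }
  }
}
```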
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
 }
 
 /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
-    range: KafkaOffsetRange,
-    preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+    offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
 
-  override def preferredLocations(): Array[String] = preferredLoc.toArray
+  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
 
   override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
 }
 
 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
+
+  private val consumer = {
+    if (!reuseKafkaConsumer) {
+      // If we can't reuse CachedKafkaConsumers, create a new CachedKafkaConsumer. We
+      // use `assign` here, hence we don't need to worry about "group.id" conflicts.
+      CachedKafkaConsumer.createUncached(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    } else {
+      CachedKafkaConsumer.getOrCreate(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    }
+  }
 
-  private val consumer = CachedKafkaConsumer.getOrCreate(
-    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
   private val rangeToRead = resolveRange(offsetRange)
   private val converter = new KafkaRecordToUnsafeRowConverter
 
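
The reader now chooses between a shared cached consumer and a private uncached one based on the `reuseKafkaConsumer` flag computed in `createUnsafeRowReaderFactories`. To illustrate when reuse is turned off (using the `KafkaOffsetRange` stand-in from the sketch above): if the planner ever emitted two ranges over the same `TopicPartition`, for example a single partition split across two concurrent tasks, the distinct-partition check forces the uncached path so the tasks do not contend for one cached consumer. A hypothetical plan:

```scala
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("topic-a", 0)
// Hypothetical plan in which one partition is split into two concurrent ranges.
val ranges = Seq(
  KafkaOffsetRange(tp, 0L, 500L, None),
  KafkaOffsetRange(tp, 500L, 1000L, None))

// Same check as in createUnsafeRowReaderFactories above.
val reuseKafkaConsumer = ranges.map(_.topicPartition).toSet.size == ranges.size
assert(!reuseKafkaConsumer) // each reader creates, and later closes, its own consumer
```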
@@ -369,9 +360,14 @@ private[kafka010] class KafkaMicroBatchDataReader(
   }
 
   override def close(): Unit = {
-    // Indicate that we're no longer using this consumer
-    CachedKafkaConsumer.releaseKafkaConsumer(
-      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    if (!reuseKafkaConsumer) {
+      // Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
+      consumer.close()
+    } else {
+      // Indicate that we're no longer using this consumer
+      CachedKafkaConsumer.releaseKafkaConsumer(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    }
   }
 
   private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
@@ -392,12 +388,9 @@ private[kafka010] class KafkaMicroBatchDataReader(
       } else {
         range.untilOffset
       }
-      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset)
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
     } else {
       range
     }
   }
 }
-
-private[kafka010] case class KafkaOffsetRange(
-    topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)
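
With the local three-argument `KafkaOffsetRange` (and the `TopicPartition` import) removed, the rest of this file relies on a four-argument version carrying a preferred location, presumably defined alongside `KafkaOffsetRangeCalculator` rather than here. The shape below is inferred only from the call sites in this diff (the four-argument constructor calls and `offsetRange.preferredLoc`) and should be read as an assumption:

```scala
import org.apache.kafka.common.TopicPartition

// Assumed relocated definition; only the members exercised by this diff are included.
case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])
```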