
Commit 319ccd5

HeartSaVioR authored and srowen committed
[SPARK-30336][SQL][SS] Move Kafka consumer-related classes to its own package
### What changes were proposed in this pull request?

There are too many classes placed in the single package "org.apache.spark.sql.kafka010", and they can be grouped by purpose. As a part of the change in SPARK-21869 (#26845), producer-related classes were moved out to "org.apache.spark.sql.kafka010.producer", exposing only the necessary classes/methods outside the package. This patch applies the same treatment to the consumer-related classes.

### Why are the changes needed?

Described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #26991 from HeartSaVioR/SPARK-30336.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 23a49af commit 319ccd5

File tree

9 files changed (+37, -32 lines)
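The refactoring leans on Scala's package-qualified access modifiers: classes moved into the new `consumer` sub-package keep talking to each other through `private[consumer]`, while only the entry points stay visible to the parent `kafka010` package. Below is a minimal sketch of that pattern; the package and class names are illustrative placeholders, not Spark's actual code.

```scala
// Illustrative only: org.example.connector stands in for a parent package like
// org.apache.spark.sql.kafka010, and `reader` for the new `consumer` sub-package.
package org.example.connector {

  package reader {
    // Visible only within org.example.connector.reader, like the classes
    // narrowed from private[kafka010] to private[consumer] in this commit.
    private[reader] class InternalFetcher {
      def fetch(): String = "record"
    }

    // The sub-package's facade: visible to the whole parent package,
    // analogous to KafkaDataConsumer remaining reachable from kafka010.
    private[connector] class DataReader {
      private val fetcher = new InternalFetcher
      def read(): String = fetcher.fetch()
    }
  }

  object Job {
    // Parent-package code can use the facade...
    def run(): String = new reader.DataReader().read()
    // ...but not the internals: `new reader.InternalFetcher` would not compile here.
  }
}
```

Callers outside the sub-package compile only against the facade, which is what allows the internal classes in this diff to drop from `private[kafka010]` to `private[consumer]`.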

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

 /** A [[InputPartition]] for reading Kafka data in a batch based streaming query. */
 private[kafka010] case class KafkaBatchInputPartition(

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala

Lines changed: 2 additions & 5 deletions
@@ -19,18 +19,15 @@ package org.apache.spark.sql.kafka010

 import java.{util => ju}

-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition

 import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator

-
 /** Offset range that one partition of the KafkaSourceRDD has to read */
 private[kafka010] case class KafkaSourceRDDOffsetRange(
     topicPartition: TopicPartition,

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala renamed to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala

Lines changed: 7 additions & 6 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord

 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

 /**
@@ -39,7 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  * modified in same instance, this class cannot be replaced with general pool implementations
  * including Apache Commons Pool which pools KafkaConsumer.
  */
-private[kafka010] class FetchedDataPool(
+private[consumer] class FetchedDataPool(
     executorService: ScheduledExecutorService,
     clock: Clock,
     conf: SparkConf) extends Logging {
@@ -159,8 +160,8 @@ private[kafka010] class FetchedDataPool(
   }
 }

-private[kafka010] object FetchedDataPool {
-  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
+private[consumer] object FetchedDataPool {
+  private[consumer] case class CachedFetchedData(fetchedData: FetchedData) {
     var lastReleasedTimestamp: Long = Long.MaxValue
     var lastAcquiredTimestamp: Long = Long.MinValue
     var inUse: Boolean = false
@@ -179,5 +180,5 @@ private[kafka010] object FetchedDataPool {
    }
  }

-  private[kafka010] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
+  private[consumer] type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
 }
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala renamed to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPool.scala

Lines changed: 6 additions & 7 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.ConcurrentHashMap
@@ -25,8 +25,9 @@ import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject

 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010._
+import org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey

 /**
  * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
@@ -45,10 +46,9 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
  * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
  * unless caller shares the object to multiple threads.
  */
-private[kafka010] class InternalKafkaConsumerPool(
+private[consumer] class InternalKafkaConsumerPool(
     objectFactory: ObjectFactory,
     poolConfig: PoolConfig) extends Logging {
-
   def this(conf: SparkConf) = {
     this(new ObjectFactory, new PoolConfig(conf))
   }
@@ -147,7 +147,7 @@ private[kafka010] class InternalKafkaConsumerPool(
   }
 }

-private[kafka010] object InternalKafkaConsumerPool {
+private[consumer] object InternalKafkaConsumerPool {
   object CustomSwallowedExceptionListener extends SwallowedExceptionListener with Logging {
     override def onSwallowException(e: Exception): Unit = {
       logError(s"Error closing Kafka consumer", e)
@@ -218,4 +218,3 @@ private[kafka010] object InternalKafkaConsumerPool {
    }
  }
 }
-
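The scaladoc in the hunk above describes this pool as keyed by `CacheKey`, handing each borrower exclusive use of a non-thread-safe consumer until it is returned. The sketch below shows the same keyed-pooling idea with Apache Commons Pool 2, which the imports in this diff show the real class also relies on; the key and consumer types here are made up for illustration and do not mirror Spark's internals.

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}

// Hypothetical stand-ins for Spark's CacheKey and InternalKafkaConsumer.
case class ExampleKey(groupId: String, topic: String, partition: Int)
class ExampleConsumer(val key: ExampleKey) {
  def poll(): String = s"records for $key"
  def close(): Unit = ()
}

// Factory the pool uses to create, wrap, and destroy one consumer per key.
class ExampleConsumerFactory extends BaseKeyedPooledObjectFactory[ExampleKey, ExampleConsumer] {
  override def create(key: ExampleKey): ExampleConsumer = new ExampleConsumer(key)
  override def wrap(value: ExampleConsumer): PooledObject[ExampleConsumer] =
    new DefaultPooledObject(value)
  override def destroyObject(key: ExampleKey, p: PooledObject[ExampleConsumer]): Unit =
    p.getObject.close()
}

object KeyedPoolSketch {
  def main(args: Array[String]): Unit = {
    val pool = new GenericKeyedObjectPool(new ExampleConsumerFactory)
    pool.setMaxTotalPerKey(1) // one pooled consumer per (group, topic, partition) at a time

    val key = ExampleKey("group-1", "events", 0)
    val consumer = pool.borrowObject(key) // exclusive use until returned
    try {
      println(consumer.poll())
    } finally {
      pool.returnObject(key, consumer) // back to the pool for reuse under the same key
    }
    pool.close()
  }
}
```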

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala renamed to external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala

Lines changed: 11 additions & 8 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.io.Closeable
@@ -29,9 +29,9 @@ import org.apache.kafka.common.TopicPartition

 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenClusterConf, KafkaTokenUtil}
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
+import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}

 /**
@@ -47,13 +47,15 @@ private[kafka010] class InternalKafkaConsumer(

   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

-  private[kafka010] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
+  // Exposed for testing
+  private[consumer] val clusterConfig = KafkaTokenUtil.findMatchingTokenClusterConfig(
     SparkEnv.get.conf, kafkaParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       .asInstanceOf[String])

   // Kafka consumer is not able to give back the params instantiated with so we need to store it.
   // It must be updated whenever a new consumer is created.
-  private[kafka010] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
+  // Exposed for testing
+  private[consumer] var kafkaParamsWithSecurity: ju.Map[String, Object] = _
   private val consumer = createConsumer()

   /**
@@ -139,7 +141,7 @@ private[kafka010] class InternalKafkaConsumer(
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                         poll when `records` is drained.
  */
-private[kafka010] case class FetchedData(
+private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
     private var _offsetAfterPoll: Long) {
@@ -196,7 +198,7 @@ private[kafka010] case class FetchedData(
  * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch
  * instead.
  */
-private[kafka010] case class FetchedRecord(
+private[consumer] case class FetchedRecord(
     var record: ConsumerRecord[Array[Byte], Array[Byte]],
     var nextOffsetToFetch: Long) {

@@ -223,7 +225,8 @@ private[kafka010] class KafkaDataConsumer(
     fetchedDataPool: FetchedDataPool) extends Logging {
   import KafkaDataConsumer._

-  @volatile private[kafka010] var _consumer: Option[InternalKafkaConsumer] = None
+  // Exposed for testing
+  @volatile private[consumer] var _consumer: Option[InternalKafkaConsumer] = None
   @volatile private var _fetchedData: Option[FetchedData] = None

   private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala renamed to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.util.concurrent.TimeUnit
@@ -29,7 +29,8 @@ import org.jmock.lib.concurrent.DeterministicScheduler
 import org.scalatest.PrivateMethodTester

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala renamed to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/InternalKafkaConsumerPoolSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}

@@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{CONSUMER_CACHE_CAPACITY, CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL, CONSUMER_CACHE_TIMEOUT}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession

 class InternalKafkaConsumerPoolSuite extends SharedSparkSession {
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala renamed to external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.kafka010
+package org.apache.spark.sql.kafka010.consumer

 import java.{util => ju}
 import java.nio.charset.StandardCharsets
@@ -32,7 +32,8 @@ import org.scalatest.PrivateMethodTester

 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession

 class KafkaDataConsumerSuite
