Commit adf99a6

[SPARK-4964] fix serialization issues for checkpointing
1 parent 1d50749

2 files changed: +13 -5 lines


external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 10 additions & 2 deletions
@@ -31,7 +31,7 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
-class KafkaCluster(val kafkaParams: Map[String, String]) {
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.Err
 
   val seedBrokers: Array[(String, Int)] =
@@ -43,7 +43,15 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
       (hpa(0), hpa(1).toInt)
     }
 
-  val config: ConsumerConfig = KafkaCluster.consumerConfig(kafkaParams)
+  // ConsumerConfig isn't serializable
+  @transient private var _config: ConsumerConfig = null
+
+  def config: ConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = KafkaCluster.consumerConfig(kafkaParams)
+    }
+    _config
+  }
 
   def connect(host: String, port: Int): SimpleConsumer =
     new SimpleConsumer(host, port, config.socketTimeoutMs,
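The change replaces an eagerly built ConsumerConfig field with a @transient var rebuilt lazily behind a synchronized accessor, a common way to carry a non-serializable member across Java serialization: the field is skipped when the object is written, and reconstructed from the (serializable) kafkaParams on first use after deserialization. A minimal self-contained sketch of the same pattern, with a hypothetical Heavy class standing in for ConsumerConfig:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable dependency such as
// kafka.consumer.ConsumerConfig; deliberately NOT Serializable.
class Heavy(val settings: Map[String, String])

class Holder(val params: Map[String, String]) extends Serializable {
  // Serialization skips @transient fields, so after deserialization
  // _heavy comes back null and is rebuilt on first access.
  @transient private var _heavy: Heavy = null

  def heavy: Heavy = this.synchronized {
    if (_heavy == null) {
      _heavy = new Heavy(params)
    }
    _heavy
  }
}

object RoundTrip {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(new Holder(Map("k" -> "v")))
    out.close()
    val copy = new ObjectInputStream(
      new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[Holder]
    println(copy.heavy.settings) // Map(k -> v): rebuilt on demand, not deserialized
  }
}

The synchronized block keeps two threads from racing to rebuild the field after deserialization; because the value is derived purely from the immutable params map, a rebuild is idempotent.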

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 3 additions & 3 deletions
@@ -64,9 +64,9 @@ class DeterministicKafkaInputDStream[
   private val kc = new KafkaCluster(kafkaParams)
 
   private val maxMessagesPerPartition: Option[Long] = {
-    val ratePerSec = ssc.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
+    val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
     if (ratePerSec > 0) {
-      val secsPerBatch = ssc.graph.batchDuration.milliseconds.toDouble / 1000
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some((secsPerBatch * ratePerSec).toLong)
     } else {
       None
@@ -105,7 +105,7 @@ class DeterministicKafkaInputDStream[
   override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
     val rdd = new KafkaRDD[K, V, U, T, R](
-      ssc_.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
 
     currentOffsets = untilOffsets
     Some(rdd)
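The ssc-to-context change matters because checkpointing serializes the DStream graph, so every non-transient field reachable from the DStream must be serializable. In Scala, referencing a constructor parameter such as ssc_ from a method body makes the compiler capture it as a hidden, non-transient field of the subclass, while the context accessor reads only the parent's @transient field. A minimal sketch of that capture gotcha, under the assumption that this is the failure mode the commit fixes (class names are hypothetical, not Spark's):

// Stand-in for StreamingContext; deliberately NOT Serializable.
class Context

abstract class Parent(@transient protected var ctx: Context) extends Serializable {
  def context: Context = ctx
}

class Bad(ctx_ : Context) extends Parent(ctx_) {
  // Referencing the constructor parameter in a method body makes the
  // compiler store ctx_ in a hidden, non-transient field of Bad.
  def label: String = "batch for " + ctx_.hashCode
}

class Good(ctx_ : Context) extends Parent(ctx_) {
  // Going through the parent accessor adds no extra field; the only
  // stored reference is Parent's @transient ctx, which is skipped.
  def label: String = "batch for " + context.hashCode
}

object Demo {
  import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

  private def serializes(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(o)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(serializes(new Bad(new Context)))  // false: captured Context field
    println(serializes(new Good(new Context))) // true: only @transient reference
  }
}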
