Commit 1d70625

WIP on kafka cluster

1 parent 76913e2 commit 1d70625
File tree

2 files changed: 98 additions & 12 deletions

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd.kafka

import scala.util.control.NonFatal
import java.util.Properties
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}

/**
 * Convenience methods for interacting with a Kafka cluster.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
 */
class KafkaCluster(val kafkaParams: Map[String, String]) {
  val brokers: Array[(String, Int)] =
    kafkaParams.get("metadata.broker.list")
      .orElse(kafkaParams.get("bootstrap.servers"))
      .getOrElse(throw new Exception("Must specify metadata.broker.list or bootstrap.servers"))
      .split(",").map { hp =>
        val hpa = hp.split(":")
        (hpa(0), hpa(1).toInt)
      }

  val config: ConsumerConfig = KafkaCluster.consumerConfig(kafkaParams)

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)

  def connect(hostAndPort: (String, Int)): SimpleConsumer =
    connect(hostAndPort._1, hostAndPort._2)

  def connectLeader(topic: String, partition: Int): Option[SimpleConsumer] =
    findLeader(topic, partition).map(connect)

  def findLeader(topic: String, partition: Int): Option[(String, Int)] = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp)
        val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, Seq(topic))
        val resp: TopicMetadataResponse = consumer.send(req)
        resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
          t.partitionsMetadata.find(_.partitionId == partition)
        }.foreach { partitionMeta =>
          partitionMeta.leader.foreach { leader =>
            return Some((leader.host, leader.port))
          }
        }
      } catch {
        // Ignore the failure and fall through to the next broker in the list
        case NonFatal(e) =>
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
    None
  }
}

object KafkaCluster {
  /** Make a consumer config without requiring group.id or zookeeper.connect,
    * since communicating with brokers also needs common settings such as timeouts
    */
  def consumerConfig(kafkaParams: Map[String, String]): ConsumerConfig = {
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))
    Seq("zookeeper.connect", "group.id").foreach { s =>
      // Properties.contains checks values, not keys; containsKey is what's meant here
      if (!props.containsKey(s)) {
        props.setProperty(s, "")
      }
    }
    new ConsumerConfig(props)
  }
}
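
For context, a minimal usage sketch of the class above (not part of this commit). The broker address localhost:9092 and the topic "events" are assumed placeholders, and a running Kafka 0.8 broker is presumed:

// Hypothetical usage of KafkaCluster; broker address and topic are made up.
val cluster = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))

// findLeader queries each broker's topic metadata in turn until it finds
// the (host, port) of the leader for the given topic/partition.
cluster.findLeader("events", 0) match {
  case Some((host, port)) => println(s"Leader for events-0 is $host:$port")
  case None => println("No leader found (topic missing or brokers unreachable)")
}

// connectLeader combines the leader lookup with opening a SimpleConsumer.
cluster.connectLeader("events", 0).foreach { consumer =>
  try {
    println(s"Connected as client '${consumer.clientId}'")
  } finally {
    consumer.close()
  }
}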

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 9 additions & 12 deletions
@@ -39,7 +39,7 @@ private[spark] case class KafkaRDDPartition(
   untilOffset: Long
 ) extends Partition
 
-/** A batch-oriented interface to Kafka.
+/** A batch-oriented interface for consuming from Kafka.
  * Each given Kafka topic/partition corresponds to an RDD partition.
  * Starting and ending offsets are specified in advance, so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets, see {@link org.apache.spark.rdd.kafka.KafkaCluster}
@@ -74,34 +74,31 @@ class KafkaRDD[
   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[R] {
     context.addTaskCompletionListener{ context => closeIfNeeded() }
 
+    val kc = new KafkaCluster(kafkaParams)
     val part = thePart.asInstanceOf[KafkaRDDPartition]
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-    val fetchSize = Option(props.getProperty("fetch.message.max.bytes")).map(_.toInt).getOrElse(1024*1024)
-    val leaderBackoff = Option(props.getProperty("refresh.leader.backoff.ms")).map(_.toLong).getOrElse(200L)
-    val consumerConfig = new ConsumerConfig(props)
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
+      .newInstance(kc.config.props)
       .asInstanceOf[Decoder[K]]
     val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
+      .newInstance(kc.config.props)
       .asInstanceOf[Decoder[V]]
-    val consumer: SimpleConsumer = ???
+    val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition)
+      .getOrElse(throw new Exception(s"Couldn't connect to leader for topic ${part.topic} ${part.partition}"))
     var requestOffset = part.fromOffset
     var iter: Iterator[MessageAndOffset] = null
 
     override def getNext: R = {
       if (iter == null || !iter.hasNext) {
         val req = new FetchRequestBuilder().
-          addFetch(part.topic, part.partition, requestOffset, fetchSize).
+          addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
           build()
         val resp = consumer.fetch(req)
         if (resp.hasError) {
           val err = resp.errorCode(part.topic, part.partition)
           if (err == ErrorMapping.LeaderNotAvailableCode ||
             err == ErrorMapping.NotLeaderForPartitionCode) {
-            log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${leaderBackoff}ms")
-            Thread.sleep(leaderBackoff)
+            log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+            Thread.sleep(kc.config.refreshLeaderBackoffMs)
           }
           // Let normal rdd retry sort out reconnect attempts
           throw ErrorMapping.exceptionFor(err)
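
The removed fetchSize and leaderBackoff parsing is subsumed by the typed accessors on ConsumerConfig, which carry the same defaults the hand-rolled getProperty fallbacks used (1024 * 1024 bytes and 200 ms in Kafka 0.8). A small sketch of that, assuming Kafka 0.8 on the classpath; the blank group.id/zookeeper.connect placeholders mirror what KafkaCluster.consumerConfig fills in:

import java.util.Properties
import kafka.consumer.ConsumerConfig

// Sketch only: ConsumerConfig requires group.id and zookeeper.connect,
// which is why KafkaCluster.consumerConfig supplies blank placeholders.
val props = new Properties()
props.setProperty("group.id", "")
props.setProperty("zookeeper.connect", "")
val config = new ConsumerConfig(props)

// Typed accessors replace the Option(props.getProperty(...)) parsing:
println(config.fetchMessageMaxBytes)   // defaults to 1024 * 1024
println(config.refreshLeaderBackoffMs) // defaults to 200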
