
Commit 76913e2

Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
1 parent f1069b8 commit 76913e2

File tree: 2 files changed (+127, -1 lines)


external/kafka/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0</version>
+      <version>0.8.1.1</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
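
For comparison, an sbt build would express the same dependency bump roughly as the sketch below. This is illustrative only and not part of the commit; the project builds with Maven, and only the com.sun.jmx exclusion visible in the hunk above is carried over.

// build.sbt sketch (not part of this commit; the real build is Maven).
// sbt's %% appends the Scala binary version, matching kafka_${scala.binary.version} in the pom.
libraryDependencies += ("org.apache.kafka" %% "kafka" % "0.8.1.1")
  .excludeAll(ExclusionRule(organization = "com.sun.jmx"))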
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd.kafka

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.NextIterator

import java.util.Properties
import kafka.api.FetchRequestBuilder
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import kafka.message.{MessageAndMetadata, MessageAndOffset}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

private[spark] case class KafkaRDDPartition(
  override val index: Int,
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long
) extends Partition

/** A batch-oriented interface to Kafka.
  * Each given Kafka topic/partition corresponds to an RDD partition.
  * Starting and ending offsets are specified in advance, so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets, see {@link org.apache.spark.rdd.kafka.KafkaCluster}
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch
  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch
  * @param messageHandler function for translating each message into the desired type
  */
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    untilOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging {

  assert(fromOffsets.keys == untilOffsets.keys,
    "Must provide both from and until offsets for each topic/partition")

  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
    val ((tp, from), index) = kvi
    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
  }.toArray

  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[R] {
    context.addTaskCompletionListener{ context => closeIfNeeded() }

    val part = thePart.asInstanceOf[KafkaRDDPartition]
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))
    val fetchSize = Option(props.getProperty("fetch.message.max.bytes")).map(_.toInt).getOrElse(1024*1024)
    val leaderBackoff = Option(props.getProperty("refresh.leader.backoff.ms")).map(_.toLong).getOrElse(200L)
    val consumerConfig = new ConsumerConfig(props)
    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]
    val consumer: SimpleConsumer = ???
    var requestOffset = part.fromOffset
    var iter: Iterator[MessageAndOffset] = null

    override def getNext: R = {
      if (iter == null || !iter.hasNext) {
        val req = new FetchRequestBuilder().
          addFetch(part.topic, part.partition, requestOffset, fetchSize).
          build()
        val resp = consumer.fetch(req)
        if (resp.hasError) {
          val err = resp.errorCode(part.topic, part.partition)
          if (err == ErrorMapping.LeaderNotAvailableCode ||
              err == ErrorMapping.NotLeaderForPartitionCode) {
            log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, sleeping for ${leaderBackoff}ms")
            Thread.sleep(leaderBackoff)
          }
          // Let normal rdd retry sort out reconnect attempts
          throw ErrorMapping.exceptionFor(err)
        }
        iter = resp.messageSet(part.topic, part.partition)
          .iterator
          .dropWhile(_.offset < requestOffset) // skip offsets below the requested one (can occur with compressed message sets)
      }
      val item = iter.next
      if (item.offset >= part.untilOffset) {
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }

    override def close() = consumer.close()
  }

}
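
To illustrate how this batch-oriented API is intended to be used, here is a minimal, hypothetical sketch. It is not part of the commit: the broker list, topic, and offset ranges are placeholders (in practice they would come from Kafka metadata or the application's own offset tracking, such as the KafkaCluster helper referenced in the scaladoc), and since the consumer lookup above is still the ??? placeholder, computing the RDD will fail until the "finding leader" TODO from the commit message is implemented.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.kafka.KafkaRDD

val sc = new SparkContext("local[2]", "kafka-rdd-sketch")

// Brokers, not zookeeper, as the scaladoc requires.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// One RDD partition per topic/partition; from is inclusive, until is exclusive.
val fromOffsets  = Map(TopicAndPartition("events", 0) -> 100L)
val untilOffsets = Map(TopicAndPartition("events", 0) -> 200L)

// Turn each fetched message into the element type the RDD should hold.
val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val rdd = new KafkaRDD[String, String, StringDecoder, StringDecoder, (String, String)](
  sc, kafkaParams, fromOffsets, untilOffsets, handler)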
