
Commit 2e67117

[SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
1 parent: bb80bbe

6 files changed: 210 additions and 15 deletions
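
The design described in the commit message: the concrete Kafka classes become private[spark] / private[streaming], and user code touches only the small OffsetRange and HasOffsetRanges traits added in this commit. A minimal sketch of that access pattern under those assumptions (the helper name printOffsets is made up):

import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical helper: read per-partition offsets from an RDD produced by
// KafkaUtils.createRDD or by a batch of the exactly-once stream. The concrete
// KafkaRDD is hidden, so the trait is the only public handle on its offsets.
def printOffsets(rdd: RDD[_]): Unit = rdd match {
  case r: HasOffsetRanges =>
    r.offsetRanges.foreach { o =>
      println(s"${o.topic}-${o.partition}: [${o.fromOffset}, ${o.untilOffset})")
    }
  case _ => println("not a Kafka-backed RDD")
}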

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
+private[spark]
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset}
 
@@ -297,6 +298,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   }
 }
 
+private[spark]
 object KafkaCluster {
   type Err = ArrayBuffer[Throwable]
 

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 8 additions & 6 deletions
@@ -34,8 +34,6 @@ import kafka.utils.VerifiableProperties
 /** A batch-oriented interface for consuming from Kafka.
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
- * For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.rdd.kafka.KafkaCluster}
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -44,17 +42,20 @@ import kafka.utils.VerifiableProperties
  * range of offsets for a given Kafka topic/partition
  * @param messageHandler function for translating each message into the desired type
  */
+private[spark]
 class KafkaRDD[
   K: ClassTag,
   V: ClassTag,
   U <: Decoder[_]: ClassTag,
   T <: Decoder[_]: ClassTag,
-  R: ClassTag](
+  R: ClassTag] private[spark] (
   sc: SparkContext,
-  val kafkaParams: Map[String, String],
-  val batch: Array[KafkaRDDPartition],
+  kafkaParams: Map[String, String],
+  private[spark] val batch: Array[KafkaRDDPartition],
   messageHandler: MessageAndMetadata[K, V] => R
-) extends RDD[R](sc, Nil) with Logging {
+) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  def offsetRanges: Array[OffsetRange] = batch.asInstanceOf[Array[OffsetRange]]
 
   override def getPartitions: Array[Partition] = batch.asInstanceOf[Array[Partition]]
 
@@ -160,6 +161,7 @@ class KafkaRDD[
 
 }
 
+private[spark]
 object KafkaRDD {
   import KafkaCluster.LeaderOffset

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDDPartition.scala

Lines changed: 9 additions & 7 deletions
@@ -26,15 +26,16 @@ import org.apache.spark.Partition
  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
  * @param port preferred kafka host's port
  */
+private[spark]
 class KafkaRDDPartition(
   override val index: Int,
-  val topic: String,
-  val partition: Int,
-  val fromOffset: Long,
-  val untilOffset: Long,
-  val host: String,
-  val port: Int
-) extends Partition {
+  override val topic: String,
+  override val partition: Int,
+  override val fromOffset: Long,
+  override val untilOffset: Long,
+  override val host: String,
+  override val port: Int
+) extends Partition with OffsetRange {
   def toTuple: (Int, String, Int, Long, Long, String, Int) = (
     index,
     topic,
@@ -47,6 +48,7 @@ class KafkaRDDPartition(
 
 }
 
+private[spark]
 object KafkaRDDPartition {
   def apply(
     index: Int,
external/kafka/src/main/scala/org/apache/spark/rdd/kafka/OffsetRange.scala

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+trait OffsetRange {
+  /** kafka topic name */
+  def topic: String
+
+  /** kafka partition id */
+  def partition: Int
+
+  /** inclusive starting offset */
+  def fromOffset: Long
+
+  /** exclusive ending offset */
+  def untilOffset: Long
+
+  /** preferred kafka host, i.e. the leader at the time of creation */
+  def host: String
+
+  /** preferred kafka host's port */
+  def port: Int
+}
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+private class OffsetRangeImpl(
+  override val topic: String,
+  override val partition: Int,
+  override val fromOffset: Long,
+  override val untilOffset: Long,
+  override val host: String,
+  override val port: Int
+) extends OffsetRange
+
+object OffsetRange {
+  def apply(
+    topic: String,
+    partition: Int,
+    fromOffset: Long,
+    untilOffset: Long,
+    host: String,
+    port: Int): OffsetRange =
+      new OffsetRangeImpl(
+        topic,
+        partition,
+        fromOffset,
+        untilOffset,
+        host,
+        port)
+}
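
Because OffsetRangeImpl is private, callers can construct ranges only through the companion apply and cannot subclass the implementation. A small sketch, with made-up topic names and offsets, of building the Array[OffsetRange] accepted by KafkaUtils.createRDD (shown below in KafkaUtils.scala):

import org.apache.spark.rdd.kafka.OffsetRange

// Offsets recovered from a previous run's output store (values are illustrative).
val ranges: Array[OffsetRange] = Array(
  OffsetRange("events", 0, 100L, 200L, "broker1.example.com", 9092),
  OffsetRange("events", 1, 150L, 250L, "broker2.example.com", 9092)
)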

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 2 additions & 1 deletion
@@ -51,6 +51,7 @@ import org.apache.spark.streaming.dstream._
  * @param messageHandler function for translating each message into the desired type
  * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
  */
+private[streaming]
 class DeterministicKafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
@@ -61,7 +62,7 @@ class DeterministicKafkaInputDStream[
   val kafkaParams: Map[String, String],
   val fromOffsets: Map[TopicAndPartition, Long],
   messageHandler: MessageAndMetadata[K, V] => R,
-  maxRetries: Int = 1
+  maxRetries: Int
 ) extends InputDStream[R](ssc_) with Logging {
 
   protected[streaming] override val checkpointData =

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 119 additions & 1 deletion
@@ -23,12 +23,18 @@ import java.util.{Map => JMap}
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.{Decoder, StringDecoder}
 
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD, KafkaRDDPartition, OffsetRange}
 
 object KafkaUtils {
   /**
@@ -144,4 +150,116 @@
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param batch Each OffsetRange in the batch corresponds to a
+   * range of offsets for a given Kafka topic/partition
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      batch: Array[OffsetRange],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+    val parts = batch.zipWithIndex.map { case (o, i) =>
+      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, o.host, o.port)
+    }.toArray
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, parts, messageHandler)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
+   * If you need that guarantee, get the offsets from this stream and store them with your output.
+   * Nor does this store offsets in Kafka / Zookeeper.
+   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
+   * will be transformed effectively exactly once even after failure,
+   * provided you have sufficient Kafka log retention.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param messageHandler function for translating each message into the desired type
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   * starting point of the stream
+   * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+   */
+  def createExactlyOnceStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      messageHandler: MessageAndMetadata[K, V] => R,
+      maxRetries: Int
+  ): InputDStream[R] = {
+    new DeterministicKafkaInputDStream[K, V, U, T, R](
+      ssc, kafkaParams, fromOffsets, messageHandler, maxRetries)
+  }
+
+  /**
+   * This DOES NOT guarantee that side-effects of an action will see each message exactly once.
+   * If you need that guarantee, get the offsets from this stream and store them with your output.
+   * Nor does this store offsets in Kafka / Zookeeper.
+   * If checkpointed, it will store offset ranges in the checkpoint, such that each message
+   * will be transformed effectively exactly once even after failure,
+   * provided you have sufficient Kafka log retention.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+   * to determine where the stream starts (defaults to "largest")
+   * @param topics names of the topics to consume
+   */
+  def createExactlyOnceStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Set[String]
+  ): InputDStream[(K, V)] = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+
+    (for {
+      topicPartitions <- kc.getPartitions(topics).right
+      leaderOffsets <- (if (reset == Some("smallest")) {
+        kc.getEarliestLeaderOffsets(topicPartitions)
+      } else {
+        kc.getLatestLeaderOffsets(topicPartitions)
+      }).right
+    } yield {
+      val fromOffsets = leaderOffsets.map { case (tp, lo) =>
+          (tp, lo.offset)
+      }
+      new DeterministicKafkaInputDStream[K, V, U, T, (K, V)](
+        ssc, kafkaParams, fromOffsets, messageHandler, 1)
+    }).fold(
+      errs => throw new Exception(errs.mkString("\n")),
+      ok => ok
+    )
+  }
 }
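
A hedged end-to-end sketch of the two new entry points, assuming a local StreamingContext and made-up broker and topic names: createExactlyOnceStream builds a DStream whose batches carry their offset ranges, and createRDD replays a stored set of ranges as a batch.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1.example.com:9092")

// Streaming: starting offsets come from "auto.offset.reset" (or the checkpoint).
val ssc = new StreamingContext("local[2]", "kafka-example", Seconds(5))
val stream = KafkaUtils.createExactlyOnceStream[
  String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("events"))

stream.foreachRDD { rdd =>
  // Each batch is a KafkaRDD under the hood, so its offsets are visible
  // through HasOffsetRanges; store them alongside the batch's output.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(o => println(s"${o.topic}-${o.partition}: ${o.fromOffset}..${o.untilOffset}"))
}

// Batch: replay a known set of ranges, e.g. recovered from the output store.
def replay(sc: SparkContext, ranges: Array[OffsetRange]): RDD[(String, String)] =
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (String, String)](
    sc, kafkaParams, ranges, mmd => (mmd.key, mmd.message))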
