
Commit 9a838c2

[SPARK-4964] code cleanup, add more tests
1 parent 2b340d8 commit 9a838c2

File tree

5 files changed: +283, -102 lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

Lines changed: 63 additions & 46 deletions
@@ -26,12 +26,12 @@ import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 
 /**
- * Convenience methods for interacting with a Kafka cluster.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
- */
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
 private[spark]
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset}
@@ -59,22 +59,24 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     new SimpleConsumer(host, port, config.socketTimeoutMs,
       config.socketReceiveBufferBytes, config.clientId)
 
-  def connect(hostAndPort: (String, Int)): SimpleConsumer =
-    connect(hostAndPort._1, hostAndPort._2)
-
   def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
-    findLeader(topic, partition).right.map(connect)
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
 
   def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
     val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
       0, config.clientId, Seq(topic))
     val errs = new Err
     withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
       val resp: TopicMetadataResponse = consumer.send(req)
-      resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
-        t.partitionsMetadata.find(_.partitionId == partition)
-      }.foreach { partitionMeta =>
-        partitionMeta.leader.foreach { leader =>
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
           return Right((leader.host, leader.port))
         }
       }
@@ -85,9 +87,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def findLeaders(
       topicAndPartitions: Set[TopicAndPartition]
     ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
-    getPartitionMetadata(topicAndPartitions.map(_.topic)).right.flatMap { tms =>
+    val topics = topicAndPartitions.map(_.topic)
+    getPartitionMetadata(topics).right.flatMap { tms: Set[TopicMetadata] =>
       val result = tms.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.flatMap { pm =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
           val tp = TopicAndPartition(tm.topic, pm.partitionId)
           if (topicAndPartitions(tp)) {
             pm.leader.map { l =>
@@ -112,15 +115,15 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] =
     getPartitionMetadata(topics).right.map { r =>
       r.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.map { pm =>
+        tm.partitionsMetadata.map { pm: PartitionMetadata =>
           TopicAndPartition(tm.topic, pm.partitionId)
         }
       }
     }
 
   def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
-    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
-      0, config.clientId, topics.toSeq)
+    val req = TopicMetadataRequest(
+      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
     val errs = new Err
     withBrokers(Random.shuffle(seedBrokers), errs) { consumer =>
       val resp: TopicMetadataResponse = consumer.send(req)
@@ -131,6 +134,11 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     Left(errs)
   }
 
+  // Leader offset api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+  // scalastyle:on
+
   def getLatestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
     ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
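
The "before" argument threaded through these leader offset calls is a Kafka sentinel time, not a real timestamp. A hedged reminder of the legacy constants (from kafka.api.OffsetRequest; values as documented for the old Kafka protocol, not part of this diff):

    import kafka.api.OffsetRequest

    // Legacy sentinel "time" values accepted by the offset API:
    val latest: Long   = OffsetRequest.LatestTime   // -1L: offset where the next message will be written
    val earliest: Long = OffsetRequest.EarliestTime // -2L: earliest offset still retained by the broker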
@@ -170,25 +178,25 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     withBrokers(leaders, errs) { consumer =>
       val needed: Seq[TopicAndPartition] = leaderToTp((consumer.host, consumer.port))
       val req = OffsetRequest(
-        needed.map { tp =>
+        needed.map { tp: TopicAndPartition =>
           tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
         }.toMap
       )
       val resp = consumer.getOffsetsBefore(req)
       val respMap = resp.partitionErrorAndOffsets
-      needed.foreach { tp =>
-        respMap.get(tp).foreach { errAndOffsets =>
-          if (errAndOffsets.error == ErrorMapping.NoError) {
-            if (errAndOffsets.offsets.nonEmpty) {
-              result += tp -> errAndOffsets.offsets.map { off =>
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+          if (por.error == ErrorMapping.NoError) {
+            if (por.offsets.nonEmpty) {
+              result += tp -> por.offsets.map { off =>
                 LeaderOffset(consumer.host, consumer.port, off)
               }
             } else {
               errs.append(new Exception(
                 s"Empty offsets for ${tp}, is ${before} before log beginning?"))
             }
           } else {
-            errs.append(ErrorMapping.exceptionFor(errAndOffsets.error))
+            errs.append(ErrorMapping.exceptionFor(por.error))
           }
         }
       }
@@ -202,9 +210,14 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     }
   }
 
+  // Consumer offset api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+  // scalastyle:on
+
   def getConsumerOffsets(
-    groupId: String,
-    topicAndPartitions: Set[TopicAndPartition]
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
   ): Either[Err, Map[TopicAndPartition, Long]] = {
     getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
       r.map { kv =>
@@ -214,8 +227,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   }
 
   def getConsumerOffsetMetadata(
-    groupId: String,
-    topicAndPartitions: Set[TopicAndPartition]
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
   ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
     var result = Map[TopicAndPartition, OffsetMetadataAndError]()
     val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
@@ -224,12 +237,12 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
       val resp = consumer.fetchOffsets(req)
       val respMap = resp.requestInfo
       val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp =>
-        respMap.get(tp).foreach { offsetMeta =>
-          if (offsetMeta.error == ErrorMapping.NoError) {
-            result += tp -> offsetMeta
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+          if (ome.error == ErrorMapping.NoError) {
+            result += tp -> ome
           } else {
-            errs.append(ErrorMapping.exceptionFor(offsetMeta.error))
+            errs.append(ErrorMapping.exceptionFor(ome.error))
           }
         }
       }
@@ -243,17 +256,17 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   }
 
   def setConsumerOffsets(
-    groupId: String,
-    offsets: Map[TopicAndPartition, Long]
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long]
   ): Either[Err, Map[TopicAndPartition, Short]] = {
     setConsumerOffsetMetadata(groupId, offsets.map { kv =>
       kv._1 -> OffsetMetadataAndError(kv._2)
     })
   }
 
   def setConsumerOffsetMetadata(
-    groupId: String,
-    metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
   ): Either[Err, Map[TopicAndPartition, Short]] = {
     var result = Map[TopicAndPartition, Short]()
     val req = OffsetCommitRequest(groupId, metadata)
@@ -263,8 +276,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
       val resp = consumer.commitOffsets(req)
       val respMap = resp.requestInfo
       val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp =>
-        respMap.get(tp).foreach { err =>
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { err: Short =>
           if (err == ErrorMapping.NoError) {
             result += tp -> err
           } else {
@@ -281,18 +294,21 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     Left(errs)
   }
 
+  // Try a call against potentially multiple brokers, accumulating errors
   private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
     (fn: SimpleConsumer => Any): Unit = {
     brokers.foreach { hp =>
       var consumer: SimpleConsumer = null
       try {
-        consumer = connect(hp)
+        consumer = connect(hp._1, hp._2)
        fn(consumer)
       } catch {
         case NonFatal(e) =>
           errs.append(e)
       } finally {
-        if (consumer != null) consumer.close()
+        if (consumer != null) {
+          consumer.close()
+        }
       }
     }
   }
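
The newly commented withBrokers helper is a try-each-broker-and-accumulate-errors loop; callers short-circuit with an early return once one broker answers. A standalone sketch of the same pattern, with hypothetical generic names rather than the committed code:

    import scala.collection.mutable.ArrayBuffer
    import scala.util.control.NonFatal

    object BrokerFallbackSketch {
      type Err = ArrayBuffer[Throwable]

      // Try fn against each (host, port) in turn, closing connections and
      // collecting every failure; the caller decides when to stop (e.g. via return).
      def withHosts[C](hosts: Iterable[(String, Int)], errs: Err)
          (open: (String, Int) => C)(close: C => Unit)(fn: C => Any): Unit = {
        hosts.foreach { hp =>
          var conn: Option[C] = None
          try {
            conn = Some(open(hp._1, hp._2))
            conn.foreach(fn)
          } catch {
            case NonFatal(e) => errs.append(e)
          } finally {
            conn.foreach(close)  // always release, even on failure
          }
        }
      }
    }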
@@ -305,9 +321,10 @@ object KafkaCluster {
   private[spark]
   case class LeaderOffset(host: String, port: Int, offset: Long)
 
-  /** Make a consumer config without requiring group.id or zookeeper.connect,
-   * since communicating with brokers also needs common settings such as timeout
-   */
+  /**
+   * Make a consumer config without requiring group.id or zookeeper.connect,
+   * since communicating with brokers also needs common settings such as timeout
+   */
   def consumerConfig(kafkaParams: Map[String, String]): ConsumerConfig = {
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))
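
Read together, the KafkaCluster methods compose through Either, with the first accumulated error set short-circuiting the rest. A hedged usage sketch, assuming a hypothetical broker at localhost:9092 and a topic named "mytopic" (and noting the class is private[spark], so real callers live inside Spark's own packages):

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))

    // Each call returns Either[Err, ...]; chaining through .right stops at
    // the first Left and otherwise threads the successful values along.
    val report = for {
      partitions <- kc.getPartitions(Set("mytopic")).right
      latest <- kc.getLatestLeaderOffsets(partitions).right
    } yield {
      latest.map { case (tp, lo) =>
        s"${tp.topic}-${tp.partition} leader ${lo.host}:${lo.port} offset ${lo.offset}"
      }
    }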

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

Lines changed: 46 additions & 50 deletions
@@ -31,17 +31,18 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
-/** A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
- * @param messageHandler function for translating each message into the desired type
- */
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ * range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
 private[spark]
 class KafkaRDD[
   K: ClassTag,
@@ -65,16 +66,24 @@ class KafkaRDD[
     Seq(part.host)
   }
 
-  private def assertStartedCleanly(part: KafkaRDDPartition) {
-    assert(part.fromOffset <= part.untilOffset,
-      s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
-      s"for topic ${part.topic} partition ${part.partition}. " +
-      "You either provided an invalid fromOffset, or the Kafka topic has been damaged")
-  }
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+      " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+      " This should not happen, and indicates a message may have been skipped"
 
   override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
-    assertStartedCleanly(part)
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
       log.warn("Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
@@ -85,7 +94,8 @@ class KafkaRDD[
   }
 
   private class KafkaRDDIterator(
-    part: KafkaRDDPartition, context: TaskContext) extends NextIterator[R] {
+      part: KafkaRDDPartition,
+      context: TaskContext) extends NextIterator[R] {
 
     context.addTaskCompletionListener{ context => closeIfNeeded() }
@@ -132,24 +142,10 @@ class KafkaRDD[
       }
     }
 
-    private def assertFinishedEmpty(requestOffset: Long) {
-      assert(requestOffset == part.untilOffset,
-        s"ran out of messages before reaching ending offset ${part.untilOffset} " +
-        s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
-        " This should not happen, and indicates that messages may have been lost")
-    }
-
-    private def assertFinishedWithoutOvershoot(itemOffset: Long) {
-      assert(itemOffset == part.untilOffset,
-        s"got ${itemOffset} > ending offset ${part.untilOffset} " +
-        s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
-        " This should not happen, and indicates a message may have been skipped")
-    }
-
     private def fetchBatch: Iterator[MessageAndOffset] = {
-      val req = new FetchRequestBuilder().
-        addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
-        build()
+      val req = new FetchRequestBuilder()
+        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
+        .build()
       val resp = consumer.fetch(req)
       handleFetchErr(resp)
       // kafka may return a batch that starts before the requested offset
@@ -160,18 +156,18 @@ class KafkaRDD[
 
     override def close() = consumer.close()
 
-    override def getNext: R = {
+    override def getNext(): R = {
       if (iter == null || !iter.hasNext) {
         iter = fetchBatch
       }
       if (!iter.hasNext) {
-        assertFinishedEmpty(requestOffset)
+        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
         finished = true
         null.asInstanceOf[R]
       } else {
-        val item = iter.next
+        val item = iter.next()
         if (item.offset >= part.untilOffset) {
-          assertFinishedWithoutOvershoot(item.offset)
+          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
           finished = true
           null.asInstanceOf[R]
         } else {
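
getNext() follows the contract of Spark's NextIterator: either return an element, or set finished = true and return a throwaway value that callers never see. A minimal standalone sketch of that contract (a hypothetical class, not Spark's implementation):

    // getNext() either produces an element or flips finished; hasNext caches
    // one lookahead so getNext() is called exactly once per element.
    abstract class MiniNextIterator[U] extends Iterator[U] {
      protected var finished = false
      private var gotNext = false
      private var nextValue: U = _

      protected def getNext(): U

      override def hasNext: Boolean = {
        if (!finished && !gotNext) {
          nextValue = getNext()
          gotNext = !finished  // a value only counts if getNext didn't finish
        }
        !finished
      }

      override def next(): U = {
        if (!hasNext) throw new NoSuchElementException("End of stream")
        gotNext = false
        nextValue
      }
    }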
@@ -189,16 +185,16 @@ object KafkaRDD {
   import KafkaCluster.LeaderOffset
 
   /**
-   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
-   * starting point of the batch
-   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
-   * ending point of the batch
-   * @param messageHandler function for translating each message into the desired type
-   */
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   * starting point of the batch
+   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+   * ending point of the batch
+   * @param messageHandler function for translating each message into the desired type
+   */
   def apply[
     K: ClassTag,
     V: ClassTag,
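
The fromOffsets/untilOffsets convention documented here (inclusive start, exclusive end) is what makes batch sizes and emptiness checks trivial. A hypothetical helper to illustrate the arithmetic:

    // [fromOffset, untilOffset): inclusive start, exclusive end, as in the scaladoc above.
    case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
      require(fromOffset <= untilOffset, "beginning offset must not be after ending offset")
      def count: Long = untilOffset - fromOffset // exclusive end: plain difference
      def isEmpty: Boolean = count == 0L         // from == until is a legal, empty range
    }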
