Skip to content

Commit ce91c59

Browse files
committed
method to get consumer offsets, explicit error handling
1 parent 4dafd1b commit ce91c59

File tree

2 files changed

+87
-48
lines changed

2 files changed

+87
-48
lines changed

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 83 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.spark.rdd.kafka
1919

2020
import scala.util.control.NonFatal
21+
import scala.collection.mutable.ArrayBuffer
2122
import java.util.Properties
22-
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
23+
import kafka.api.{OffsetRequest, OffsetResponse, OffsetFetchRequest, OffsetFetchResponse, PartitionOffsetRequestInfo, TopicMetadataRequest, TopicMetadataResponse}
2324
import kafka.common.{ErrorMapping, TopicAndPartition}
2425
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
2526

@@ -30,6 +31,8 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
3031
* NOT zookeeper servers, specified in host1:port1,host2:port2 form
3132
*/
3233
class KafkaCluster(val kafkaParams: Map[String, String]) {
34+
type Err = ArrayBuffer[Throwable]
35+
3336
val brokers: Array[(String, Int)] =
3437
kafkaParams.get("metadata.broker.list")
3538
.orElse(kafkaParams.get("bootstrap.servers"))
@@ -47,79 +50,113 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
4750
def connect(hostAndPort: (String, Int)): SimpleConsumer =
4851
connect(hostAndPort._1, hostAndPort._2)
4952

50-
def connectLeader(topic: String, partition: Int): Option[SimpleConsumer] =
51-
findLeader(topic, partition).map(connect)
53+
def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
54+
findLeader(topic, partition).right.map(connect)
5255

53-
def findLeader(topic: String, partition: Int): Option[(String, Int)] = {
56+
def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
5457
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, Seq(topic))
55-
brokers.foreach { hp =>
56-
var consumer: SimpleConsumer = null
57-
try {
58-
consumer = connect(hp)
59-
val resp: TopicMetadataResponse = consumer.send(req)
60-
resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
61-
t.partitionsMetadata.find(_.partitionId == partition)
62-
}.foreach { partitionMeta =>
63-
partitionMeta.leader.foreach { leader =>
64-
return Some((leader.host, leader.port))
65-
}
58+
val errs = new Err
59+
withBrokers(errs) { consumer =>
60+
val resp: TopicMetadataResponse = consumer.send(req)
61+
resp.topicsMetadata.find(_.topic == topic).flatMap { t =>
62+
t.partitionsMetadata.find(_.partitionId == partition)
63+
}.foreach { partitionMeta =>
64+
partitionMeta.leader.foreach { leader =>
65+
return Right((leader.host, leader.port))
6666
}
67-
} catch {
68-
case NonFatal(e) =>
69-
} finally {
70-
if (consumer != null) consumer.close()
7167
}
7268
}
73-
None
69+
Left(errs)
7470
}
7571

76-
def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] =
72+
def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
7773
getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
7874

79-
def getEarliestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] =
75+
def getEarliestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
8076
getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
8177

82-
def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long): Map[TopicAndPartition, Long] =
83-
getLeaderOffsets(topicAndPartitions, before, 1).map { kv =>
84-
// mapValues isnt serializable, see SI-7005
85-
kv._1 -> kv._2.head
78+
def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long): Either[Err, Map[TopicAndPartition, Long]] =
79+
getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
80+
r.map { kv =>
81+
// mapValues isnt serializable, see SI-7005
82+
kv._1 -> kv._2.head
83+
}
8684
}
8785

88-
def getLeaderOffsets(topicAndPartitions: Set[TopicAndPartition], before: Long, maxNumOffsets: Int): Map[TopicAndPartition, Seq[Long]] = {
86+
def getLeaderOffsets(
87+
topicAndPartitions: Set[TopicAndPartition],
88+
before: Long,
89+
maxNumOffsets: Int
90+
): Either[Err, Map[TopicAndPartition, Seq[Long]]] = {
8991
var result = Map[TopicAndPartition, Seq[Long]]()
9092
val req = OffsetRequest(
9193
topicAndPartitions.map(tp => tp -> PartitionOffsetRequestInfo(before, 1)).toMap
9294
)
95+
val errs = new Err
96+
withBrokers(errs) { consumer =>
97+
val resp: OffsetResponse = consumer.getOffsetsBefore(req)
98+
val respMap = resp.partitionErrorAndOffsets
99+
val needed = topicAndPartitions.diff(result.keys.toSet)
100+
needed.foreach { tp =>
101+
respMap.get(tp).foreach { errAndOffsets =>
102+
if (errAndOffsets.error == ErrorMapping.NoError) {
103+
result += tp -> errAndOffsets.offsets
104+
} else {
105+
errs.append(ErrorMapping.exceptionFor(errAndOffsets.error))
106+
}
107+
}
108+
}
109+
if (result.keys.size == topicAndPartitions.size) {
110+
return Right(result)
111+
}
112+
}
113+
val missing = topicAndPartitions.diff(result.keys.toSet)
114+
errs.append(new Exception(s"Couldn't find offsets for ${missing}"))
115+
Left(errs)
116+
}
117+
118+
def getConsumerOffsets(groupId: String, topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] = {
119+
var result = Map[TopicAndPartition, Long]()
120+
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
121+
val errs = new Err
122+
withBrokers(errs) { consumer =>
123+
val resp: OffsetFetchResponse = consumer.fetchOffsets(req)
124+
val respMap = resp.requestInfo
125+
val needed = topicAndPartitions.diff(result.keys.toSet)
126+
needed.foreach { tp =>
127+
respMap.get(tp).foreach { offsetMeta =>
128+
if (offsetMeta.error == ErrorMapping.NoError) {
129+
result += tp -> offsetMeta.offset
130+
} else {
131+
errs.append(ErrorMapping.exceptionFor(offsetMeta.error))
132+
}
133+
}
134+
}
135+
if (result.keys.size == topicAndPartitions.size) {
136+
return Right(result)
137+
}
138+
}
139+
val missing = topicAndPartitions.diff(result.keys.toSet)
140+
errs.append(new Exception(s"Couldn't find offsets for ${missing}"))
141+
Left(errs)
142+
}
143+
144+
def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Unit = ???
145+
146+
private def withBrokers(errs: Err)(fn: SimpleConsumer => Any): Unit = {
93147
brokers.foreach { hp =>
94148
var consumer: SimpleConsumer = null
95149
try {
96150
consumer = connect(hp)
97-
val resp: OffsetResponse = consumer.getOffsetsBefore(req)
98-
val respParts = resp.partitionErrorAndOffsets
99-
val needed = topicAndPartitions.diff(result.keys.toSet)
100-
needed.foreach { tp =>
101-
respParts.get(tp).foreach { errAndOffsets =>
102-
if (errAndOffsets.error == ErrorMapping.NoError) {
103-
result += tp -> errAndOffsets.offsets
104-
}
105-
}
106-
}
107-
if (result.keys.size == topicAndPartitions.size) {
108-
return result
109-
}
151+
fn(consumer)
110152
} catch {
111153
case NonFatal(e) =>
154+
errs.append(e)
112155
} finally {
113156
if (consumer != null) consumer.close()
114157
}
115158
}
116-
val missing = topicAndPartitions.diff(result.keys.toSet)
117-
throw new Exception(s"Couldn't find offsets for ${missing}")
118159
}
119-
120-
def getConsumerOffsets(topicAndPartitions: Set[TopicAndPartition]): Map[TopicAndPartition, Long] = ???
121-
122-
def setConsumerOffsets(offsets: Map[TopicAndPartition, Long]): Unit = ???
123160
}
124161

125162
object KafkaCluster {

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,10 @@ class KafkaRDD[
8282
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
8383
.newInstance(kc.config.props)
8484
.asInstanceOf[Decoder[V]]
85-
val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition)
86-
.getOrElse(throw new Exception(s"Couldn't connect to leader for topic ${part.topic} ${part.partition}"))
85+
val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
86+
errs => throw new Exception(s"""Couldn't connect to leader for topic ${part.topic} ${part.partition}: ${errs.mkString("\n")}"""),
87+
consumer => consumer
88+
)
8789
var requestOffset = part.afterOffset + 1
8890
var iter: Iterator[MessageAndOffset] = null
8991

0 commit comments

Comments
 (0)