@@ -20,13 +20,14 @@ package org.apache.spark.rdd.kafka
2020import scala .util .control .NonFatal
2121import scala .collection .mutable .ArrayBuffer
2222import java .util .Properties
23- import kafka .api .{ OffsetCommitRequest , OffsetRequest , OffsetFetchRequest , PartitionOffsetRequestInfo , TopicMetadata , TopicMetadataRequest , TopicMetadataResponse }
23+ import kafka .api ._
2424import kafka .common .{ErrorMapping , OffsetMetadataAndError , TopicAndPartition }
2525import kafka .consumer .{ConsumerConfig , SimpleConsumer }
2626
2727/**
2828 * Convenience methods for interacting with a Kafka cluster.
29- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">configuration parameters</a>.
29+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
30+ * configuration parameters</a>.
3031 * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
3132 * NOT zookeeper servers, specified in host1:port1,host2:port2 form
3233 */
@@ -45,7 +46,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
4546 val config : ConsumerConfig = KafkaCluster .consumerConfig(kafkaParams)
4647
4748 def connect (host : String , port : Int ): SimpleConsumer =
48- new SimpleConsumer (host, port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)
49+ new SimpleConsumer (host, port, config.socketTimeoutMs,
50+ config.socketReceiveBufferBytes, config.clientId)
4951
5052 def connect (hostAndPort : (String , Int )): SimpleConsumer =
5153 connect(hostAndPort._1, hostAndPort._2)
@@ -54,7 +56,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
5456 findLeader(topic, partition).right.map(connect)
5557
5658 def findLeader (topic : String , partition : Int ): Either [Err , (String , Int )] = {
57- val req = TopicMetadataRequest (TopicMetadataRequest .CurrentVersion , 0 , config.clientId, Seq (topic))
59+ val req = TopicMetadataRequest (TopicMetadataRequest .CurrentVersion ,
60+ 0 , config.clientId, Seq (topic))
5861 val errs = new Err
5962 withBrokers(errs) { consumer =>
6063 val resp : TopicMetadataResponse = consumer.send(req)
@@ -79,7 +82,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
7982 }
8083
8184 def getPartitionMetadata (topics : Set [String ]): Either [Err , Set [TopicMetadata ]] = {
82- val req = TopicMetadataRequest (TopicMetadataRequest .CurrentVersion , 0 , config.clientId, topics.toSeq)
85+ val req = TopicMetadataRequest (TopicMetadataRequest .CurrentVersion ,
86+ 0 , config.clientId, topics.toSeq)
8387 val errs = new Err
8488 withBrokers(errs) { consumer =>
8589 val resp : TopicMetadataResponse = consumer.send(req)
@@ -90,13 +94,20 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
9094 Left (errs)
9195 }
9296
93- def getLatestLeaderOffsets (topicAndPartitions : Set [TopicAndPartition ]): Either [Err , Map [TopicAndPartition , Long ]] =
97+ def getLatestLeaderOffsets (
98+ topicAndPartitions : Set [TopicAndPartition ]
99+ ): Either [Err , Map [TopicAndPartition , Long ]] =
94100 getLeaderOffsets(topicAndPartitions, OffsetRequest .LatestTime )
95101
96- def getEarliestLeaderOffsets (topicAndPartitions : Set [TopicAndPartition ]): Either [Err , Map [TopicAndPartition , Long ]] =
102+ def getEarliestLeaderOffsets (
103+ topicAndPartitions : Set [TopicAndPartition ]
104+ ): Either [Err , Map [TopicAndPartition , Long ]] =
97105 getLeaderOffsets(topicAndPartitions, OffsetRequest .EarliestTime )
98106
99- def getLeaderOffsets (topicAndPartitions : Set [TopicAndPartition ], before : Long ): Either [Err , Map [TopicAndPartition , Long ]] =
107+ def getLeaderOffsets (
108+ topicAndPartitions : Set [TopicAndPartition ],
109+ before : Long
110+ ): Either [Err , Map [TopicAndPartition , Long ]] =
100111 getLeaderOffsets(topicAndPartitions, before, 1 ).right.map { r =>
101112 r.map { kv =>
102113 // mapValues isnt serializable, see SI-7005
@@ -136,7 +147,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
136147 Left (errs)
137148 }
138149
139- def getConsumerOffsets (groupId : String , topicAndPartitions : Set [TopicAndPartition ]): Either [Err , Map [TopicAndPartition , Long ]] = {
150+ def getConsumerOffsets (
151+ groupId : String ,
152+ topicAndPartitions : Set [TopicAndPartition ]
153+ ): Either [Err , Map [TopicAndPartition , Long ]] = {
140154 getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
141155 r.map { kv =>
142156 kv._1 -> kv._2.offset
@@ -173,7 +187,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
173187 Left (errs)
174188 }
175189
176- def setConsumerOffsets (groupId : String , offsets : Map [TopicAndPartition , Long ]): Unit = {
190+ def setConsumerOffsets (
191+ groupId : String ,
192+ offsets : Map [TopicAndPartition , Long ]
193+ ): Either [Err , Map [TopicAndPartition , Short ]] = {
177194 setConsumerOffsetMetadata(groupId, offsets.map { kv =>
178195 kv._1 -> OffsetMetadataAndError (kv._2)
179196 })
@@ -233,8 +250,9 @@ object KafkaCluster {
233250 val props = new Properties ()
234251 kafkaParams.foreach(param => props.put(param._1, param._2))
235252 Seq (" zookeeper.connect" , " group.id" ).foreach { s =>
236- if (! props.contains(s))
237- props.setProperty(s, " " )
253+ if (! props.contains(s)) {
254+ props.setProperty(s, " " )
255+ }
238256 }
239257 new ConsumerConfig (props)
240258 }
0 commit comments