@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  * See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
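
For readers following along, here is a minimal sketch of the shape these two parameters take at the call site. The host, topic names, and group name are hypothetical; only the `zookeeper.connect` and `group.id` keys are ones this receiver actually reads:

```scala
// Hypothetical example values -- only the shape matters here.
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",  // ZK quorum the consumer registers with
  "group.id"          -> "spark-kafka-demo" // consumer group whose offsets live in ZK
)
// Consume topic "events" with 2 threads and "logs" with 1; the receiver
// sizes its thread pool as topics.values.sum = 3, one per partition handler.
val topics = Map("events" -> 2, "logs" -> 1)
```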
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
 
   def onStart() {
 
-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
 
     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))
 
+    val zkConnect = kafkaParams("zookeeper.connect")
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+    logInfo("Connecting to Zookeeper: " + zkConnect)
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+    logInfo("Connected to " + zkConnect)
 
-    // When autooffset.reset is defined, it is our responsibility to try and whack the
+    // When auto.offset.reset is defined, it is our responsibility to try and whack the
     // consumer group zk node.
     if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
     }
 
     val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
@@ -112,10 +114,14 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-
-    // Start the messages handler for each partition
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
-    }
+    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
Contributor:
minor - but to avoid a name collision with Spark's own Executor we usually try to call variables like this threadPool.

Contributor:
I see that actually you didn't add this name, so never mind!
+    try {
+      // Start the messages handler for each partition
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+      }
+    } finally {
+      executorPool.shutdown() // Just causes threads to terminate after work is done
+    }
   }
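
The `finally` block relies on standard `ExecutorService` semantics: `shutdown()` only stops the pool from accepting new tasks, while already-submitted `MessageHandler`s keep running. A standalone sketch of that behavior (pool size and sleep are arbitrary):

```scala
import java.util.concurrent.{Executors, TimeUnit}

val pool = Executors.newFixedThreadPool(2)
pool.submit(new Runnable {
  def run() { Thread.sleep(100); println("still ran to completion") }
})
pool.shutdown()                            // reject new tasks; queued work continues
pool.awaitTermination(1, TimeUnit.SECONDS) // the submitted task still finishes
```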

@@ -124,30 +130,35 @@ class KafkaReceiver[
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      for (msgAndMetadata <- stream) {
-        store((msgAndMetadata.key, msgAndMetadata.message))
+      try {
+        for (msgAndMetadata <- stream) {
+          store((msgAndMetadata.key, msgAndMetadata.message))
+        }
+      } catch {
+        case e: Throwable => logError("Error handling message; exiting", e)
       }
     }
   }
 
-  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
   // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
   //
   // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
   // 'smallest'/'largest':
   // scalastyle:off
   // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
   // scalastyle:on
   private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    val dir = "/consumers/" + groupId
+    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
     try {
-      val dir = "/consumers/" + groupId
-      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
-      zk.close()
     } catch {
-      case _ : Throwable => // swallow
+      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+    } finally {
+      zk.close()
     }
   }
 }
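
As a reading aid, here is the cleanup trick described in the comment above restated as a standalone method. This is a sketch assuming the same org.I0Itec.zkclient API; the ZKStringSerializer argument is omitted since it is Kafka-internal, and the method name is made up for illustration:

```scala
import org.I0Itec.zkclient.ZkClient

// Sketch only: delete /consumers/<group.id> so a Kafka 0.7.2 broker treats the
// group as brand new, letting auto.offset.reset take effect on the next connect.
def resetConsumerGroup(zkUrl: String, groupId: String): Unit = {
  val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000) // session, connection timeouts (ms)
  try {
    zk.deleteRecursive("/consumers/" + groupId)
  } finally {
    zk.close() // always release the ZK session, even if the delete fails
  }
}
```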