From 6a91cab3172701e1c286dbcdadee825a33230913 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 4 Feb 2015 19:56:46 -0800 Subject: [PATCH 01/10] Added example --- .../streaming/DirectKafkaWordCount.scala | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala new file mode 100644 index 0000000000000..b09664a82ddac --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import kafka.serializer.StringDecoder + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.spark.SparkConf + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount + * is a list of one or more zookeeper servers that make quorum + * is a list of one or more kafka topics to consume from + * + * Example: + * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 + */ +object DirectKafkaWordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: DirectKafkaWordCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(brokerList, topics) = args + val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("checkpoint") + + val topicsSet = topics.split(",").toSet + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList) + val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet).map(_._2) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} From 49867846e72329c849002706441d8165825f4a6b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 4 Feb 2015 20:01:17 -0800 Subject: [PATCH 02/10] Added unit test to kafka offset recovery --- .../kafka/DirectKafkaInputDStream.scala | 1 - .../spark/streaming/kafka/OffsetRange.scala | 17 ++ .../kafka/DirectKafkaStreamSuite.scala | 201 ++++++++++++++++++ .../kafka/KafkaDirectStreamSuite.scala | 92 -------- 4 files changed, 218 insertions(+), 93 deletions(-) create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala delete mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index c7bca43eb889d..801f0a89c9c39 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -50,7 +50,6 @@ import org.apache.spark.streaming.dstream._ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream * @param messageHandler function for translating each message into the desired type - * @param maxRetries maximum number of times in a row to retry getting leaders' offsets */ private[streaming] class DirectKafkaInputDStream[ diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 334c12e4627b4..76b95507dbf49 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -36,6 +36,23 @@ final class OffsetRange private( val untilOffset: Long) extends Serializable { import OffsetRange.OffsetRangeTuple + override def equals(obj: Any): Boolean = obj match { + case that: OffsetRange => + this.topic == that.topic && + this.partition == that.partition && + this.fromOffset == that.fromOffset && + this.untilOffset == that.untilOffset + case _ => false + } + + override def hashCode(): Int = { + toTuple.hashCode() + } + + override def toString(): String = { + s"OffsetRange(topic='$topic', partition=$partition, range: [$fromOffset -> $untilOffset]" + } + /** this is to avoid ClassNotFoundException during checkpoint restore */ private[streaming] def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala new file mode 100644 index 0000000000000..75e2acb5b82b5 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.io.File + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.language.postfixOps + +import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.{Eventually, Timeouts} + +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.util.Utils + +class DirectKafkaStreamSuite extends KafkaStreamSuiteBase +with BeforeAndAfter with BeforeAndAfterAll with Eventually { + val sparkConf = new SparkConf() + .setMaster("local[4]") + .setAppName(this.getClass.getSimpleName) + + val brokerHost = "localhost" + + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerHost:$brokerPort", + "auto.offset.reset" -> "smallest" + ) + + var ssc: StreamingContext = _ + var testDir: File = _ + + override def beforeAll { + setupKafka() + } + + override def afterAll { + tearDownKafka() + } + + after { + if (ssc != null) { + ssc.stop() + } + if (testDir != null) { + Utils.deleteRecursively(testDir) + } + } + + test("basic receiving with multiple topics") { + val topics = Set("newA", "newB") + val data = Map("a" -> 7, "b" -> 9) + topics.foreach { t => + createTopic(t) + produceAndSendMessage(t, data) + } + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics) + } + var total = 0L + + stream.foreachRDD { rdd => + // Get the offset ranges in the RDD + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + val collected = rdd.mapPartitionsWithIndex { (i, iter) => + // For each partition, get size of the range in the partition, + // and the number of items in the partition + val off = offsets(i) + val all = iter.toSeq + val partSize = all.size + val rangeSize = off.untilOffset - off.fromOffset + Iterator((partSize, rangeSize)) + }.collect + + // Verify whether number of elements in each partition + // matches with the corresponding offset range + collected.foreach { case (partSize, rangeSize) => + assert(partSize === rangeSize, "offset ranges are wrong") + } + total += collected.size // Add up all the collected items + } + ssc.start() + eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { + assert(total === data.values.sum * topics.size, "didn't get all messages") + } + ssc.stop() + } + + // Test to verify the offset ranges can be recovered from the checkpoints + test("offset recovery") { + val topic = "recovery" + createTopic(topic) + testDir = Utils.createTempDir() + + // Send data to Kafka and wait for it to be received + def sendDataAndWaitForReceive(data: Seq[Int]) { + val strings = data.map { _.toString} + produceAndSendMessage(topic, strings.map { _ -> 1}.toMap) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains }) + } + } + + // Setup the streaming context + ssc = new StreamingContext(sparkConf, Milliseconds(100)) + val kafkaStream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, Set(topic)) + } + val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt } + val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => + Some(values.sum + state.getOrElse(0)) + } + ssc.checkpoint(testDir.getAbsolutePath) + + // This is to collect the raw data received from Kafka + kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => + val data = rdd.map { _._2 }.collect() + DirectKafkaStreamSuite.collectedData.appendAll(data) + } + + // This is ensure all the data is eventually receiving only once + stateStream.foreachRDD { (rdd: RDD[(String, Int)]) => + rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 } + } + ssc.start() + + // Send some data and wait for them to be received + for (i <- (1 to 10).grouped(4)) { + sendDataAndWaitForReceive(i) + } + + // Verify that offset ranges were generated + val offsetRangesBeforeStop = getOffsetRanges(kafkaStream) + assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated") + assert( + offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 }, + "starting offset not zero" + ) + ssc.stop() + logInfo("====== RESTARTING ========") + + // Recover context from checkpoints + ssc = new StreamingContext(testDir.getAbsolutePath) + val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]] + + // Verify offset ranges have been recovered + val recoveredOffsetRanges = getOffsetRanges(recoveredStream) + assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered") + val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) } + assert( + recoveredOffsetRanges.forall { or => + earlierOffsetRangesAsSets.contains((or._1, or._2.toSet)) + }, + "Recovered ranges are not the same as the ones generated" + ) + + // Restart context, give more data and verify the total at the end + // If the total is write that means each records has been received only once + ssc.start() + sendDataAndWaitForReceive(11 to 20) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + assert(DirectKafkaStreamSuite.total === (1 to 20).sum) + } + ssc.stop() + } + + /** Get the generated offset ranges from the DirectKafkaStream */ + private def getOffsetRanges[K, V]( + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream.generatedRDDs.mapValues { rdd => + rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges + }.toSeq.sortBy { _._1 } + } +} + +object DirectKafkaStreamSuite { + val collectedData = new mutable.ArrayBuffer[String]() + var total = -1L +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala deleted file mode 100644 index 0891ce344f16a..0000000000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.util.Random -import scala.concurrent.duration._ - -import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually - -import kafka.serializer.StringDecoder - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} - -class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { - val sparkConf = new SparkConf() - .setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - - val brokerHost = "localhost" - - val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerHost:$brokerPort", - "auto.offset.reset" -> "smallest" - ) - - var ssc: StreamingContext = _ - - before { - setupKafka() - - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - } - - after { - if (ssc != null) { - ssc.stop() - } - tearDownKafka() - } - - test("multi topic stream") { - val topics = Set("newA", "newB") - val data = Map("a" -> 7, "b" -> 9) - topics.foreach { t => - createTopic(t) - produceAndSendMessage(t, data) - } - val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics) - var total = 0L; - - stream.foreachRDD { rdd => - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - val collected = rdd.mapPartitionsWithIndex { (i, iter) => - val off = offsets(i) - val all = iter.toSeq - val partSize = all.size - val rangeSize = off.untilOffset - off.fromOffset - all.map { _ => - (partSize, rangeSize) - }.toIterator - }.collect - collected.foreach { case (partSize, rangeSize) => - assert(partSize === rangeSize, "offset ranges are wrong") - } - total += collected.size - } - ssc.start() - eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { - assert(total === data.values.sum * topics.size, "didn't get all messages") - } - ssc.stop() - } -} From e73589c733f9c03b5dad516b7d45a06d965dfd1d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 4 Feb 2015 20:30:43 -0800 Subject: [PATCH 03/10] Minor changes. --- .../scala/org/apache/spark/streaming/kafka/OffsetRange.scala | 2 +- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 76b95507dbf49..11bb8aa195eb1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -50,7 +50,7 @@ final class OffsetRange private( } override def toString(): String = { - s"OffsetRange(topic='$topic', partition=$partition, range: [$fromOffset -> $untilOffset]" + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]" } /** this is to avoid ClassNotFoundException during checkpoint restore */ diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 75e2acb5b82b5..eff468e3d0254 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.util.Utils class DirectKafkaStreamSuite extends KafkaStreamSuiteBase -with BeforeAndAfter with BeforeAndAfterAll with Eventually { + with BeforeAndAfter with BeforeAndAfterAll with Eventually { val sparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) From 50f2b56f57b00845d006361049dd2c4bac89957c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 Feb 2015 17:17:46 -0800 Subject: [PATCH 04/10] Added Java API and added more Scala and Java unit tests. Also updated docs. --- .../kafka/DirectKafkaInputDStream.scala | 4 +- .../spark/streaming/kafka/KafkaCluster.scala | 3 + .../spark/streaming/kafka/KafkaUtils.scala | 279 ++++++++++++++---- .../kafka/JavaDirectKafkaStreamSuite.java | 171 +++++++++++ .../kafka/DirectKafkaStreamSuite.scala | 111 ++++++- .../streaming/kafka/KafkaClusterSuite.scala | 6 +- .../streaming/kafka/KafkaStreamSuite.scala | 11 +- 7 files changed, 505 insertions(+), 80 deletions(-) create mode 100644 external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 801f0a89c9c39..04e65cb3d708c 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -55,8 +55,8 @@ private[streaming] class DirectKafkaInputDStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + U <: Decoder[K]: ClassTag, + T <: Decoder[V]: ClassTag, R: ClassTag]( @transient ssc_ : StreamingContext, val kafkaParams: Map[String, String], diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index ccc62bfe8f057..2f7e0ab39fefd 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -332,6 +332,9 @@ object KafkaCluster { extends ConsumerConfig(originalProps) { val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp => val hpa = hp.split(":") + if (hpa.size == 1) { + throw new SparkException(s"Broker not the in correct format of : [$brokers]") + } (hpa(0), hpa(1).toInt) } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index f8aa6c5c6263c..14f9ec70bf378 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -18,7 +18,9 @@ package org.apache.spark.streaming.kafka import java.lang.{Integer => JInt} +import java.lang.{Long => JLong} import java.util.{Map => JMap} +import java.util.{Set => JSet} import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -27,14 +29,15 @@ import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.{Decoder, StringDecoder} - +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream} +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} object KafkaUtils { /** @@ -121,8 +124,8 @@ object KafkaUtils { /** * Create an input stream that pulls messages from a Kafka Broker. * @param jssc JavaStreamingContext object - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD + * @param keyTypeClass Key type of DStream + * @param valueTypeClass value type of Dstream * @param keyDecoderClass Type of kafka key decoder * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration parameters, @@ -151,7 +154,8 @@ object KafkaUtils { jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } - /** A batch-oriented interface for consuming from Kafka. + /** + * Create a RDD from the * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param sc SparkContext object @@ -166,12 +170,12 @@ object KafkaUtils { def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag] ( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = { + ): RDD[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet @@ -179,7 +183,7 @@ object KafkaUtils { errs => throw new SparkException(errs.mkString("\n")), ok => ok ) - new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) + new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) } /** A batch-oriented interface for consuming from Kafka. @@ -199,101 +203,141 @@ object KafkaUtils { def createRDD[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, R: ClassTag] ( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange], leaders: Array[Leader], messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = { - + ): RDD[R] = { val leaderMap = leaders .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port)) .toMap - new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) } + + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaRDD[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createRDD[K, V, KD, VD, R]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) + } + + @Experimental + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + jsc: JavaSparkContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + offsetRanges: Array[OffsetRange] + ): JavaPairRDD[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + new JavaPairRDD(createRDD[K, V, KD, VD]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + } + + /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * :: Experimental :: + * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee + * that each message from Kafka is included in transformations exactly once (see points below). * * Points to note: - * - * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them - * as the fromOffsets parameter on restart. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide - * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. - * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to either ensure that the output operation is - * idempotent, or transactionally store offsets with the output. See the programming guide for - * more details. + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. +* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param messageHandler function for translating each message into the desired type - * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each raw message into the desired type */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { - new DirectKafkaInputDStream[K, V, U, T, R]( + new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, messageHandler) } /** - * This stream can guarantee that each message from Kafka is included in transformations - * (as opposed to output actions) exactly once, even in most failure situations. + * :: Experimental :: + * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee + * that each message from Kafka is included in transformations exactly once (see points below). * * Points to note: - * - * Failure Recovery - You must checkpoint this stream. - * Kafka must have sufficient log retention to obtain messages after failure. - * - * Getting offsets from the stream - see programming guide - * -. * Zookeeper - This does not use Zookeeper to store offsets. For interop with Kafka monitors - * that depend on Zookeeper, you must store offsets in ZK yourself. - * - * End-to-end semantics - This does not guarantee that any output operation will push each record - * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and - * outputting exactly once), you have to ensure that the output operation is idempotent. + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" * to determine where the stream starts (defaults to "largest") - * @param topics names of the topics to consume + * @param topics Names of the topics to consume */ @Experimental def createDirectStream[ K: ClassTag, V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag] ( + KD <: Decoder[K]: ClassTag, + VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] @@ -313,11 +357,122 @@ object KafkaUtils { val fromOffsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) } - new DirectKafkaInputDStream[K, V, U, T, (K, V)]( + new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }).fold( errs => throw new SparkException(errs.mkString("\n")), ok => ok ) } + + /** + * :: Experimental :: + * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee + * that each message from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. +* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class of the value decoder + * @param recordClass Class of the records in DStream + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + * @param messageHandler Function for translating each raw message into the desired type + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + recordClass: Class[R], + kafkaParams: JMap[String, String], + fromOffsets: JMap[TopicAndPartition, JLong], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaInputDStream[R] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createDirectStream[K, V, KD, VD, R]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), + messageHandler.call _ + ) + } + + /** + * :: Experimental :: + * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee + * that each message from Kafka is included in transformations exactly once (see points below). + * + * Points to note: + * - No receivers: This stream does not use any receiver. It directly queries Kafka + * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked + * by the stream itself. For interoperability with Kafka monitoring tools that depend on + * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. +* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * in the [[StreamingContext]]. The information on consumed offset can be + * recovered from the checkpoint. See the programming guide for details (constraints, etc.). + * - End-to-end semantics: This stream ensures that every records is effectively received and + * transformed exactly once, but gives no guarantees on whether the transformed data are + * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure + * that the output operation is idempotent, or use transactions to output records atomically. + * See the programming guide for more details. + * + * @param jssc JavaStreamingContext object + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param keyDecoderClass Class of the key decoder + * @param valueDecoderClass Class type of the value decoder + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers), specified in + * host1:port1,host2:port2 form. + * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" + * to determine where the stream starts (defaults to "largest") + * @param topics Names of the topics to consume + */ + @Experimental + def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + jssc: JavaStreamingContext, + keyClass: Class[K], + valueClass: Class[V], + keyDecoderClass: Class[KD], + valueDecoderClass: Class[VD], + kafkaParams: JMap[String, String], + topics: JSet[String] + ): JavaPairInputDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) + implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) + implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) + implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) + createDirectStream[K, V, KD, VD]( + jssc.ssc, + Map(kafkaParams.toSeq: _*), + Set(topics.toSeq: _*) + ) + } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java new file mode 100644 index 0000000000000..9060ec7d9104c --- /dev/null +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.List; +import java.util.ArrayList; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.Duration; +import scala.Predef; +import scala.Tuple2; +import scala.collection.JavaConverters; + +import junit.framework.Assert; + +import kafka.common.TopicAndPartition; +import kafka.message.MessageAndMetadata; +import kafka.serializer.StringDecoder; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +public class JavaDirectKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient Random random = new Random(); + private transient KafkaStreamSuiteBase suiteBase = null; + + @Before + public void setUp() { + suiteBase = new KafkaStreamSuiteBase() { }; + suiteBase.setupKafka(); + System.clearProperty("spark.driver.port"); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + System.clearProperty("spark.driver.port"); + suiteBase.tearDownKafka(); + } + + @Test + public void testKafkaStream() throws InterruptedException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + List topic1data = createTopicAndSendData(topic1); + List topic2data = createTopicAndSendData(topic2); + + HashSet sent = new HashSet(); + sent.addAll(topic1data); + sent.addAll(topic2data); + + HashMap kafkaParams = new HashMap(); + kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + kafkaParams.put("auto.offset.reset", "smallest"); + + JavaDStream stream1 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicToSet(topic1) + ).map( + new Function, String>() { + @Override + public String call(scala.Tuple2 kv) throws Exception { + return kv._2(); + } + } + ); + + JavaDStream stream2 = KafkaUtils.createDirectStream( + ssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + String.class, + kafkaParams, + topicOffsetToMap(topic2, (long) 0), + new Function, String>() { + @Override + public String call(MessageAndMetadata msgAndMd) throws Exception { + return msgAndMd.message(); + } + } + ); + JavaDStream unifiedStream = stream1.union(stream2); + + final HashSet result = new HashSet(); + unifiedStream.foreachRDD( + new Function, Void> () { + @Override + public Void call(org.apache.spark.api.java.JavaRDD rdd) throws Exception { + result.addAll(rdd.collect()); + return null; + } + } + ); + ssc.start(); + long startTime = System.currentTimeMillis(); + boolean matches = false; + while (!matches && System.currentTimeMillis() - startTime < 20000) { + matches = sent.size() == result.size(); + Thread.sleep(50); + } + Assert.assertEquals(sent, result); + ssc.stop(); + } + + private HashSet topicToSet(String topic) { + HashSet topicSet = new HashSet(); + topicSet.add(topic); + return topicSet; + } + + private HashMap topicOffsetToMap(String topic, Long offsetToStart) { + HashMap topicMap = new HashMap(); + topicMap.put(new TopicAndPartition(topic, scala.Int.box(0)), offsetToStart); + return topicMap; + } + + private List createTopicAndSendData(String topic) { + List data = java.util.Arrays.asList(topic+"-1", topic+"-2", topic+"-3"); + HashMap sent = new HashMap(); + for(String i: data) { + sent.put(i, 1); + } + + suiteBase.createTopic(topic); + + HashMap tmp = new HashMap(sent); + suiteBase.produceAndSendMessage(topic, + JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( + Predef.>conforms())); + return data; + } +} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index eff468e3d0254..fd6fdc73bb947 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -32,6 +32,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.util.Utils +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata class DirectKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with BeforeAndAfterAll with Eventually { @@ -39,12 +41,6 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - val brokerHost = "localhost" - - val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerHost:$brokerPort", - "auto.offset.reset" -> "smallest" - ) var ssc: StreamingContext = _ var testDir: File = _ @@ -66,13 +62,18 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } } - test("basic receiving with multiple topics") { - val topics = Set("newA", "newB") + test("basic receiving with multiple topics and smallest starting offset") { + val topics = Set("topic1", "topic2", "topic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => createTopic(t) produceAndSendMessage(t, data) } + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "smallest" + ) + ssc = new StreamingContext(sparkConf, Milliseconds(200)) val stream = withClue("Error creating direct stream") { KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( @@ -106,6 +107,95 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } ssc.stop() } + test("receiving from largest starting offset") { + val topic = "largest" + val topicPartition = TopicAndPartition(topic, 0) + val data = Map("a" -> 10) + createTopic(topic) + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "largest" + ) + val kc = new KafkaCluster(kafkaParams) + def getLatestOffset(): Long = { + kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset + } + + // Send some initial messages before starting context + produceAndSendMessage(topic, data) + eventually(timeout(10 seconds), interval(20 milliseconds)) { + assert(getLatestOffset() > 3) + } + val offsetBeforeStart = getLatestOffset() + + // Setup context and kafka stream with largest offset + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, Set(topic)) + } + assert( + stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] + .fromOffsets(topicPartition) >= offsetBeforeStart, + "Start offset not from latest" + ) + + val collectedData = new mutable.ArrayBuffer[String]() + stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } + ssc.start() + val newData = Map("b" -> 10) + produceAndSendMessage(topic, newData) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + collectedData.contains("b") + } + assert(!collectedData.contains("a")) + } + + + test("creating stream by offset") { + val topic = "offset" + val topicPartition = TopicAndPartition(topic, 0) + val data = Map("a" -> 10) + createTopic(topic) + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "largest" + ) + val kc = new KafkaCluster(kafkaParams) + def getLatestOffset(): Long = { + kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset + } + + // Send some initial messages before starting context + produceAndSendMessage(topic, data) + eventually(timeout(10 seconds), interval(20 milliseconds)) { + assert(getLatestOffset() >= 10) + } + val offsetBeforeStart = getLatestOffset() + + // Setup context and kafka stream with largest offset + ssc = new StreamingContext(sparkConf, Milliseconds(200)) + val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( + ssc, kafkaParams, Map(topicPartition -> 11L), + (m: MessageAndMetadata[String, String]) => m.message()) + } + assert( + stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] + .fromOffsets(topicPartition) >= offsetBeforeStart, + "Start offset not from latest" + ) + + val collectedData = new mutable.ArrayBuffer[String]() + stream.foreachRDD { rdd => collectedData ++= rdd.collect() } + ssc.start() + val newData = Map("b" -> 10) + produceAndSendMessage(topic, newData) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + collectedData.contains("b") + } + assert(!collectedData.contains("a")) + } // Test to verify the offset ranges can be recovered from the checkpoints test("offset recovery") { @@ -113,6 +203,11 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase createTopic(topic) testDir = Utils.createTempDir() + val kafkaParams = Map( + "metadata.broker.list" -> s"$brokerAddress", + "auto.offset.reset" -> "smallest" + ) + // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index e57c8f6987fdc..54aa4ce498064 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -23,14 +23,10 @@ import org.scalatest.BeforeAndAfter import kafka.common.TopicAndPartition class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter { - val brokerHost = "localhost" - - val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort") + val kafkaParams = Map("metadata.broker.list" -> s"$brokerAddress") val kc = new KafkaCluster(kafkaParams) - val topic = "kcsuitetopic" + Random.nextInt(10000) - val topicAndPartition = TopicAndPartition(topic, 0) before { diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index f207dc6d4fa04..c44d9e924415a 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -48,15 +48,18 @@ import org.apache.spark.util.Utils */ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging { + val zkHost = "localhost" + var zkPort: Int = 0 var zkAddress: String = _ var zkClient: ZkClient = _ - private val zkHost = "localhost" + val brokerHost = "localhost" + var brokerPort = 9092 + var brokerAddress: String = _ + private val zkConnectionTimeout = 6000 private val zkSessionTimeout = 6000 private var zookeeper: EmbeddedZookeeper = _ - private var zkPort: Int = 0 - protected var brokerPort = 9092 private var brokerConf: KafkaConfig = _ private var server: KafkaServer = _ private var producer: Producer[String, String] = _ @@ -67,6 +70,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin // Get the actual zookeeper binding port zkPort = zookeeper.actualPort zkAddress = s"$zkHost:$zkPort" + logInfo("==================== 0 ====================") zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, @@ -94,6 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } Thread.sleep(2000) + brokerAddress = s"$brokerHost:$brokerPort" logInfo("==================== 4 ====================") } From bb65232c008d66c7895e83e9736353881b5d719e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 Feb 2015 18:58:39 -0800 Subject: [PATCH 05/10] Fixed test bug and refactored KafkaStreamSuite --- .../streaming/DirectKafkaWordCount.scala | 2 +- .../spark/streaming/kafka/KafkaUtils.scala | 4 +- .../kafka/JavaDirectKafkaStreamSuite.java | 32 +++------ .../streaming/kafka/JavaKafkaStreamSuite.java | 5 +- .../kafka/DirectKafkaStreamSuite.scala | 26 ++++--- .../streaming/kafka/KafkaClusterSuite.scala | 20 +++--- .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 +-- .../streaming/kafka/KafkaStreamSuite.scala | 69 ++++++++++--------- .../kafka/ReliableKafkaStreamSuite.scala | 4 +- 9 files changed, 85 insertions(+), 85 deletions(-) diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala index b09664a82ddac..46afb7f610962 100644 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkConf /** * Consumes messages from one or more topics in Kafka and does wordcount. * Usage: DirectKafkaWordCount - * is a list of one or more zookeeper servers that make quorum + * is a list of one or more Kafka brokers * is a list of one or more kafka topics to consume from * * Example: diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 14f9ec70bf378..bedb55c49e5ee 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -171,7 +171,7 @@ object KafkaUtils { K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, - VD <: Decoder[V]: ClassTag] ( + VD <: Decoder[V]: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] @@ -205,7 +205,7 @@ object KafkaUtils { V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, - R: ClassTag] ( + R: ClassTag]( sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange], diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 9060ec7d9104c..35f469ef5d529 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -21,14 +21,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Random; -import java.util.List; -import java.util.ArrayList; +import java.util.Arrays; import org.apache.spark.SparkConf; -import org.apache.spark.streaming.Duration; -import scala.Predef; + import scala.Tuple2; -import scala.collection.JavaConverters; import junit.framework.Assert; @@ -74,12 +71,12 @@ public void testKafkaStream() throws InterruptedException { String topic1 = "topic1"; String topic2 = "topic2"; - List topic1data = createTopicAndSendData(topic1); - List topic2data = createTopicAndSendData(topic2); + String[] topic1data = createTopicAndSendData(topic1); + String[] topic2data = createTopicAndSendData(topic2); HashSet sent = new HashSet(); - sent.addAll(topic1data); - sent.addAll(topic2data); + sent.addAll(Arrays.asList(topic1data)); + sent.addAll(Arrays.asList(topic2data)); HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); @@ -122,7 +119,7 @@ public String call(MessageAndMetadata msgAndMd) throws Exception final HashSet result = new HashSet(); unifiedStream.foreachRDD( - new Function, Void> () { + new Function, Void>() { @Override public Void call(org.apache.spark.api.java.JavaRDD rdd) throws Exception { result.addAll(rdd.collect()); @@ -153,19 +150,10 @@ private HashMap topicOffsetToMap(String topic, Long off return topicMap; } - private List createTopicAndSendData(String topic) { - List data = java.util.Arrays.asList(topic+"-1", topic+"-2", topic+"-3"); - HashMap sent = new HashMap(); - for(String i: data) { - sent.put(i, 1); - } - + private String[] createTopicAndSendData(String topic) { + String[] data = { topic + "-1", topic + "-2", topic + "-3"}; suiteBase.createTopic(topic); - - HashMap tmp = new HashMap(sent); - suiteBase.produceAndSendMessage(topic, - JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); + suiteBase.sendMessages(topic, data); return data; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 6e1abf3f385ee..208cc51b29876 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -79,9 +79,10 @@ public void testKafkaStream() throws InterruptedException { suiteBase.createTopic(topic); HashMap tmp = new HashMap(sent); - suiteBase.produceAndSendMessage(topic, + suiteBase.sendMessages(topic, JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); + Predef.>conforms()) + ); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", suiteBase.zkAddress()); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index fd6fdc73bb947..b25c2120d54f7 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -27,7 +27,7 @@ import kafka.serializer.StringDecoder import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.scalatest.concurrent.{Eventually, Timeouts} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.{DStream, InputDStream} @@ -41,7 +41,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - + var sc: SparkContext = _ var ssc: StreamingContext = _ var testDir: File = _ @@ -56,18 +56,23 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase after { if (ssc != null) { ssc.stop() + sc = null + } + if (sc != null) { + sc.stop() } if (testDir != null) { Utils.deleteRecursively(testDir) } } - test("basic receiving with multiple topics and smallest starting offset") { - val topics = Set("topic1", "topic2", "topic3") + + test("basic stream receiving with multiple topics and smallest starting offset") { + val topics = Set("basic1", "basic2", "basic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => createTopic(t) - produceAndSendMessage(t, data) + sendMessages(t, data) } val kafkaParams = Map( "metadata.broker.list" -> s"$brokerAddress", @@ -107,6 +112,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } ssc.stop() } + test("receiving from largest starting offset") { val topic = "largest" val topicPartition = TopicAndPartition(topic, 0) @@ -122,7 +128,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } // Send some initial messages before starting context - produceAndSendMessage(topic, data) + sendMessages(topic, data) eventually(timeout(10 seconds), interval(20 milliseconds)) { assert(getLatestOffset() > 3) } @@ -144,7 +150,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) - produceAndSendMessage(topic, newData) + sendMessages(topic, newData) eventually(timeout(10 seconds), interval(50 milliseconds)) { collectedData.contains("b") } @@ -167,7 +173,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } // Send some initial messages before starting context - produceAndSendMessage(topic, data) + sendMessages(topic, data) eventually(timeout(10 seconds), interval(20 milliseconds)) { assert(getLatestOffset() >= 10) } @@ -190,7 +196,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase stream.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) - produceAndSendMessage(topic, newData) + sendMessages(topic, newData) eventually(timeout(10 seconds), interval(50 milliseconds)) { collectedData.contains("b") } @@ -211,7 +217,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} - produceAndSendMessage(topic, strings.map { _ -> 1}.toMap) + sendMessages(topic, strings.map { _ -> 1}.toMap) eventually(timeout(10 seconds), interval(50 milliseconds)) { assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains }) } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index 54aa4ce498064..fc9275b7207be 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -19,29 +19,29 @@ package org.apache.spark.streaming.kafka import scala.util.Random -import org.scalatest.BeforeAndAfter import kafka.common.TopicAndPartition +import org.scalatest.BeforeAndAfterAll -class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter { - - val kafkaParams = Map("metadata.broker.list" -> s"$brokerAddress") - val kc = new KafkaCluster(kafkaParams) +class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { val topic = "kcsuitetopic" + Random.nextInt(10000) val topicAndPartition = TopicAndPartition(topic, 0) + var kc: KafkaCluster = null - before { + override def beforeAll() { setupKafka() createTopic(topic) - produceAndSendMessage(topic, Map("a" -> 1)) + sendMessages(topic, Map("a" -> 1)) + kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress")) } - after { + override def afterAll() { tearDownKafka() } test("metadata apis") { - val leader = kc.findLeaders(Set(topicAndPartition)).right.get - assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader") + val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition) + val leaderAddress = s"${leader._1}:${leader._2}" + assert(leaderAddress === brokerAddress, "didn't get leader") val parts = kc.getPartitions(Set(topic)).right.get assert(parts(topicAndPartition), "didn't get partitions") diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 9b9e3f5fce8bd..6774db854a0d0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - produceAndSendMessage(topic, sent) + sendMessages(topic, sent) - val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort", + val kafkaParams = Map("metadata.broker.list" -> brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val kc = new KafkaCluster(kafkaParams) @@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val rdd2 = getRdd(kc, Set(topic)) val sent2 = Map("d" -> 1) - produceAndSendMessage(topic, sent2) + sendMessages(topic, sent2) // this is the "0 messages" case // make sure we dont get anything, since messages were sent after rdd was defined assert(rdd2.isDefined) assert(rdd2.get.count === 0) val rdd3 = getRdd(kc, Set(topic)) - produceAndSendMessage(topic, Map("extra" -> 22)) + sendMessages(topic, Map("extra" -> 22)) // this is the "exactly 1 message" case // make sure we get exactly one message, despite there being lots more available assert(rdd3.isDefined) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index c44d9e924415a..3e1ab155fae88 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -48,34 +48,41 @@ import org.apache.spark.util.Utils */ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging { - val zkHost = "localhost" - var zkPort: Int = 0 - var zkAddress: String = _ - var zkClient: ZkClient = _ - - val brokerHost = "localhost" - var brokerPort = 9092 - var brokerAddress: String = _ - + private val zkHost = "localhost" + private var zkPort: Int = 0 private val zkConnectionTimeout = 6000 private val zkSessionTimeout = 6000 private var zookeeper: EmbeddedZookeeper = _ + private val brokerHost = "localhost" + private var brokerPort = 9092 private var brokerConf: KafkaConfig = _ private var server: KafkaServer = _ private var producer: Producer[String, String] = _ + private var zkReady = false + private var brokerReady = false + + protected var zkClient: ZkClient = _ + + def zkAddress: String = { + assert(zkReady, "Kafka not setup yet, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet, cannot get broker address") + s"$brokerHost:$brokerPort" + } def setupKafka() { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort - zkAddress = s"$zkHost:$zkPort" - - logInfo("==================== 0 ====================") + zkReady = true + logInfo("==================== Zookeeper Started ====================") - zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, - ZKStringSerializer) - logInfo("==================== 1 ====================") + zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + logInfo("==================== Zookeeper Client Created ====================") // Kafka broker startup var bindSuccess: Boolean = false @@ -84,9 +91,8 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin val brokerProps = getBrokerConfig() brokerConf = new KafkaConfig(brokerProps) server = new KafkaServer(brokerConf) - logInfo("==================== 2 ====================") server.startup() - logInfo("==================== 3 ====================") + logInfo("==================== Kafka Broker Started ====================") bindSuccess = true } catch { case e: KafkaException => @@ -98,11 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } Thread.sleep(2000) - brokerAddress = s"$brokerHost:$brokerPort" - logInfo("==================== 4 ====================") + logInfo("==================== Kafka Ready ====================") + brokerReady = true } def tearDownKafka() { + brokerReady = false + zkReady = true if (producer != null) { producer.close() producer = null @@ -126,26 +134,23 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } } - private def createTestMessage(topic: String, sent: Map[String, Int]) - : Seq[KeyedMessage[String, String]] = { - val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { - new KeyedMessage[String, String](topic, s) - } - messages.toSeq - } - def createTopic(topic: String) { AdminUtils.createTopic(zkClient, topic, 1, 1) - logInfo("==================== 5 ====================") // wait until metadata is propagated waitUntilMetadataIsPropagated(topic, 0) + logInfo(s"==================== Topic $topic Created ====================") } - def produceAndSendMessage(topic: String, sent: Map[String, Int]) { + def sendMessages(topic: String, messageToFreq: Map[String, Int]) { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + def sendMessages(topic: String, messages: Array[String]) { producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) - producer.send(createTestMessage(topic, sent): _*) + producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) producer.close() - logInfo("==================== 6 ====================") + logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================") } private def getBrokerConfig(): Properties = { @@ -223,7 +228,7 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) createTopic(topic) - produceAndSendMessage(topic, sent) + sendMessages(topic, sent) val kafkaParams = Map("zookeeper.connect" -> zkAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}", diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 64ccc92c81fa9..fc53c23abda85 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -79,7 +79,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter test("Reliable Kafka input stream with single topic") { var topic = "test-topic" createTopic(topic) - produceAndSendMessage(topic, data) + sendMessages(topic, data) // Verify whether the offset of this group/topic/partition is 0 before starting. assert(getCommitOffset(groupId, topic, 0) === None) @@ -111,7 +111,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) topics.foreach { case (t, _) => createTopic(t) - produceAndSendMessage(t, data) + sendMessages(t, data) } // Before started, verify all the group/topic/partition offsets are 0. From e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 Feb 2015 19:34:26 -0800 Subject: [PATCH 06/10] Scala doc improvements and stuff. --- .../spark/streaming/kafka/KafkaRDD.scala | 12 ++- .../streaming/kafka/KafkaRDDPartition.scala | 23 +---- .../spark/streaming/kafka/KafkaUtils.scala | 90 ++++++++++++------- .../apache/spark/streaming/kafka/Leader.scala | 21 +++-- .../spark/streaming/kafka/OffsetRange.scala | 36 ++++++-- 5 files changed, 111 insertions(+), 71 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 50bf7cbdb8dbf..d56cc01be9514 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param batch Each KafkaRDDPartition in the batch corresponds to a - * range of offsets for a given Kafka topic/partition + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */ -private[spark] +private[kafka] class KafkaRDD[ K: ClassTag, V: ClassTag, @@ -183,7 +181,7 @@ class KafkaRDD[ } } -private[spark] +private[kafka] object KafkaRDD { import KafkaCluster.LeaderOffset diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala index 36372e08f65f6..a842a6f17766f 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala @@ -26,7 +26,7 @@ import org.apache.spark.Partition * @param host preferred kafka host, i.e. the leader at the time the rdd was created * @param port preferred kafka host's port */ -private[spark] +private[kafka] class KafkaRDDPartition( val index: Int, val topic: String, @@ -36,24 +36,3 @@ class KafkaRDDPartition( val host: String, val port: Int ) extends Partition - -private[spark] -object KafkaRDDPartition { - def apply( - index: Int, - topic: String, - partition: Int, - fromOffset: Long, - untilOffset: Long, - host: String, - port: Int - ): KafkaRDDPartition = new KafkaRDDPartition( - index, - topic, - partition, - fromOffset, - untilOffset, - host, - port - ) -} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index bedb55c49e5ee..7e18c13124d0e 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -155,14 +155,13 @@ object KafkaUtils { } /** - * Create a RDD from the - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + * Create a RDD from Kafka using offset ranges for each topic and partition. + * * @param sc SparkContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition */ @@ -186,18 +185,21 @@ object KafkaUtils { new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) } - /** A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * * @param sc SparkContext object * @param kafkaParams Kafka - * configuration parameters. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition * @param leaders Kafka leaders for each offset range in batch - * @param messageHandler function for translating each message into the desired type + * @param messageHandler function for translating each message and metadata into the desired type */ @Experimental def createRDD[ @@ -219,47 +221,73 @@ object KafkaUtils { } + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + */ @Experimental - def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( jsc: JavaSparkContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], - recordClass: Class[R], kafkaParams: JMap[String, String], - offsetRanges: Array[OffsetRange], - leaders: Array[Leader], - messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaRDD[R] = { + offsetRanges: Array[OffsetRange] + ): JavaPairRDD[K, V] = { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) - createRDD[K, V, KD, VD, R]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) + new JavaPairRDD(createRDD[K, V, KD, VD]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) } + /** + * :: Experimental :: + * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you + * specify the Kafka leader to connect to (to optimize fetching) and access the message as well + * as the metadata. + * + * @param jsc JavaSparkContext object + * @param kafkaParams Kafka + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" + * to be set with Kafka broker(s) (NOT zookeeper servers) specified in + * host1:port1,host2:port2 form. + * @param offsetRanges Each OffsetRange in the batch corresponds to a + * range of offsets for a given Kafka topic/partition + * @param leaders Kafka leaders for each offset range in batch + * @param messageHandler function for translating each message and metadata into the desired type + */ @Experimental - def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( + def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( jsc: JavaSparkContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], + recordClass: Class[R], kafkaParams: JMap[String, String], - offsetRanges: Array[OffsetRange] - ): JavaPairRDD[K, V] = { + offsetRanges: Array[OffsetRange], + leaders: Array[Leader], + messageHandler: JFunction[MessageAndMetadata[K, V], R] + ): JavaRDD[R] = { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) - new JavaPairRDD(createRDD[K, V, KD, VD]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + createRDD[K, V, KD, VD, R]( + jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _) } - /** * :: Experimental :: * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee @@ -270,7 +298,7 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. -* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). * - End-to-end semantics: This stream ensures that every records is effectively received and @@ -375,7 +403,7 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. -* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). * - End-to-end semantics: This stream ensures that every records is effectively received and @@ -433,7 +461,7 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. -* - Failure Recovery: To recover from driver failures, you have to enable checkpointing + * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). * - End-to-end semantics: This stream ensures that every records is effectively received and diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala index 3454d92e72b47..c129a26836c0d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala @@ -19,17 +19,28 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Host info for the leader of a Kafka TopicAndPartition */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represent the host info for the leader of a Kafka partition. + */ +@Experimental final class Leader private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, - /** kafka hostname */ + /** Leader's hostname */ val host: String, - /** kafka host's port */ + /** Leader's port */ val port: Int) extends Serializable +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[Leader]]. + */ +@Experimental object Leader { def create(topic: String, partition: Int, host: String, port: Int): Leader = new Leader(topic, partition, host, port) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala index 11bb8aa195eb1..9c3dfeb8f5928 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala @@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka import kafka.common.TopicAndPartition -/** Something that has a collection of OffsetRanges */ +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +@Experimental trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] } -/** Represents a range of offsets from a single Kafka TopicAndPartition */ +/** + * :: Experimental :: + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. + */ +@Experimental final class OffsetRange private( - /** kafka topic name */ + /** Kafka topic name */ val topic: String, - /** kafka partition id */ + /** Kafka partition id */ val partition: Int, /** inclusive starting offset */ val fromOffset: Long, @@ -58,6 +77,11 @@ final class OffsetRange private( def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) } +/** + * :: Experimental :: + * Companion object the provides methods to create instances of [[OffsetRange]]. + */ +@Experimental object OffsetRange { def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset) @@ -78,10 +102,10 @@ object OffsetRange { new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset) /** this is to avoid ClassNotFoundException during checkpoint restore */ - private[spark] + private[kafka] type OffsetRangeTuple = (String, Int, Long, Long) - private[streaming] + private[kafka] def apply(t: OffsetRangeTuple) = new OffsetRange(t._1, t._2, t._3, t._4) } From 26df23c5578e52dc594557cfaf2170b1e0d49169 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 6 Feb 2015 14:08:33 -0800 Subject: [PATCH 07/10] Updates based on PR comments from Cody --- .../streaming/DirectKafkaWordCount.scala | 23 ++++++++++++++----- .../spark/streaming/kafka/KafkaUtils.scala | 16 +++++++++---- .../streaming/kafka/KafkaStreamSuite.scala | 8 +++---- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala index 46afb7f610962..deb08fd57b8c7 100644 --- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala @@ -35,25 +35,36 @@ import org.apache.spark.SparkConf object DirectKafkaWordCount { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: DirectKafkaWordCount ") + System.err.println(s""" + |Usage: DirectKafkaWordCount + | is a list of one or more Kafka brokers + | is a list of one or more kafka topics to consume from + | + """".stripMargin) System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Array(brokerList, topics) = args + val Array(brokers, topics) = args + + // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("checkpoint") + // Create direct kafka stream with brokers and topics val topicsSet = topics.split(",").toSet - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList) - val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet).map(_._2) + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + + // Get the lines, split them into words, count the words and print + val lines = messages.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() + // Start the computation ssc.start() ssc.awaitTermination() } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 7e18c13124d0e..48569772d5baf 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -199,7 +199,7 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition * @param leaders Kafka leaders for each offset range in batch - * @param messageHandler function for translating each message and metadata into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createRDD[ @@ -264,7 +264,7 @@ object KafkaUtils { * @param offsetRanges Each OffsetRange in the batch corresponds to a * range of offsets for a given Kafka topic/partition * @param leaders Kafka leaders for each offset range in batch - * @param messageHandler function for translating each message and metadata into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( @@ -298,6 +298,8 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). @@ -314,7 +316,7 @@ object KafkaUtils { * host1:port1,host2:port2 form. * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream - * @param messageHandler Function for translating each raw message into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createDirectStream[ @@ -342,6 +344,8 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). @@ -403,6 +407,8 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). @@ -424,7 +430,7 @@ object KafkaUtils { * host1:port1,host2:port2 form. * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream - * @param messageHandler Function for translating each raw message into the desired type + * @param messageHandler Function for translating each message and metadata into the desired type */ @Experimental def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( @@ -461,6 +467,8 @@ object KafkaUtils { * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + * You can access the offsets used in each batch from the generated RDDs (see + * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 3e1ab155fae88..e4966eebb9b34 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -64,12 +64,12 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin protected var zkClient: ZkClient = _ def zkAddress: String = { - assert(zkReady, "Kafka not setup yet, cannot get zookeeper address") + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") s"$zkHost:$zkPort" } def brokerAddress: String = { - assert(brokerReady, "Kafka not setup yet, cannot get broker address") + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") s"$brokerHost:$brokerPort" } @@ -104,13 +104,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin } Thread.sleep(2000) - logInfo("==================== Kafka Ready ====================") + logInfo("==================== Kafka + Zookeeper Ready ====================") brokerReady = true } def tearDownKafka() { brokerReady = false - zkReady = true + zkReady = false if (producer != null) { producer.close() producer = null From 83d04025da1cac3d1ec8565015dbe492f17c3b79 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 8 Feb 2015 20:09:27 -0800 Subject: [PATCH 08/10] Added JavaDirectKafkaWordCount example. --- .../streaming/JavaDirectKafkaWordCount.java | 113 ++++++++++++++++++ .../kafka/JavaDirectKafkaStreamSuite.java | 2 +- 2 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java new file mode 100644 index 0000000000000..bab9f2478e779 --- /dev/null +++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Arrays; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import kafka.serializer.StringDecoder; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.Durations; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: DirectKafkaWordCount + * is a list of one or more Kafka brokers + * is a list of one or more kafka topics to consume from + * + * Example: + * $ bin/run-example streaming.KafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 + */ + +public final class JavaDirectKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: DirectKafkaWordCount \n" + + " is a list of one or more Kafka brokers\n" + + " is a list of one or more kafka topics to consume from\n\n"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + String brokers = args[0]; + String topics = args[1]; + + // Create context with 2 second batch interval + SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); + + HashSet topicsSet = new HashSet(Arrays.asList(topics.split(","))); + HashMap kafkaParams = new HashMap(); + kafkaParams.put("metadata.broker.list", brokers); + + // Create direct kafka stream with brokers and topics + JavaPairInputDStream messages = KafkaUtils.createDirectStream( + jssc, + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topicsSet + ); + + // Get the lines, split them into words, count the words and print + JavaDStream lines = messages.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) { + return tuple2._2(); + } + }); + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + JavaPairDStream wordCounts = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }).reduceByKey( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + wordCounts.print(); + + // Start the computation + jssc.start(); + jssc.awaitTermination(); + } +} diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 35f469ef5d529..1334cc8fd1b57 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -146,7 +146,7 @@ private HashSet topicToSet(String topic) { private HashMap topicOffsetToMap(String topic, Long offsetToStart) { HashMap topicMap = new HashMap(); - topicMap.put(new TopicAndPartition(topic, scala.Int.box(0)), offsetToStart); + topicMap.put(new TopicAndPartition(topic, 0), offsetToStart); return topicMap; } From 3ed9284e112c174563db9107e4e6f858470907a8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 9 Feb 2015 21:37:28 -0800 Subject: [PATCH 09/10] updated scala doc --- .../spark/streaming/kafka/KafkaUtils.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 48569772d5baf..eefbc979082e9 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -290,8 +290,9 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee - * that each message from Kafka is included in transformations exactly once (see points below). + * Create an input stream that directly pulls messages from a Kafka Broker + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: * - No receivers: This stream does not use any receiver. It directly queries Kafka @@ -336,8 +337,9 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee - * that each message from Kafka is included in transformations exactly once (see points below). + * Create an input stream that directly pulls messages from a Kafka Broker + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: * - No receivers: This stream does not use any receiver. It directly queries Kafka @@ -399,8 +401,9 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee - * that each message from Kafka is included in transformations exactly once (see points below). + * Create an input stream that directly pulls messages from a Kafka Broker + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: * - No receivers: This stream does not use any receiver. It directly queries Kafka @@ -459,8 +462,9 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that pulls messages from a Kafka Broker. This stream can guarantee - * that each message from Kafka is included in transformations exactly once (see points below). + * Create an input stream that directly pulls messages from a Kafka Broker + * without using any receiver. This stream can guarantee that each message + * from Kafka is included in transformations exactly once (see points below). * * Points to note: * - No receivers: This stream does not use any receiver. It directly queries Kafka From 7c931c3fd174376fc04a436a64ec414dbe8eac46 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 9 Feb 2015 21:41:22 -0800 Subject: [PATCH 10/10] Small update --- .../spark/streaming/kafka/KafkaUtils.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index eefbc979082e9..7a2c3abdcc24b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -41,7 +41,7 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} object KafkaUtils { /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer @@ -65,7 +65,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param kafkaParams Map of kafka configuration parameters, * see http://kafka.apache.org/08/configuration.html @@ -84,7 +84,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) @@ -102,7 +102,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. @@ -122,7 +122,7 @@ object KafkaUtils { } /** - * Create an input stream that pulls messages from a Kafka Broker. + * Create an input stream that pulls messages from Kafka Brokers. * @param jssc JavaStreamingContext object * @param keyTypeClass Key type of DStream * @param valueTypeClass value type of Dstream @@ -290,7 +290,7 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that directly pulls messages from a Kafka Broker + * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). * @@ -337,7 +337,7 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that directly pulls messages from a Kafka Broker + * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). * @@ -401,7 +401,7 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that directly pulls messages from a Kafka Broker + * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). * @@ -462,7 +462,7 @@ object KafkaUtils { /** * :: Experimental :: - * Create an input stream that directly pulls messages from a Kafka Broker + * Create an input stream that directly pulls messages from Kafka Brokers * without using any receiver. This stream can guarantee that each message * from Kafka is included in transformations exactly once (see points below). *