From 5536f9500e49f3e14b474cda7fab681c404655f5 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 10 Mar 2015 16:31:05 +0800 Subject: [PATCH 01/15] Refactor the Kafka unit test and add Python Kafka unittest support --- .../streaming/kafka/KafkaTestUtils.scala | 255 ++++++++++++++++++ .../kafka/JavaDirectKafkaStreamSuite.java | 33 ++- .../streaming/kafka/JavaKafkaRDDSuite.java | 25 +- .../streaming/kafka/JavaKafkaStreamSuite.java | 28 +- .../kafka/DirectKafkaStreamSuite.scala | 51 ++-- .../streaming/kafka/KafkaClusterSuite.scala | 24 +- .../spark/streaming/kafka/KafkaRDDSuite.scala | 36 ++- .../streaming/kafka/KafkaStreamSuite.scala | 203 ++------------ .../kafka/ReliableKafkaStreamSuite.scala | 44 ++- python/pyspark/streaming/tests.py | 43 ++- python/run-tests | 20 +- 11 files changed, 486 insertions(+), 276 deletions(-) create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala new file mode 100644 index 000000000000..0e263bef2925 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.io.File +import java.lang.{Integer => JInt} +import java.net.InetSocketAddress +import java.util.{Map => JMap} +import java.util.Properties +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.language.postfixOps +import scala.util.Random + +import kafka.admin.AdminUtils +import kafka.common.KafkaException +import kafka.producer.{KeyedMessage, Producer, ProducerConfig} +import kafka.serializer.StringEncoder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} + +import org.apache.spark.Logging +import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils + +/** + * This is a helper class for Kafka test suites. This has the functionality to set up + * and tear down local Kafka servers, and to push data using Kafka producers. 
+ */ +private class KafkaTestUtils extends Logging { + + // Zookeeper related configurations + private val zkHost = "localhost" + private var zkPort: Int = 0 + private val zkConnectionTimeout = 6000 + private val zkSessionTimeout = 6000 + + private var zookeeper: EmbeddedZookeeper = _ + + var zkClient: ZkClient = _ + + // Kafka broker related configurations + private val brokerHost = "localhost" + private var brokerPort = 9092 + private var brokerConf: KafkaConfig = _ + + // Kafka broker server + private var server: KafkaServer = _ + + // Kafka producer + private var producer: Producer[String, String] = _ + + // Flag to test whether the system is correctly started + private var zkReady = false + private var brokerReady = false + + def zkAddress: String = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") + s"$brokerHost:$brokerPort" + } + + /** Set up the Embedded Zookeeper server and get the proper Zookeeper port */ + def setupEmbeddedZookeeper(): Unit = { + // Zookeeper server startup + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + // Get the actual zookeeper binding port + zkPort = zookeeper.actualPort + zkReady = true + zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + } + + /** Set up the Embedded Kafka server */ + def setupEmbeddedKafkaServer(): Unit = { + assert(zkReady, "Zookeeper should be set up beforehand") + // Kafka broker startup + var bindSuccess: Boolean = false + while(!bindSuccess) { + try { + brokerConf = new KafkaConfig(brokerConfigure) + server = new KafkaServer(brokerConf) + server.startup() + bindSuccess = true + } catch { + case e: KafkaException => + if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) { + brokerPort += 1 + } + case e: Exception => throw new Exception("Kafka server create failed", e) + } + } + + Thread.sleep(2000) + brokerReady = true + } + + /** Tear down the whole servers, including Kafka broker and Zookeeper */ + def tearDownEmbeddedServers(): Unit = { + brokerReady = false + zkReady = false + + if (producer != null) { + producer.close() + producer = null + } + + if (server != null) { + server.shutdown() + server = null + } + + brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + + if (zkClient != null) { + zkClient.close() + zkClient = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } + + /** Create a Kafka topic and wait until it propagated to the whole cluster */ + def createTopic(topic: String): Unit = { + AdminUtils.createTopic(zkClient, topic, 1, 1) + // wait until metadata is propagated + waitUntilMetadataIsPropagated(topic, 0) + } + + def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { + import scala.collection.JavaConversions._ + sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*)) + } + + /** Send the messages with its duplications to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + /** Send the array of messages to the Kafka broker */ + def sendMessages(topic: String, messages: Array[String]): Unit = { + producer = new Producer[String, String](new 
ProducerConfig(producerConfigure)) + producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) + producer.close() + producer = null + } + + private def brokerConfigure: Properties = { + val props = new Properties() + props.put("broker.id", "0") + props.put("host.name", "localhost") + props.put("port", brokerPort.toString) + props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("zookeeper.connect", zkAddress) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props + } + + private def producerConfigure: Properties = { + val props = new Properties() + props.put("metadata.broker.list", brokerAddress) + props.put("serializer.class", classOf[StringEncoder].getName) + props + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + eventually(Time(10000), Time(100)) { + assert( + server.apis.metadataCache.containsTopicAndPartition(topic, partition), + s"Partition [$topic, $partition] metadata not propagated after timeout" + ) + } + } + + // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test + // dependency + private def eventually[T](timeout: Time, interval: Time)(func: => T): T = { + def makeAttempt(): Either[Throwable, T] = { + try { + Right(func) + } catch { + case e: Throwable => Left(e) + } + } + + val startTime = System.currentTimeMillis() + @tailrec + def tryAgain(attempt: Int): T = { + makeAttempt() match { + case Right(result) => result + case Left(e) => + val duration = System.currentTimeMillis() - startTime + if (duration < timeout.milliseconds) { + Thread.sleep(interval.milliseconds) + } else { + throw new TimeoutException(e.getMessage) + } + + tryAgain(attempt + 1) + } + } + + tryAgain(1) + } + + class EmbeddedZookeeper(val zkConnect: String) { + val random = new Random() + val snapshotDir = Utils.createTempDir() + val logDir = Utils.createTempDir() + + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val (ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + val actualPort = factory.getLocalPort + + def shutdown() { + factory.shutdown() + Utils.deleteRecursively(snapshotDir) + Utils.deleteRecursively(logDir) + } + } +} + diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index d6ca6d58b566..26df91094054 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -41,24 +41,33 @@ public class JavaDirectKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; - private transient KafkaStreamSuiteBase suiteBase = null; + private transient Random random = new Random(); + private transient KafkaTestUtils kafkaTestUtils = null; @Before public void setUp() { - suiteBase = new KafkaStreamSuiteBase() { }; - suiteBase.setupKafka(); - System.clearProperty("spark.driver.port"); - SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); - ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + kafkaTestUtils = new KafkaTestUtils(); + 
kafkaTestUtils.setupEmbeddedZookeeper(); + kafkaTestUtils.setupEmbeddedKafkaServer(); + System.clearProperty("spark.driver.port"); + SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); + ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); } @After public void tearDown() { + if (ssc != null) { ssc.stop(); ssc = null; - System.clearProperty("spark.driver.port"); - suiteBase.tearDownKafka(); + } + + System.clearProperty("spark.driver.port"); + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers(); + kafkaTestUtils = null; + } } @Test @@ -74,7 +83,7 @@ public void testKafkaStream() throws InterruptedException { sent.addAll(Arrays.asList(topic2data)); HashMap kafkaParams = new HashMap(); - kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); kafkaParams.put("auto.offset.reset", "smallest"); JavaDStream stream1 = KafkaUtils.createDirectStream( @@ -147,8 +156,8 @@ private HashMap topicOffsetToMap(String topic, Long off private String[] createTopicAndSendData(String topic) { String[] data = { topic + "-1", topic + "-2", topic + "-3"}; - suiteBase.createTopic(topic); - suiteBase.sendMessages(topic, data); + kafkaTestUtils.createTopic(topic); + kafkaTestUtils.sendMessages(topic, data); return data; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index 4477b81827c7..3e79e06f547d 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -37,12 +37,13 @@ public class JavaKafkaRDDSuite implements Serializable { private transient JavaSparkContext sc = null; - private transient KafkaStreamSuiteBase suiteBase = null; + private transient KafkaTestUtils kafkaTestUtils = null; @Before public void setUp() { - suiteBase = new KafkaStreamSuiteBase() { }; - suiteBase.setupKafka(); + kafkaTestUtils = new KafkaTestUtils(); + kafkaTestUtils.setupEmbeddedZookeeper(); + kafkaTestUtils.setupEmbeddedKafkaServer(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -51,10 +52,16 @@ public void setUp() { @After public void tearDown() { - sc.stop(); - sc = null; + if (sc != null) { + sc.stop(); + sc = null; + } System.clearProperty("spark.driver.port"); - suiteBase.tearDownKafka(); + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers(); + kafkaTestUtils = null; + } } @Test @@ -66,7 +73,7 @@ public void testKafkaRDD() throws InterruptedException { String[] topic2data = createTopicAndSendData(topic2); HashMap kafkaParams = new HashMap(); - kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress()); + kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), @@ -144,8 +151,8 @@ public String call(MessageAndMetadata msgAndMd) throws Exception private String[] createTopicAndSendData(String topic) { String[] data = { topic + "-1", topic + "-2", topic + "-3"}; - suiteBase.createTopic(topic); - suiteBase.sendMessages(topic, data); + kafkaTestUtils.createTopic(topic); + kafkaTestUtils.sendMessages(topic, data); return data; } } diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index bad0a93eb2e8..1a009d0f98de 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -44,12 +44,13 @@ public class JavaKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; private transient Random random = new Random(); - private transient KafkaStreamSuiteBase suiteBase = null; + private transient KafkaTestUtils kafkaTestUtils = null; @Before public void setUp() { - suiteBase = new KafkaStreamSuiteBase() { }; - suiteBase.setupKafka(); + kafkaTestUtils = new KafkaTestUtils(); + kafkaTestUtils.setupEmbeddedZookeeper(); + kafkaTestUtils.setupEmbeddedKafkaServer(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -58,10 +59,17 @@ public void setUp() { @After public void tearDown() { - ssc.stop(); - ssc = null; + if (ssc != null) { + ssc.stop(); + ssc = null; + } + System.clearProperty("spark.driver.port"); - suiteBase.tearDownKafka(); + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers(); + kafkaTestUtils = null; + } } @Test @@ -75,15 +83,15 @@ public void testKafkaStream() throws InterruptedException { sent.put("b", 3); sent.put("c", 10); - suiteBase.createTopic(topic); + kafkaTestUtils.createTopic(topic); HashMap tmp = new HashMap(sent); - suiteBase.sendMessages(topic, + kafkaTestUtils.sendMessages(topic, JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( Predef.>conforms()) ); HashMap kafkaParams = new HashMap(); - kafkaParams.put("zookeeper.connect", suiteBase.zkAddress()); + kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000)); kafkaParams.put("auto.offset.reset", "smallest"); @@ -126,6 +134,7 @@ public Void call(JavaPairRDD rdd) throws Exception { ); ssc.start(); + long startTime = System.currentTimeMillis(); boolean sizeMatches = false; while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) { @@ -136,6 +145,5 @@ public Void call(JavaPairRDD rdd) throws Exception { for (String k : sent.keySet()) { Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue()); } - ssc.stop(); } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 17ca9d145d66..9908d99d96b1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -27,17 +27,21 @@ import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll} import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import 
org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -class DirectKafkaStreamSuite extends KafkaStreamSuiteBase - with BeforeAndAfter with BeforeAndAfterAll with Eventually { +class DirectKafkaStreamSuite + extends FunSuite + with BeforeAndAfter + with BeforeAndAfterAll + with Eventually + with Logging { val sparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) @@ -46,12 +50,19 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase var ssc: StreamingContext = _ var testDir: File = _ + var kafkaTestUtils: KafkaTestUtils = _ + override def beforeAll { - setupKafka() + kafkaTestUtils = new KafkaTestUtils + kafkaTestUtils.setupEmbeddedZookeeper() + kafkaTestUtils.setupEmbeddedKafkaServer() } override def afterAll { - tearDownKafka() + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils = null + } } after { @@ -72,12 +83,12 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase val topics = Set("basic1", "basic2", "basic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => - createTopic(t) - sendMessages(t, data) + kafkaTestUtils.createTopic(t) + kafkaTestUtils.sendMessages(t, data) } val totalSent = data.values.sum * topics.size val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerAddress", + "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", "auto.offset.reset" -> "smallest" ) @@ -121,9 +132,9 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase val topic = "largest" val topicPartition = TopicAndPartition(topic, 0) val data = Map("a" -> 10) - createTopic(topic) + kafkaTestUtils.createTopic(topic) val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerAddress", + "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", "auto.offset.reset" -> "largest" ) val kc = new KafkaCluster(kafkaParams) @@ -132,7 +143,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } // Send some initial messages before starting context - sendMessages(topic, data) + kafkaTestUtils.sendMessages(topic, data) eventually(timeout(10 seconds), interval(20 milliseconds)) { assert(getLatestOffset() > 3) } @@ -154,7 +165,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) - sendMessages(topic, newData) + kafkaTestUtils.sendMessages(topic, newData) eventually(timeout(10 seconds), interval(50 milliseconds)) { collectedData.contains("b") } @@ -166,9 +177,9 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase val topic = "offset" val topicPartition = TopicAndPartition(topic, 0) val data = Map("a" -> 10) - createTopic(topic) + kafkaTestUtils.createTopic(topic) val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerAddress", + "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", "auto.offset.reset" -> "largest" ) val kc = new KafkaCluster(kafkaParams) @@ -177,7 +188,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } // Send some initial messages before starting context - sendMessages(topic, data) + kafkaTestUtils.sendMessages(topic, data) eventually(timeout(10 seconds), interval(20 milliseconds)) { assert(getLatestOffset() >= 10) } @@ -200,7 +211,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase stream.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) - sendMessages(topic, newData) + kafkaTestUtils.sendMessages(topic, newData) eventually(timeout(10 
seconds), interval(50 milliseconds)) { collectedData.contains("b") } @@ -210,18 +221,18 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase // Test to verify the offset ranges can be recovered from the checkpoints test("offset recovery") { val topic = "recovery" - createTopic(topic) + kafkaTestUtils.createTopic(topic) testDir = Utils.createTempDir() val kafkaParams = Map( - "metadata.broker.list" -> s"$brokerAddress", + "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", "auto.offset.reset" -> "smallest" ) // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { val strings = data.map { _.toString} - sendMessages(topic, strings.map { _ -> 1}.toMap) + kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) eventually(timeout(10 seconds), interval(50 milliseconds)) { assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains }) } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index fc9275b7207b..4b2f9fb9e560 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -20,28 +20,36 @@ package org.apache.spark.streaming.kafka import scala.util.Random import kafka.common.TopicAndPartition -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{FunSuite, BeforeAndAfterAll} -class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { +class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { val topic = "kcsuitetopic" + Random.nextInt(10000) val topicAndPartition = TopicAndPartition(topic, 0) var kc: KafkaCluster = null + var kafkaTestUtils: KafkaTestUtils = _ + override def beforeAll() { - setupKafka() - createTopic(topic) - sendMessages(topic, Map("a" -> 1)) - kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress")) + kafkaTestUtils = new KafkaTestUtils + kafkaTestUtils.setupEmbeddedZookeeper() + kafkaTestUtils.setupEmbeddedKafkaServer() + + kafkaTestUtils.createTopic(topic) + kafkaTestUtils.sendMessages(topic, Map("a" -> 1)) + kc = new KafkaCluster(Map("metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}")) } override def afterAll() { - tearDownKafka() + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils = null + } } test("metadata apis") { val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition) val leaderAddress = s"${leader._1}:${leader._2}" - assert(leaderAddress === brokerAddress, "didn't get leader") + assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader") val parts = kc.getPartitions(Set(topic)).right.get assert(parts(topicAndPartition), "didn't get partitions") diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index a223da70b043..cfabf59fd80e 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -22,18 +22,22 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{FunSuite, 
BeforeAndAfterAll} import org.apache.spark._ -import org.apache.spark.SparkContext._ -class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { +class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { + + var kafkaTestUtils: KafkaTestUtils = _ + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) var sc: SparkContext = _ + override def beforeAll { sc = new SparkContext(sparkConf) - - setupKafka() + kafkaTestUtils = new KafkaTestUtils + kafkaTestUtils.setupEmbeddedZookeeper() + kafkaTestUtils.setupEmbeddedKafkaServer() } override def afterAll { @@ -41,17 +45,21 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { sc.stop sc = null } - tearDownKafka() + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils = null + } } test("basic usage") { val topic = "topicbasic" - createTopic(topic) + kafkaTestUtils.createTopic(topic) val messages = Set("the", "quick", "brown", "fox") - sendMessages(topic, messages.toArray) + kafkaTestUtils.sendMessages(topic, messages.toArray) - val kafkaParams = Map("metadata.broker.list" -> brokerAddress, + val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -67,15 +75,15 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - createTopic(topic) + kafkaTestUtils.createTopic(topic) - val kafkaParams = Map("metadata.broker.list" -> brokerAddress, + val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}") val kc = new KafkaCluster(kafkaParams) // this is the "lots of messages" case - sendMessages(topic, sent) + kafkaTestUtils.sendMessages(topic, sent) // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) @@ -92,14 +100,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll { // shouldn't get anything, since message is sent after rdd was defined val sentOnlyOne = Map("d" -> 1) - sendMessages(topic, sentOnlyOne) + kafkaTestUtils.sendMessages(topic, sentOnlyOne) assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above val rdd3 = getRdd(kc, Set(topic)) // send lots of messages after rdd was defined, they shouldn't show up - sendMessages(topic, Map("extra" -> 22)) + kafkaTestUtils.sendMessages(topic, Map("extra" -> 22)) assert(rdd3.isDefined) assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message") diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index e4966eebb9b3..d3eede73258a 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -17,201 +17,27 @@ package org.apache.spark.streaming.kafka -import java.io.File -import java.net.InetSocketAddress -import java.util.Properties - import scala.collection.mutable import scala.concurrent.duration._ 
import scala.language.postfixOps import scala.util.Random -import kafka.admin.AdminUtils -import kafka.common.{KafkaException, TopicAndPartition} -import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer.{StringDecoder, StringEncoder} -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient -import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} +import kafka.serializer.StringDecoder import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.util.Utils - -/** - * This is an abstract base class for Kafka testsuites. This has the functionality to set up - * and tear down local Kafka servers, and to push data using Kafka producers. - */ -abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging { - - private val zkHost = "localhost" - private var zkPort: Int = 0 - private val zkConnectionTimeout = 6000 - private val zkSessionTimeout = 6000 - private var zookeeper: EmbeddedZookeeper = _ - private val brokerHost = "localhost" - private var brokerPort = 9092 - private var brokerConf: KafkaConfig = _ - private var server: KafkaServer = _ - private var producer: Producer[String, String] = _ - private var zkReady = false - private var brokerReady = false - - protected var zkClient: ZkClient = _ - - def zkAddress: String = { - assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") - s"$zkHost:$zkPort" - } - - def brokerAddress: String = { - assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") - s"$brokerHost:$brokerPort" - } - - def setupKafka() { - // Zookeeper server startup - zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") - // Get the actual zookeeper binding port - zkPort = zookeeper.actualPort - zkReady = true - logInfo("==================== Zookeeper Started ====================") - - zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) - logInfo("==================== Zookeeper Client Created ====================") - - // Kafka broker startup - var bindSuccess: Boolean = false - while(!bindSuccess) { - try { - val brokerProps = getBrokerConfig() - brokerConf = new KafkaConfig(brokerProps) - server = new KafkaServer(brokerConf) - server.startup() - logInfo("==================== Kafka Broker Started ====================") - bindSuccess = true - } catch { - case e: KafkaException => - if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) { - brokerPort += 1 - } - case e: Exception => throw new Exception("Kafka server create failed", e) - } - } - - Thread.sleep(2000) - logInfo("==================== Kafka + Zookeeper Ready ====================") - brokerReady = true - } - - def tearDownKafka() { - brokerReady = false - zkReady = false - if (producer != null) { - producer.close() - producer = null - } - - if (server != null) { - server.shutdown() - server = null - } - - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } - - if (zkClient != null) { - zkClient.close() - zkClient = null - } - - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null - } - } - def createTopic(topic: String) { - AdminUtils.createTopic(zkClient, 
topic, 1, 1) - // wait until metadata is propagated - waitUntilMetadataIsPropagated(topic, 0) - logInfo(s"==================== Topic $topic Created ====================") - } - - def sendMessages(topic: String, messageToFreq: Map[String, Int]) { - val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray - sendMessages(topic, messages) - } - - def sendMessages(topic: String, messages: Array[String]) { - producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) - producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) - producer.close() - logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================") - } - - private def getBrokerConfig(): Properties = { - val props = new Properties() - props.put("broker.id", "0") - props.put("host.name", "localhost") - props.put("port", brokerPort.toString) - props.put("log.dir", Utils.createTempDir().getAbsolutePath) - props.put("zookeeper.connect", zkAddress) - props.put("log.flush.interval.messages", "1") - props.put("replica.socket.timeout.ms", "1500") - props - } - - private def getProducerConfig(): Properties = { - val brokerAddr = brokerConf.hostName + ":" + brokerConf.port - val props = new Properties() - props.put("metadata.broker.list", brokerAddr) - props.put("serializer.class", classOf[StringEncoder].getName) - props - } - - private def waitUntilMetadataIsPropagated(topic: String, partition: Int) { - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert( - server.apis.metadataCache.containsTopicAndPartition(topic, partition), - s"Partition [$topic, $partition] metadata not propagated after timeout" - ) - } - } - - class EmbeddedZookeeper(val zkConnect: String) { - val random = new Random() - val snapshotDir = Utils.createTempDir() - val logDir = Utils.createTempDir() - - val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) - val (ip, port) = { - val splits = zkConnect.split(":") - (splits(0), splits(1).toInt) - } - val factory = new NIOServerCnxnFactory() - factory.configure(new InetSocketAddress(ip, port), 16) - factory.startup(zookeeper) - - val actualPort = factory.getLocalPort - - def shutdown() { - factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) - } - } -} - - -class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { +class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfter { var ssc: StreamingContext = _ + var kafkaTestUtils: KafkaTestUtils = _ before { - setupKafka() + kafkaTestUtils = new KafkaTestUtils + kafkaTestUtils.setupEmbeddedZookeeper() + kafkaTestUtils.setupEmbeddedKafkaServer() } after { @@ -219,7 +45,11 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { ssc.stop() ssc = null } - tearDownKafka() + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils = null + } } test("Kafka input stream") { @@ -227,10 +57,10 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { ssc = new StreamingContext(sparkConf, Milliseconds(500)) val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - createTopic(topic) - sendMessages(topic, sent) + kafkaTestUtils.createTopic(topic) + kafkaTestUtils.sendMessages(topic, sent) - val kafkaParams = Map("zookeeper.connect" -> zkAddress, + val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress, "group.id" -> s"test-consumer-${Random.nextInt(10000)}", "auto.offset.reset" -> 
"smallest") @@ -244,14 +74,15 @@ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter { result.put(kv._1, count) } } + ssc.start() + eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { assert(sent.size === result.size) sent.keys.foreach { k => assert(sent(k) === result(k).toInt) } } - ssc.stop() } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 3cd960d1fd1d..ba7743e9dad0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -27,7 +27,8 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} -import org.scalatest.BeforeAndAfter +import org.apache.commons.io.FileUtils +import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf @@ -35,7 +36,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils -class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually { +class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventually { val sparkConf = new SparkConf() .setMaster("local[4]") @@ -43,6 +44,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter .set("spark.streaming.receiver.writeAheadLog.enable", "true") val data = Map("a" -> 10, "b" -> 10, "c" -> 10) + var kafkaTestUtils: KafkaTestUtils = _ var groupId: String = _ var kafkaParams: Map[String, String] = _ @@ -50,10 +52,13 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter var tempDirectory: File = null before { - setupKafka() + kafkaTestUtils = new KafkaTestUtils + kafkaTestUtils.setupEmbeddedZookeeper() + kafkaTestUtils.setupEmbeddedKafkaServer() + groupId = s"test-consumer-${Random.nextInt(10000)}" kafkaParams = Map( - "zookeeper.connect" -> zkAddress, + "zookeeper.connect" -> kafkaTestUtils.zkAddress, "group.id" -> groupId, "auto.offset.reset" -> "smallest" ) @@ -67,15 +72,28 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter if (ssc != null) { ssc.stop() } +<<<<<<< HEAD Utils.deleteRecursively(tempDirectory) tearDownKafka() +======= + + if (tempDirectory != null && tempDirectory.exists()) { + FileUtils.deleteDirectory(tempDirectory) + tempDirectory = null + } + + if (kafkaTestUtils != null) { + kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils = null + } +>>>>>>> Refactor the Kafka unit test and add Python Kafka unittest support } test("Reliable Kafka input stream with single topic") { - var topic = "test-topic" - createTopic(topic) - sendMessages(topic, data) + val topic = "test-topic" + kafkaTestUtils.createTopic(topic) + kafkaTestUtils.sendMessages(topic, data) // Verify whether the offset of this group/topic/partition is 0 before starting. assert(getCommitOffset(groupId, topic, 0) === None) @@ -91,6 +109,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter } } ssc.start() + eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { // A basic process verification for ReliableKafkaReceiver. // Verify whether received message number is equal to the sent message number. 
@@ -100,14 +119,13 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter // Verify the offset number whether it is equal to the total message number. assert(getCommitOffset(groupId, topic, 0) === Some(29L)) } - ssc.stop() } test("Reliable Kafka input stream with multiple topics") { val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) topics.foreach { case (t, _) => - createTopic(t) - sendMessages(t, data) + kafkaTestUtils.createTopic(t) + kafkaTestUtils.sendMessages(t, data) } // Before started, verify all the group/topic/partition offsets are 0. @@ -118,19 +136,19 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY) stream.foreachRDD(_ => Unit) ssc.start() + eventually(timeout(20000 milliseconds), interval(100 milliseconds)) { // Verify the offset for each group/topic to see whether they are equal to the expected one. topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) } } - ssc.stop() } /** Getting partition offset from Zookeeper. */ private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = { - assert(zkClient != null, "Zookeeper client is not initialized") + assert(kafkaTestUtils.zkClient != null, "Zookeeper client is not initialized") val topicDirs = new ZKGroupTopicDirs(groupId, topic) val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" - ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong) + ZkUtils.readDataMaybeNull(kafkaTestUtils.zkClient, zkPath)._1.map(_.toLong) } } diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 608f8e26473a..641f9ef3e079 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -23,13 +23,16 @@ import tempfile import struct +from py4j.java_collections import MapConverter + from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext +from pyspark.streaming.kafka import KafkaUtils class PySparkStreamingTestCase(unittest.TestCase): - timeout = 10 # seconds + timeout = 20 # seconds duration = 1 def setUp(self): @@ -556,5 +559,43 @@ def check_output(n): check_output(3) +class KafkaStreamTests(PySparkStreamingTestCase): + + def setUp(self): + super(KafkaStreamTests, self).setUp() + + kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ + .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils") + self._kafkaTestUtils = kafkaTestUtilsClz.newInstance() + self._kafkaTestUtils.setupEmbeddedZookeeper() + self._kafkaTestUtils.setupEmbeddedKafkaServer() + + def tearDown(self): + if self._kafkaTestUtils is not None: + self._kafkaTestUtils.tearDownEmbeddedServers() + self._kafkaTestUtils = None + + super(KafkaStreamTests, self).tearDown() + + def test_kafka_stream(self): + """Test the Python Kafka stream API.""" + topic = "topic1" + sendData = {"a": 3, "b": 5, "c": 10} + jSendData = MapConverter().convert(sendData, + self.ssc.sparkContext._gateway._gateway_client) + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, jSendData) + + stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), + "test-streaming-consumer", {topic: 1}, + {"auto.offset.reset": "smallest"}) + + result = {} + for i in sum(self._collect(stream.map(lambda x: x[1]), 18), []): + result[i] = result.get(i, 0) + 1 + + self.assertEqual(sendData, result) + if __name__ == "__main__": unittest.main() 
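Note (not part of the patch): the KafkaStreamTests class added above exercises the same Python API an application would use once the spark-streaming-kafka assembly jar is on the driver classpath (which the run-tests changes below arrange via --jars). The following sketch is illustrative only; the ZooKeeper address, consumer group id and topic name are placeholders, and only calls already shown in the test (KafkaUtils.createStream plus standard DStream operations) are assumed.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="KafkaWordCountExample")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # zkQuorum, group id and topic map below are placeholders for illustration
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "example-consumer",
                                     {"topic1": 1}, {"auto.offset.reset": "smallest"})

    # Each record is a (key, message) pair; count words in the message payloads
    counts = (stream.map(lambda kv: kv[1])
                    .flatMap(lambda line: line.split(" "))
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
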
diff --git a/python/run-tests b/python/run-tests index f569a56fb7a9..11cf0cefa441 100755 --- a/python/run-tests +++ b/python/run-tests @@ -27,6 +27,8 @@ cd "$FWDIR/python" FAILED=0 LOG_FILE=unit-tests.log +. "$FWDIR"/bin/load-spark-env.sh + rm -f $LOG_FILE # Remove the metastore and warehouse directory created by the HiveContext tests in Spark SQL @@ -35,7 +37,7 @@ rm -rf metastore warehouse function run_test() { echo "Running test: $1" | tee -a $LOG_FILE - SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 $2 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) @@ -57,7 +59,7 @@ function run_core_tests() { PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py" PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py" run_test "pyspark/serializers.py" - run_test "pyspark/profiler.py" + run_test "pyspark/profiler.py" run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" } @@ -97,8 +99,20 @@ function run_ml_tests() { function run_streaming_tests() { echo "Run streaming tests ..." + + KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly + JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}" + for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do + if [[ ! -e "$f" ]]; then + echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 + echo "You need to build Spark before running this program" 1>&2 + exit 1 + fi + KAFKA_ASSEMBLY_JAR="$f" + done + run_test "pyspark/streaming/util.py" - run_test "pyspark/streaming/tests.py" + run_test "pyspark/streaming/tests.py" "--jars ${KAFKA_ASSEMBLY_JAR}" } echo "Running PySpark tests. Output is in python/$LOG_FILE." From fde1213aca7d8e3e40325fceb3feae1868ca083e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 10 Mar 2015 17:09:39 +0800 Subject: [PATCH 02/15] Code style changes --- .../spark/streaming/kafka/KafkaTestUtils.scala | 5 +++-- .../streaming/kafka/JavaKafkaStreamSuite.java | 7 +------ .../streaming/kafka/DirectKafkaStreamSuite.scala | 8 ++++---- .../spark/streaming/kafka/KafkaClusterSuite.scala | 8 ++++---- .../spark/streaming/kafka/KafkaRDDSuite.scala | 7 ++++--- .../spark/streaming/kafka/KafkaStreamSuite.scala | 4 ++-- .../streaming/kafka/ReliableKafkaStreamSuite.scala | 14 +++++++------- 7 files changed, 25 insertions(+), 28 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 0e263bef2925..3a7da3fb243a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -34,8 +34,8 @@ import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} +import org.I0Itec.zkclient.ZkClient import org.apache.spark.Logging import org.apache.spark.streaming.Time @@ -151,12 +151,13 @@ private class KafkaTestUtils extends Logging { waitUntilMetadataIsPropagated(topic, 0) } + /** Java function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { import scala.collection.JavaConversions._ sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*)) } - /** Send the messages with 
its duplications to the Kafka broker */ + /** Send the messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray sendMessages(topic, messages) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 1a009d0f98de..4b46d9975eec 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -24,7 +24,6 @@ import scala.Predef; import scala.Tuple2; -import scala.collection.JavaConverters; import kafka.serializer.StringDecoder; import org.junit.After; @@ -84,11 +83,7 @@ public void testKafkaStream() throws InterruptedException { sent.put("c", 10); kafkaTestUtils.createTopic(topic); - HashMap tmp = new HashMap(sent); - kafkaTestUtils.sendMessages(topic, - JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms()) - ); + kafkaTestUtils.sendMessages(topic, sent); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 9908d99d96b1..df5e15fa5e3d 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -46,11 +46,11 @@ class DirectKafkaStreamSuite .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - var sc: SparkContext = _ - var ssc: StreamingContext = _ - var testDir: File = _ + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + private var testDir: File = _ - var kafkaTestUtils: KafkaTestUtils = _ + private var kafkaTestUtils: KafkaTestUtils = _ override def beforeAll { kafkaTestUtils = new KafkaTestUtils diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index 4b2f9fb9e560..be668043faf5 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -23,11 +23,11 @@ import kafka.common.TopicAndPartition import org.scalatest.{FunSuite, BeforeAndAfterAll} class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { - val topic = "kcsuitetopic" + Random.nextInt(10000) - val topicAndPartition = TopicAndPartition(topic, 0) - var kc: KafkaCluster = null + private val topic = "kcsuitetopic" + Random.nextInt(10000) + private val topicAndPartition = TopicAndPartition(topic, 0) + private var kc: KafkaCluster = null - var kafkaTestUtils: KafkaTestUtils = _ + private var kafkaTestUtils: KafkaTestUtils = _ override def beforeAll() { kafkaTestUtils = new KafkaTestUtils diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index cfabf59fd80e..419e12cb1b85 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -28,10 +28,11 @@ import org.apache.spark._ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { - var kafkaTestUtils: KafkaTestUtils = _ + private var kafkaTestUtils: KafkaTestUtils = _ - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - var sc: SparkContext = _ + private val sparkConf = new SparkConf().setMaster("local[4]") + .setAppName(this.getClass.getSimpleName) + private var sc: SparkContext = _ override def beforeAll { sc = new SparkContext(sparkConf) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index d3eede73258a..dee6c4ebd253 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -31,8 +31,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfter { - var ssc: StreamingContext = _ - var kafkaTestUtils: KafkaTestUtils = _ + private var ssc: StreamingContext = _ + private var kafkaTestUtils: KafkaTestUtils = _ before { kafkaTestUtils = new KafkaTestUtils diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index ba7743e9dad0..e8c551ab9cb2 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -38,18 +38,18 @@ import org.apache.spark.util.Utils class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventually { - val sparkConf = new SparkConf() + private val sparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) .set("spark.streaming.receiver.writeAheadLog.enable", "true") - val data = Map("a" -> 10, "b" -> 10, "c" -> 10) + private val data = Map("a" -> 10, "b" -> 10, "c" -> 10) - var kafkaTestUtils: KafkaTestUtils = _ + private var kafkaTestUtils: KafkaTestUtils = _ - var groupId: String = _ - var kafkaParams: Map[String, String] = _ - var ssc: StreamingContext = _ - var tempDirectory: File = null + private var groupId: String = _ + private var kafkaParams: Map[String, String] = _ + private var ssc: StreamingContext = _ + private var tempDirectory: File = null before { kafkaTestUtils = new KafkaTestUtils From 8102d6e4f51a468a77923d1a1c8d6067903698e7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 10 Mar 2015 20:54:24 +0800 Subject: [PATCH 03/15] Fix bug in Jenkins test --- python/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/run-tests b/python/run-tests index 11cf0cefa441..ea45a75af8c3 100755 --- a/python/run-tests +++ b/python/run-tests @@ -102,7 +102,7 @@ function run_streaming_tests() { KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}" - for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do + for f in "${JAR_PATH}"/spark-streaming-kafka-assembly*.jar; do if [[ ! 
-e "$f" ]]; then echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 echo "You need to build Spark before running this program" 1>&2 From 6020b0050efbb12c33321adf891e1bda23e595b1 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 11 Mar 2015 09:57:35 +0800 Subject: [PATCH 04/15] Add more debug info in Shell --- python/run-tests | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/run-tests b/python/run-tests index ea45a75af8c3..a593d7b94e8b 100755 --- a/python/run-tests +++ b/python/run-tests @@ -37,6 +37,8 @@ rm -rf metastore warehouse function run_test() { echo "Running test: $1" | tee -a $LOG_FILE + echo "Additional argument: $2" | tee -a $LOG_FILE + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 $2 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) From 8ad442ff472ba2af74bccd0327b41c62e806e0ff Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 11 Mar 2015 22:19:14 +0800 Subject: [PATCH 05/15] Add kafka-assembly in run-tests --- dev/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/run-tests b/dev/run-tests index 1b6cf78b5da0..850cf4d795b8 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD build/mvn $HIVE_BUILD_ARGS clean package -DskipTests else echo -e "q\n" \ - | build/sbt $HIVE_BUILD_ARGS package assembly/assembly \ + | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } From 64d9877529d56a88869db187898ed38a59420a9b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 11 Mar 2015 22:31:42 +0800 Subject: [PATCH 06/15] Fix rebase bugs --- .../spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 1 - .../org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 26df91094054..d617c4c6261d 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -41,7 +41,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable { private transient JavaStreamingContext ssc = null; - private transient Random random = new Random(); private transient KafkaTestUtils kafkaTestUtils = null; @Before diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index 3e79e06f547d..c6b31f9760f7 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -82,7 +82,7 @@ public void testKafkaRDD() throws InterruptedException { HashMap emptyLeaders = new HashMap(); HashMap leaders = new HashMap(); - String[] hostAndPort = suiteBase.brokerAddress().split(":"); + String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); leaders.put(new TopicAndPartition(topic1, 0), broker); leaders.put(new TopicAndPartition(topic2, 0), broker); From 61a04f00de2f3ca90d10bdc4d9eb6732cb282cef Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 13 Mar 2015 
13:01:47 +0800 Subject: [PATCH 07/15] Fix bugs and address the issues --- .../streaming/kafka/KafkaTestUtils.scala | 96 +++++++++++-------- .../kafka/JavaDirectKafkaStreamSuite.java | 5 +- .../streaming/kafka/JavaKafkaRDDSuite.java | 5 +- .../streaming/kafka/JavaKafkaStreamSuite.java | 6 +- .../kafka/DirectKafkaStreamSuite.scala | 13 ++- .../streaming/kafka/KafkaClusterSuite.scala | 9 +- .../spark/streaming/kafka/KafkaRDDSuite.scala | 7 +- .../streaming/kafka/KafkaStreamSuite.scala | 13 ++- .../kafka/ReliableKafkaStreamSuite.scala | 44 ++++----- python/run-tests | 9 +- 10 files changed, 107 insertions(+), 100 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 3a7da3fb243a..7d67a72d5cc7 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.language.postfixOps import scala.util.Random +import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.common.KafkaException @@ -44,6 +45,8 @@ import org.apache.spark.util.Utils /** * This is a helper class for Kafka test suites. This has the functionality to set up * and tear down local Kafka servers, and to push data using Kafka producers. + * + * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. */ private class KafkaTestUtils extends Logging { @@ -55,7 +58,7 @@ private class KafkaTestUtils extends Logging { private var zookeeper: EmbeddedZookeeper = _ - var zkClient: ZkClient = _ + private var zkClient: ZkClient = _ // Kafka broker related configurations private val brokerHost = "localhost" @@ -82,18 +85,25 @@ private class KafkaTestUtils extends Logging { s"$brokerHost:$brokerPort" } - /** Set up the Embedded Zookeeper server and get the proper Zookeeper port */ - def setupEmbeddedZookeeper(): Unit = { + def zookeeperClient: ZkClient = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") + Option(zkClient).getOrElse( + throw new IllegalStateException("Zookeeper client is not yet initialized")) + } + + // Set up the Embedded Zookeeper server and get the proper Zookeeper port + private def setupEmbeddedZookeeper(): Unit = { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort + zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, + ZKStringSerializer) zkReady = true - zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) } - /** Set up the Embedded Kafka server */ - def setupEmbeddedKafkaServer(): Unit = { + // Set up the Embedded Kafka server + private def setupEmbeddedKafkaServer(): Unit = { assert(zkReady, "Zookeeper should be set up beforehand") // Kafka broker startup var bindSuccess: Boolean = false @@ -116,8 +126,14 @@ private class KafkaTestUtils extends Logging { brokerReady = true } + /** setup thw whole embedded servers, including Zookeeper and Kafka brokers */ + def setupEmbeddedServers(): Unit = { + setupEmbeddedZookeeper() + setupEmbeddedKafkaServer() + } + /** Tear down the whole servers, including Kafka broker and Zookeeper */ - def tearDownEmbeddedServers(): Unit = { + def 
teardownEmbeddedServers(): Unit = { brokerReady = false zkReady = false @@ -151,7 +167,7 @@ private class KafkaTestUtils extends Logging { waitUntilMetadataIsPropagated(topic, 0) } - /** Java function for sending messages to the Kafka broker */ + /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { import scala.collection.JavaConversions._ sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*)) @@ -191,6 +207,37 @@ private class KafkaTestUtils extends Logging { } private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test + // dependency + def eventually[T](timeout: Time, interval: Time)(func: => T): T = { + def makeAttempt(): Either[Throwable, T] = { + try { + Right(func) + } catch { + case e if NonFatal(e) => Left(e) + } + } + + val startTime = System.currentTimeMillis() + @tailrec + def tryAgain(attempt: Int): T = { + makeAttempt() match { + case Right(result) => result + case Left(e) => + val duration = System.currentTimeMillis() - startTime + if (duration < timeout.milliseconds) { + Thread.sleep(interval.milliseconds) + } else { + throw new TimeoutException(e.getMessage) + } + + tryAgain(attempt + 1) + } + } + + tryAgain(1) + } + eventually(Time(10000), Time(100)) { assert( server.apis.metadataCache.containsTopicAndPartition(topic, partition), @@ -199,38 +246,7 @@ private class KafkaTestUtils extends Logging { } } - // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test - // dependency - private def eventually[T](timeout: Time, interval: Time)(func: => T): T = { - def makeAttempt(): Either[Throwable, T] = { - try { - Right(func) - } catch { - case e: Throwable => Left(e) - } - } - - val startTime = System.currentTimeMillis() - @tailrec - def tryAgain(attempt: Int): T = { - makeAttempt() match { - case Right(result) => result - case Left(e) => - val duration = System.currentTimeMillis() - startTime - if (duration < timeout.milliseconds) { - Thread.sleep(interval.milliseconds) - } else { - throw new TimeoutException(e.getMessage) - } - - tryAgain(attempt + 1) - } - } - - tryAgain(1) - } - - class EmbeddedZookeeper(val zkConnect: String) { + private class EmbeddedZookeeper(val zkConnect: String) { val random = new Random() val snapshotDir = Utils.createTempDir() val logDir = Utils.createTempDir() diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index d617c4c6261d..1659dfd15e4d 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -46,8 +46,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedZookeeper(); - kafkaTestUtils.setupEmbeddedKafkaServer(); + kafkaTestUtils.setupEmbeddedServers(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -64,7 +63,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers(); + 
kafkaTestUtils.teardownEmbeddedServers(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index c6b31f9760f7..32d5593c8aa3 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -42,8 +42,7 @@ public class JavaKafkaRDDSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedZookeeper(); - kafkaTestUtils.setupEmbeddedKafkaServer(); + kafkaTestUtils.setupEmbeddedServers(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -59,7 +58,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers(); + kafkaTestUtils.teardownEmbeddedServers(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 4b46d9975eec..65318c95ec7c 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Random; -import scala.Predef; import scala.Tuple2; import kafka.serializer.StringDecoder; @@ -48,8 +47,7 @@ public class JavaKafkaStreamSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedZookeeper(); - kafkaTestUtils.setupEmbeddedKafkaServer(); + kafkaTestUtils.setupEmbeddedServers(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -66,7 +64,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers(); + kafkaTestUtils.teardownEmbeddedServers(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index df5e15fa5e3d..4fe231ab42c6 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -54,13 +54,12 @@ class DirectKafkaStreamSuite override def beforeAll { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedZookeeper() - kafkaTestUtils.setupEmbeddedKafkaServer() + kafkaTestUtils.setupEmbeddedServers() } override def afterAll { if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils.teardownEmbeddedServers() kafkaTestUtils = null } } @@ -88,7 +87,7 @@ class DirectKafkaStreamSuite } val totalSent = data.values.sum * topics.size val kafkaParams = Map( - "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, "auto.offset.reset" -> "smallest" ) @@ -134,7 +133,7 @@ class DirectKafkaStreamSuite val data = Map("a" -> 10) 
kafkaTestUtils.createTopic(topic) val kafkaParams = Map( - "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, "auto.offset.reset" -> "largest" ) val kc = new KafkaCluster(kafkaParams) @@ -179,7 +178,7 @@ class DirectKafkaStreamSuite val data = Map("a" -> 10) kafkaTestUtils.createTopic(topic) val kafkaParams = Map( - "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, "auto.offset.reset" -> "largest" ) val kc = new KafkaCluster(kafkaParams) @@ -225,7 +224,7 @@ class DirectKafkaStreamSuite testDir = Utils.createTempDir() val kafkaParams = Map( - "metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}", + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, "auto.offset.reset" -> "smallest" ) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index be668043faf5..1e9539a467db 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka import scala.util.Random import kafka.common.TopicAndPartition -import org.scalatest.{FunSuite, BeforeAndAfterAll} +import org.scalatest.{BeforeAndAfterAll, FunSuite} class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { private val topic = "kcsuitetopic" + Random.nextInt(10000) @@ -31,17 +31,16 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll() { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedZookeeper() - kafkaTestUtils.setupEmbeddedKafkaServer() + kafkaTestUtils.setupEmbeddedServers() kafkaTestUtils.createTopic(topic) kafkaTestUtils.sendMessages(topic, Map("a" -> 1)) - kc = new KafkaCluster(Map("metadata.broker.list" -> s"${kafkaTestUtils.brokerAddress}")) + kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress)) } override def afterAll() { if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils.teardownEmbeddedServers() kafkaTestUtils = null } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 419e12cb1b85..eb9bc2a1e08e 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -22,7 +22,7 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata -import org.scalatest.{FunSuite, BeforeAndAfterAll} +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.spark._ @@ -37,8 +37,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll { sc = new SparkContext(sparkConf) kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedZookeeper() - kafkaTestUtils.setupEmbeddedKafkaServer() + kafkaTestUtils.setupEmbeddedServers() } override def afterAll { @@ -48,7 +47,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { } if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils.teardownEmbeddedServers() kafkaTestUtils = null } } 
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index dee6c4ebd253..3ec670f37650 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -23,31 +23,30 @@ import scala.language.postfixOps import scala.util.Random import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} -class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfter { +class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll { private var ssc: StreamingContext = _ private var kafkaTestUtils: KafkaTestUtils = _ - before { + override def beforeAll(): Unit = { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedZookeeper() - kafkaTestUtils.setupEmbeddedKafkaServer() + kafkaTestUtils.setupEmbeddedServers() } - after { + override def afterAll(): Unit = { if (ssc != null) { ssc.stop() ssc = null } if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils.teardownEmbeddedServers() kafkaTestUtils = null } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index e8c551ab9cb2..59000b0b26c7 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.streaming.kafka - import java.io.File import scala.collection.mutable @@ -28,7 +27,7 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.apache.commons.io.FileUtils -import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf @@ -36,7 +35,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils -class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventually { +class ReliableKafkaStreamSuite extends FunSuite + with BeforeAndAfterAll with BeforeAndAfter with Eventually { private val sparkConf = new SparkConf() .setMaster("local[4]") @@ -51,10 +51,9 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua private var ssc: StreamingContext = _ private var tempDirectory: File = null - before { + override def beforeAll() : Unit = { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedZookeeper() - kafkaTestUtils.setupEmbeddedKafkaServer() + kafkaTestUtils.setupEmbeddedServers() groupId = s"test-consumer-${Random.nextInt(10000)}" kafkaParams = Map( @@ -62,31 +61,33 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua "group.id" -> groupId, "auto.offset.reset" -> "smallest" ) - - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - tempDirectory = 
Utils.createTempDir() - ssc.checkpoint(tempDirectory.getAbsolutePath) - } - - after { - if (ssc != null) { - ssc.stop() - } -<<<<<<< HEAD Utils.deleteRecursively(tempDirectory) tearDownKafka() -======= + } + override def afterAll(): Unit = { if (tempDirectory != null && tempDirectory.exists()) { FileUtils.deleteDirectory(tempDirectory) tempDirectory = null } if (kafkaTestUtils != null) { - kafkaTestUtils.tearDownEmbeddedServers() + kafkaTestUtils.teardownEmbeddedServers() kafkaTestUtils = null } ->>>>>>> Refactor the Kafka unit test and add Python Kafka unittest support + } + + before { + ssc = new StreamingContext(sparkConf, Milliseconds(500)) + tempDirectory = Files.createTempDir() + ssc.checkpoint(tempDirectory.getAbsolutePath) + } + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } } @@ -146,9 +147,8 @@ class ReliableKafkaStreamSuite extends FunSuite with BeforeAndAfter with Eventua /** Getting partition offset from Zookeeper. */ private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = { - assert(kafkaTestUtils.zkClient != null, "Zookeeper client is not initialized") val topicDirs = new ZKGroupTopicDirs(groupId, topic) val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" - ZkUtils.readDataMaybeNull(kafkaTestUtils.zkClient, zkPath)._1.map(_.toLong) + ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong) } } diff --git a/python/run-tests b/python/run-tests index a593d7b94e8b..9c099bed6092 100755 --- a/python/run-tests +++ b/python/run-tests @@ -37,9 +37,7 @@ rm -rf metastore warehouse function run_test() { echo "Running test: $1" | tee -a $LOG_FILE - echo "Additional argument: $2" | tee -a $LOG_FILE - - SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 $2 > $LOG_FILE 2>&1 + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) @@ -104,7 +102,7 @@ function run_streaming_tests() { KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}" - for f in "${JAR_PATH}"/spark-streaming-kafka-assembly*.jar; do + for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do if [[ ! -e "$f" ]]; then echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 echo "You need to build Spark before running this program" 1>&2 @@ -113,8 +111,9 @@ function run_streaming_tests() { KAFKA_ASSEMBLY_JAR="$f" done + export EXTRA_PYSPARK_SUBMIT_ARGS="-v --jars ${KAFKA_ASSEMBLY_JAR}" run_test "pyspark/streaming/util.py" - run_test "pyspark/streaming/tests.py" "--jars ${KAFKA_ASSEMBLY_JAR}" + run_test "pyspark/streaming/tests.py" } echo "Running PySpark tests. Output is in python/$LOG_FILE." 
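
For readers following the refactoring: after PATCH 07 each suite shares a single embedded Zookeeper/Kafka pair, created once in beforeAll and torn down in afterAll, instead of starting servers per test. Below is a minimal illustrative sketch (not part of any patch in this series; the suite name, topic and message counts are made up) of that lifecycle, using the KafkaTestUtils API as it stands at this point — the lifecycle methods are renamed to setup()/teardown() later, in PATCH 09:

    package org.apache.spark.streaming.kafka

    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    // Hypothetical suite, for illustration only.
    class ExampleKafkaSuite extends FunSuite with BeforeAndAfterAll {
      private var kafkaTestUtils: KafkaTestUtils = _

      override def beforeAll(): Unit = {
        kafkaTestUtils = new KafkaTestUtils
        kafkaTestUtils.setupEmbeddedServers()        // embedded Zookeeper + Kafka broker
      }

      override def afterAll(): Unit = {
        if (kafkaTestUtils != null) {
          kafkaTestUtils.teardownEmbeddedServers()   // stops producer, broker and Zookeeper
          kafkaTestUtils = null
        }
      }

      test("messages round-trip through the embedded broker") {
        val topic = "example-topic"
        kafkaTestUtils.createTopic(topic)            // waits until metadata is propagated
        kafkaTestUtils.sendMessages(topic, Map("a" -> 3, "b" -> 2))
        val kafkaParams = Map(
          "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
          "auto.offset.reset" -> "smallest")
        // ... consume via KafkaUtils with kafkaParams, as the suites above do ...
      }
    }
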
From 0f1b7ceec3ebfb4a6b5170b07cd803992205e8ba Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 13 Mar 2015 13:30:51 +0800 Subject: [PATCH 08/15] Still fix the bug --- python/pyspark/streaming/tests.py | 5 ++--- python/run-tests | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 641f9ef3e079..a9d07e350afe 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -567,12 +567,11 @@ def setUp(self): kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils") self._kafkaTestUtils = kafkaTestUtilsClz.newInstance() - self._kafkaTestUtils.setupEmbeddedZookeeper() - self._kafkaTestUtils.setupEmbeddedKafkaServer() + self._kafkaTestUtils.setupEmbeddedServers() def tearDown(self): if self._kafkaTestUtils is not None: - self._kafkaTestUtils.tearDownEmbeddedServers() + self._kafkaTestUtils.teardownEmbeddedServers() self._kafkaTestUtils = None super(KafkaStreamTests, self).tearDown() diff --git a/python/run-tests b/python/run-tests index 9c099bed6092..f05da7ab173e 100755 --- a/python/run-tests +++ b/python/run-tests @@ -105,7 +105,8 @@ function run_streaming_tests() { for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do if [[ ! -e "$f" ]]; then echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 - echo "You need to build Spark before running this program" 1>&2 + echo "You need to build Spark with sbt streaming-kafka-assembly/assembly or mvn package" \ + "before running this program" 1>&2 exit 1 fi KAFKA_ASSEMBLY_JAR="$f" From 8a2f3e2e2ce3d475f3478158d315aa64574948fe Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 13 Mar 2015 17:02:33 +0800 Subject: [PATCH 09/15] Address the issues --- dev/run-tests | 2 +- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 8 ++++---- .../spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 4 ++-- .../apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 4 ++-- .../spark/streaming/kafka/JavaKafkaStreamSuite.java | 4 ++-- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 4 ++-- .../apache/spark/streaming/kafka/KafkaClusterSuite.scala | 4 ++-- .../org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 4 ++-- .../apache/spark/streaming/kafka/KafkaStreamSuite.scala | 5 ++--- .../spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 4 ++-- python/pyspark/streaming/tests.py | 4 ++-- python/run-tests | 7 ++++--- 12 files changed, 27 insertions(+), 27 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 850cf4d795b8..bb21ab6c9aa0 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD build/mvn $HIVE_BUILD_ARGS clean package -DskipTests else echo -e "q\n" \ - | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \ + | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 7d67a72d5cc7..c8f9f9b6eb03 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -126,14 +126,14 @@ private class 
KafkaTestUtils extends Logging { brokerReady = true } - /** setup thw whole embedded servers, including Zookeeper and Kafka brokers */ - def setupEmbeddedServers(): Unit = { + /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ + def setup(): Unit = { setupEmbeddedZookeeper() setupEmbeddedKafkaServer() } - /** Tear down the whole servers, including Kafka broker and Zookeeper */ - def teardownEmbeddedServers(): Unit = { + /** Teardown the whole servers, including Kafka broker and Zookeeper */ + def teardown(): Unit = { brokerReady = false zkReady = false diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 1659dfd15e4d..ea6a7d2abb62 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -46,7 +46,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedServers(); + kafkaTestUtils.setup(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -63,7 +63,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers(); + kafkaTestUtils.teardown(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index 32d5593c8aa3..dbaf43e8cbc2 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -42,7 +42,7 @@ public class JavaKafkaRDDSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedServers(); + kafkaTestUtils.setup(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -58,7 +58,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers(); + kafkaTestUtils.teardown(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 65318c95ec7c..131bf36e7e89 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -47,7 +47,7 @@ public class JavaKafkaStreamSuite implements Serializable { @Before public void setUp() { kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setupEmbeddedServers(); + kafkaTestUtils.setup(); System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); @@ -64,7 +64,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers(); + 
kafkaTestUtils.teardown(); kafkaTestUtils = null; } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 4fe231ab42c6..b609e875ac29 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -54,12 +54,12 @@ class DirectKafkaStreamSuite override def beforeAll { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedServers() + kafkaTestUtils.setup() } override def afterAll { if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers() + kafkaTestUtils.teardown() kafkaTestUtils = null } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala index 1e9539a467db..2b33d2a220b2 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala @@ -31,7 +31,7 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll() { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedServers() + kafkaTestUtils.setup() kafkaTestUtils.createTopic(topic) kafkaTestUtils.sendMessages(topic, Map("a" -> 1)) @@ -40,7 +40,7 @@ class KafkaClusterSuite extends FunSuite with BeforeAndAfterAll { override def afterAll() { if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers() + kafkaTestUtils.teardown() kafkaTestUtils = null } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index eb9bc2a1e08e..7d26ce50875b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -37,7 +37,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { override def beforeAll { sc = new SparkContext(sparkConf) kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedServers() + kafkaTestUtils.setup() } override def afterAll { @@ -47,7 +47,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { } if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers() + kafkaTestUtils.teardown() kafkaTestUtils = null } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 3ec670f37650..24699dfc33ad 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -36,7 +36,7 @@ class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll { override def beforeAll(): Unit = { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedServers() + kafkaTestUtils.setup() } override def afterAll(): Unit = { @@ -46,7 +46,7 @@ class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll { } if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers() + kafkaTestUtils.teardown() kafkaTestUtils = null } } @@ 
-84,4 +84,3 @@ class KafkaStreamSuite extends FunSuite with Eventually with BeforeAndAfterAll { } } } - diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 59000b0b26c7..4176af10b8b4 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -53,7 +53,7 @@ class ReliableKafkaStreamSuite extends FunSuite override def beforeAll() : Unit = { kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setupEmbeddedServers() + kafkaTestUtils.setup() groupId = s"test-consumer-${Random.nextInt(10000)}" kafkaParams = Map( @@ -72,7 +72,7 @@ class ReliableKafkaStreamSuite extends FunSuite } if (kafkaTestUtils != null) { - kafkaTestUtils.teardownEmbeddedServers() + kafkaTestUtils.teardown() kafkaTestUtils = null } } diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a9d07e350afe..e386da93a4df 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -567,11 +567,11 @@ def setUp(self): kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils") self._kafkaTestUtils = kafkaTestUtilsClz.newInstance() - self._kafkaTestUtils.setupEmbeddedServers() + self._kafkaTestUtils.setup() def tearDown(self): if self._kafkaTestUtils is not None: - self._kafkaTestUtils.teardownEmbeddedServers() + self._kafkaTestUtils.teardown() self._kafkaTestUtils = None super(KafkaStreamTests, self).tearDown() diff --git a/python/run-tests b/python/run-tests index f05da7ab173e..127205d417e8 100755 --- a/python/run-tests +++ b/python/run-tests @@ -105,14 +105,15 @@ function run_streaming_tests() { for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do if [[ ! 
-e "$f" ]]; then echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2 - echo "You need to build Spark with sbt streaming-kafka-assembly/assembly or mvn package" \ - "before running this program" 1>&2 + echo "You need to build Spark with " \ + "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or" \ + "'build/mvn package' before running this program" 1>&2 exit 1 fi KAFKA_ASSEMBLY_JAR="$f" done - export EXTRA_PYSPARK_SUBMIT_ARGS="-v --jars ${KAFKA_ASSEMBLY_JAR}" + export EXTRA_PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR}" run_test "pyspark/streaming/util.py" run_test "pyspark/streaming/tests.py" } From f889657a5fe65f66015cfcbb8b48185c16f7ec02 Mon Sep 17 00:00:00 2001 From: Saisai Shao Date: Tue, 17 Mar 2015 08:39:27 +0800 Subject: [PATCH 10/15] Update the code according --- python/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/run-tests b/python/run-tests index 127205d417e8..e683e1d95061 100755 --- a/python/run-tests +++ b/python/run-tests @@ -113,7 +113,7 @@ function run_streaming_tests() { KAFKA_ASSEMBLY_JAR="$f" done - export EXTRA_PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR}" + export PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR} pyspark-shell" run_test "pyspark/streaming/util.py" run_test "pyspark/streaming/tests.py" } From 40b47a3735eaf3c4e8681a0f9d520f9004918988 Mon Sep 17 00:00:00 2001 From: Saisai Shao Date: Tue, 17 Mar 2015 09:13:31 +0800 Subject: [PATCH 11/15] Style fix --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index b609e875ac29..415730f5559c 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -27,7 +27,7 @@ import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder -import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.Eventually import org.apache.spark.{Logging, SparkConf, SparkContext} From 0708bb1aa951f82c0fc0d39ec2a201bcb3bac921 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 25 Mar 2015 11:08:09 +0800 Subject: [PATCH 12/15] Fix rebase issue --- .../streaming/kafka/ReliableKafkaStreamSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 4176af10b8b4..38548dd73b82 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -26,7 +26,6 @@ import scala.util.Random import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} -import org.apache.commons.io.FileUtils import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.Eventually @@ -61,15 +60,12 @@ class ReliableKafkaStreamSuite extends FunSuite "group.id" -> groupId, 
"auto.offset.reset" -> "smallest" ) - Utils.deleteRecursively(tempDirectory) - tearDownKafka() + + tempDirectory = Utils.createTempDir() } override def afterAll(): Unit = { - if (tempDirectory != null && tempDirectory.exists()) { - FileUtils.deleteDirectory(tempDirectory) - tempDirectory = null - } + Utils.deleteRecursively(tempDirectory) if (kafkaTestUtils != null) { kafkaTestUtils.teardown() @@ -79,7 +75,6 @@ class ReliableKafkaStreamSuite extends FunSuite before { ssc = new StreamingContext(sparkConf, Milliseconds(500)) - tempDirectory = Files.createTempDir() ssc.checkpoint(tempDirectory.getAbsolutePath) } @@ -90,7 +85,6 @@ class ReliableKafkaStreamSuite extends FunSuite } } - test("Reliable Kafka input stream with single topic") { val topic = "test-topic" kafkaTestUtils.createTopic(topic) From 92912d1f8949a027bd9912eeaec15244f4dd7322 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 8 Apr 2015 14:14:11 +0800 Subject: [PATCH 13/15] Address the commits --- .../streaming/kafka/KafkaTestUtils.scala | 87 ++++++++----------- python/pyspark/streaming/tests.py | 3 +- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index c8f9f9b6eb03..c07daf941f0b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -26,11 +26,9 @@ import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.language.postfixOps -import scala.util.Random import scala.util.control.NonFatal import kafka.admin.AdminUtils -import kafka.common.KafkaException import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} @@ -38,7 +36,7 @@ import kafka.utils.ZKStringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.I0Itec.zkclient.ZkClient -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.streaming.Time import org.apache.spark.util.Utils @@ -105,24 +103,16 @@ private class KafkaTestUtils extends Logging { // Set up the Embedded Kafka server private def setupEmbeddedKafkaServer(): Unit = { assert(zkReady, "Zookeeper should be set up beforehand") + // Kafka broker startup - var bindSuccess: Boolean = false - while(!bindSuccess) { - try { - brokerConf = new KafkaConfig(brokerConfigure) - server = new KafkaServer(brokerConf) - server.startup() - bindSuccess = true - } catch { - case e: KafkaException => - if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) { - brokerPort += 1 - } - case e: Exception => throw new Exception("Kafka server create failed", e) - } - } + Utils.startServiceOnPort(brokerPort, port => { + brokerPort = port + brokerConf = new KafkaConfig(brokerConfiguration) + server = new KafkaServer(brokerConf) + server.startup() + (server, port) + }, new SparkConf(), "KafkaBroker") - Thread.sleep(2000) brokerReady = true } @@ -181,13 +171,13 @@ private class KafkaTestUtils extends Logging { /** Send the array of messages to the Kafka broker */ def sendMessages(topic: String, messages: Array[String]): Unit = { - producer = new Producer[String, String](new ProducerConfig(producerConfigure)) + producer = new Producer[String, String](new 
ProducerConfig(producerConfiguration)) producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) producer.close() producer = null } - private def brokerConfigure: Properties = { + private def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") @@ -199,45 +189,45 @@ private class KafkaTestUtils extends Logging { props } - private def producerConfigure: Properties = { + private def producerConfiguration: Properties = { val props = new Properties() props.put("metadata.broker.list", brokerAddress) props.put("serializer.class", classOf[StringEncoder].getName) props } - private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { - // A simplified version of scalatest eventually, rewrite here is to avoid adding extra test + // A simplified version of scalatest eventually, rewritten here is to avoid adding extra test // dependency - def eventually[T](timeout: Time, interval: Time)(func: => T): T = { - def makeAttempt(): Either[Throwable, T] = { - try { - Right(func) - } catch { - case e if NonFatal(e) => Left(e) - } + def eventually[T](timeout: Time, interval: Time)(func: => T): T = { + def makeAttempt(): Either[Throwable, T] = { + try { + Right(func) + } catch { + case e if NonFatal(e) => Left(e) } + } - val startTime = System.currentTimeMillis() - @tailrec - def tryAgain(attempt: Int): T = { - makeAttempt() match { - case Right(result) => result - case Left(e) => - val duration = System.currentTimeMillis() - startTime - if (duration < timeout.milliseconds) { - Thread.sleep(interval.milliseconds) - } else { - throw new TimeoutException(e.getMessage) - } - - tryAgain(attempt + 1) - } - } + val startTime = System.currentTimeMillis() + @tailrec + def tryAgain(attempt: Int): T = { + makeAttempt() match { + case Right(result) => result + case Left(e) => + val duration = System.currentTimeMillis() - startTime + if (duration < timeout.milliseconds) { + Thread.sleep(interval.milliseconds) + } else { + throw new TimeoutException(e.getMessage) + } - tryAgain(1) + tryAgain(attempt + 1) + } } + tryAgain(1) + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { eventually(Time(10000), Time(100)) { assert( server.apis.metadataCache.containsTopicAndPartition(topic, partition), @@ -247,7 +237,6 @@ private class KafkaTestUtils extends Logging { } private class EmbeddedZookeeper(val zkConnect: String) { - val random = new Random() val snapshotDir = Utils.createTempDir() val logDir = Utils.createTempDir() diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index e386da93a4df..9b4635e49020 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -591,7 +591,8 @@ def test_kafka_stream(self): {"auto.offset.reset": "smallest"}) result = {} - for i in sum(self._collect(stream.map(lambda x: x[1]), 18), []): + for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]), + sum(sendData.values()))): result[i] = result.get(i, 0) + 1 self.assertEqual(sendData, result) From 82c756e80e1c960de264c172e719b6b5559f519c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 9 Apr 2015 17:16:59 +0800 Subject: [PATCH 14/15] Address the comments --- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 4 ++-- .../spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 3 --- .../org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 2 -- .../apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 3 --- 
4 files changed, 2 insertions(+), 10 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index c07daf941f0b..13e947506597 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -196,8 +196,8 @@ private class KafkaTestUtils extends Logging { props } - // A simplified version of scalatest eventually, rewritten here is to avoid adding extra test - // dependency + // A simplified version of scalatest eventually, rewritten here to avoid adding extra test + // dependency def eventually[T](timeout: Time, interval: Time)(func: => T): T = { def makeAttempt(): Either[Throwable, T] = { try { diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index ea6a7d2abb62..4c1d6a03eb2b 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -47,7 +47,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable { public void setUp() { kafkaTestUtils = new KafkaTestUtils(); kafkaTestUtils.setup(); - System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); @@ -60,8 +59,6 @@ public void tearDown() { ssc = null; } - System.clearProperty("spark.driver.port"); - if (kafkaTestUtils != null) { kafkaTestUtils.teardown(); kafkaTestUtils = null; diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index dbaf43e8cbc2..a9dc6e50613c 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -43,7 +43,6 @@ public class JavaKafkaRDDSuite implements Serializable { public void setUp() { kafkaTestUtils = new KafkaTestUtils(); kafkaTestUtils.setup(); - System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); sc = new JavaSparkContext(sparkConf); @@ -55,7 +54,6 @@ public void tearDown() { sc.stop(); sc = null; } - System.clearProperty("spark.driver.port"); if (kafkaTestUtils != null) { kafkaTestUtils.teardown(); diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 131bf36e7e89..540f4ceabab4 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -48,7 +48,6 @@ public class JavaKafkaStreamSuite implements Serializable { public void setUp() { kafkaTestUtils = new KafkaTestUtils(); kafkaTestUtils.setup(); - System.clearProperty("spark.driver.port"); SparkConf sparkConf = new SparkConf() .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); ssc = new 
JavaStreamingContext(sparkConf, new Duration(500)); @@ -61,8 +60,6 @@ public void tearDown() { ssc = null; } - System.clearProperty("spark.driver.port"); - if (kafkaTestUtils != null) { kafkaTestUtils.teardown(); kafkaTestUtils = null; From ee4b9194f4e827dec8f25c33b28433e1348fe671 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 10 Apr 2015 11:09:47 +0800 Subject: [PATCH 15/15] Fixed newly merged issue --- python/run-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/run-tests b/python/run-tests index e683e1d95061..f3a07d8aba56 100755 --- a/python/run-tests +++ b/python/run-tests @@ -21,14 +21,14 @@ # Figure out where the Spark framework is installed FWDIR="$(cd "`dirname "$0"`"; cd ../; pwd)" +. "$FWDIR"/bin/load-spark-env.sh + # CD into the python directory to find things on the right path cd "$FWDIR/python" FAILED=0 LOG_FILE=unit-tests.log -. "$FWDIR"/bin/load-spark-env.sh - rm -f $LOG_FILE # Remove the metastore and warehouse directory created by the HiveContext tests in Spark SQL
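
Taken together, the series leaves KafkaTestUtils with a small, self-contained surface that the JVM suites and (through the Py4J gateway) the Python tests drive in the same way. A rough sketch of that final usage, assuming the caller lives in the org.apache.spark.streaming.kafka package (topic name and message map below are illustrative):

    val kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup()        // Zookeeper + broker; the broker port is bound via Utils.startServiceOnPort
    try {
      kafkaTestUtils.createTopic("test-topic")                   // blocks until metadata has propagated
      kafkaTestUtils.sendMessages("test-topic", Map("a" -> 10))
      val kafkaParams = Map(
        "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
        "auto.offset.reset" -> "smallest")
      // ... build a KafkaRDD, direct stream or receiver-based stream against kafkaParams ...
    } finally {
      kafkaTestUtils.teardown()   // closes the producer, Kafka broker, ZkClient and embedded Zookeeper, deleting the broker log dirs
    }
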