From e768164fca1c93ec0a99f7020e301368f798156c Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Sun, 7 Dec 2014 10:50:44 -0500 Subject: [PATCH 01/18] #2808 update kafka to version 0.8.2 --- external/kafka/pom.xml | 2 +- .../streaming/kafka/KafkaStreamSuite.scala | 38 +++++++++++-------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index b3f44471cd32..f1e848070164 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.0 + 0.8.2-beta com.sun.jmx diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index b19c053ebfc4..ff7f5d5dd3e0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.common.{KafkaException, TopicAndPartition} import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.{StringDecoder, StringEncoder} @@ -77,8 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin var bindSuccess: Boolean = false while(!bindSuccess) { try { - val brokerProps = getBrokerConfig() - brokerConf = new KafkaConfig(brokerProps) + brokerConf = new KafkaConfig(brokerConfig) server = new KafkaServer(brokerConf) logInfo("==================== 2 ====================") server.startup() @@ -123,27 +122,26 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin private def createTestMessage(topic: String, sent: Map[String, Int]) : Seq[KeyedMessage[String, String]] = { - val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { + (for ((s, freq) <- sent; i <- 0 until freq) yield { new KeyedMessage[String, String](topic, s) - } - messages.toSeq + }).toSeq } def createTopic(topic: String) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) logInfo("==================== 5 ====================") // wait until metadata is propagated - waitUntilMetadataIsPropagated(topic, 0) + waitUntilMetadataIsPropagated(Seq(server), topic, 0) } def produceAndSendMessage(topic: String, sent: Map[String, Int]) { - producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) + producer = new Producer[String, String](new ProducerConfig(producerConfig)) producer.send(createTestMessage(topic, sent): _*) producer.close() logInfo("==================== 6 ====================") } - private def getBrokerConfig(): Properties = { + private def brokerConfig: Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") @@ -155,7 +153,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def getProducerConfig(): Properties = { + private def producerConfig: Properties = { val brokerAddr = brokerConf.hostName + ":" + brokerConf.port val props = new Properties() props.put("metadata.broker.list", brokerAddr) @@ -163,13 +161,21 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def waitUntilMetadataIsPropagated(topic: String, 
partition: Int) { + private def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int): Int = { + var leader: Int = -1 eventually(timeout(1000 milliseconds), interval(100 milliseconds)) { - assert( - server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)), - s"Partition [$topic, $partition] metadata not propagated after timeout" - ) + assert(servers.foldLeft(true) { + (result, server) => + val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition) + partitionStateOpt match { + case None => false + case Some(partitionState) => + leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader + result && leader >= 0 // is valid broker id + } + }, s"Partition [$topic, $partition] metadata not propagated after timeout") } + leader } class EmbeddedZookeeper(val zkConnect: String) { From 2e67c66b174bd1641ea8986edde3ce5598add612 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Thu, 5 Feb 2015 08:08:19 -0500 Subject: [PATCH 02/18] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta. --- external/kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index f1e848070164..8e937e25a9f1 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.2-beta + 0.8.2.0 com.sun.jmx From 407382eae4f94489dd0ee1f4e84fcb0f6114f6e3 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 18 Mar 2015 09:13:17 -0500 Subject: [PATCH 03/18] [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1 --- external/kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 84f0e02ac2fb..bf0de451c760 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.2.0 + 0.8.2.1 com.sun.jmx From ed02d2c71312138ec937cde047000e3a695f49c1 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 15 Apr 2015 11:47:28 -0500 Subject: [PATCH 04/18] [SPARK-2808][Streaming][Kafka] move default argument for api version to overloaded method, for binary compat --- .../spark/streaming/kafka/KafkaCluster.scala | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index d90eb66f6f55..5380902efe93 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -216,10 +216,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def defaultConsumerApiVersion: Short = 0 /** Requires Kafka >= 0.8.1.1 */ + def getConsumerOffsets( + groupId: String, + topicAndPartitions: Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, Long]] = + getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion) + def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short = defaultConsumerApiVersion + versionId: Short ): Either[Err, Map[TopicAndPartition, Long]] = { getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r => r.map { kv => @@ -229,10 +235,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { } /** Requires Kafka 
>= 0.8.1.1 */ + def getConsumerOffsetMetadata( + groupId: String, + topicAndPartitions: Set[TopicAndPartition] + ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = + getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion) + def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short = defaultConsumerApiVersion + versionId: Short ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = { var result = Map[TopicAndPartition, OffsetMetadataAndError]() val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId) @@ -260,10 +272,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { } /** Requires Kafka >= 0.8.1.1 */ + def setConsumerOffsets( + groupId: String, + offsets: Map[TopicAndPartition, Long] + ): Either[Err, Map[TopicAndPartition, Short]] = + setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion) + def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long], - versionId: Short = defaultConsumerApiVersion + versionId: Short ): Either[Err, Map[TopicAndPartition, Short]] = { val meta = offsets.map { kv => kv._1 -> OffsetAndMetadata(kv._2) @@ -272,10 +290,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { } /** Requires Kafka >= 0.8.1.1 */ + def setConsumerOffsetMetadata( + groupId: String, + metadata: Map[TopicAndPartition, OffsetAndMetadata] + ): Either[Err, Map[TopicAndPartition, Short]] = + setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion) + def setConsumerOffsetMetadata( groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], - versionId: Short = defaultConsumerApiVersion + versionId: Short ): Either[Err, Map[TopicAndPartition, Short]] = { var result = Map[TopicAndPartition, Short]() val req = OffsetCommitRequest(groupId, metadata, versionId) From c70ee43bb66aeccb75145dcc8eb06f1e11e92859 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 28 Apr 2015 14:30:28 -0500 Subject: [PATCH 05/18] [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figure out why it fails on jenkins but not locally --- .../spark/streaming/kafka/KafkaRDDSuite.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 3a29c9b296bc..3d1c8639d1cb 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -53,14 +53,14 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { } test("basic usage") { - val topic = "topicbasic" + val topic = s"topicbasic-${Random.nextInt}" kafkaTestUtils.createTopic(topic) val messages = Set("the", "quick", "brown", "fox") kafkaTestUtils.sendMessages(topic, messages.toArray) val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -73,12 +73,12 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { test("iterator boundary conditions") { // the idea is to find e.g. 
off-by-one errors between what kafka has available and the rdd - val topic = "topic1" + val topic = s"topicboundary-${Random.nextInt}" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) kafkaTestUtils.createTopic(topic) val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}") + "group.id" -> s"test-consumer-${Random.nextInt}") val kc = new KafkaCluster(kafkaParams) @@ -88,12 +88,17 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val rdd = getRdd(kc, Set(topic)) assert(rdd.isDefined) - assert(rdd.get.count === sent.values.sum, "didn't get all sent messages") - val ranges = rdd.get.asInstanceOf[HasOffsetRanges] - .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges + val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum + val sentCount = sent.values.sum - kc.setConsumerOffsets(kafkaParams("group.id"), ranges).fold( + assert(rangeCount === sentCount, "offset range didn't include all sent messages") + assert(rdd.get.count === sentCount, "didn't get all sent messages") + + val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap + + kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold( err => throw new Exception(err.mkString("\n")), _ => () ) From 9edab4c10553bb054ab8e2b542a6be71195c4029 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 28 Apr 2015 16:55:40 -0500 Subject: [PATCH 06/18] [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins failing test --- .../apache/spark/streaming/kafka/KafkaTestUtils.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 45a9017d7946..933ed2867040 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -29,10 +29,11 @@ import scala.language.postfixOps import scala.util.control.NonFatal import kafka.admin.AdminUtils +import kafka.api.Request import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.ZKStringSerializer +import kafka.utils.{ZKStringSerializer, ZkUtils} import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.I0Itec.zkclient.ZkClient @@ -232,8 +233,11 @@ private class KafkaTestUtils extends Logging { assert( server.apis.metadataCache.getPartitionInfo(topic, partition) match { case Some(partitionState) => - // is valid broker id - partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader >= 0 + val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined && + Request.isValidBrokerId(leaderAndIsr.leader) && + leaderAndIsr.isr.size >= 1 + case _ => false }, From af6f3ecd36a07d93baebe73206310428d712decb Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 28 Apr 2015 20:50:11 -0500 Subject: [PATCH 07/18] [SPARK-2808][Streaming][Kafka] delay test until latest leader offset matches expected value --- .../spark/streaming/kafka/KafkaTestUtils.scala | 15 +++++++++++++++ .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 ++++++-- 2 
files changed, 21 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 933ed2867040..58774ee506eb 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -30,6 +30,7 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request +import kafka.common.TopicAndPartition import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} @@ -228,6 +229,20 @@ private class KafkaTestUtils extends Logging { tryAgain(1) } + def waitUntilLeaderOffset( + kc: KafkaCluster, + topic: String, + partition: Int, + offset: Long): Unit = { + eventually(Time(10000), Time(100)) { + val tp = TopicAndPartition(topic, partition) + val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset + assert( + llo == offset, + s"$topic $partition $offset not reached after timeout") + } + } + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { eventually(Time(10000), Time(100)) { assert( diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 3d1c8639d1cb..ec800177e905 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -58,10 +58,12 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val messages = Set("the", "quick", "brown", "fox") kafkaTestUtils.sendMessages(topic, messages.toArray) - val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt}") + val kc = new KafkaCluster(kafkaParams) + kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, messages.size) + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( @@ -84,6 +86,9 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { // this is the "lots of messages" case kafkaTestUtils.sendMessages(topic, sent) + val sentCount = sent.values.sum + kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount) + // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) @@ -91,7 +96,6 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum - val sentCount = sent.values.sum assert(rangeCount === sentCount, "offset range didn't include all sent messages") assert(rdd.get.count === sentCount, "didn't get all sent messages") From 61b3464cada26c10ee42271e5a958bc85e8c3cfb Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 28 Apr 2015 21:43:54 -0500 Subject: [PATCH 08/18] [SPARK-2808][Streaming][Kafka] delay for second send in boundary condition test --- .../scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index ec800177e905..996da660b927 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -113,6 +113,8 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val sentOnlyOne = Map("d" -> 1) kafkaTestUtils.sendMessages(topic, sentOnlyOne) + kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount + 1) + assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") From 3824ce390721c32610e6da90a5c08d4f4b68ad32 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 29 Apr 2015 09:39:09 -0500 Subject: [PATCH 09/18] [SPARK-2808][Streaming][Kafka] naming / comments per tdas --- .../spark/streaming/kafka/KafkaCluster.scala | 16 ++++++++-------- .../spark/streaming/kafka/KafkaTestUtils.scala | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index f67924f8863a..194fcafd6bd9 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -233,9 +233,9 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Long]] = { - getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r => + getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r => r.map { kv => kv._1 -> kv._2.offset } @@ -252,10 +252,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = { var result = Map[TopicAndPartition, OffsetMetadataAndError]() - val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId) + val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion) val errs = new Err withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp = consumer.fetchOffsets(req) @@ -289,12 +289,12 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { val meta = offsets.map { kv => kv._1 -> OffsetAndMetadata(kv._2) } - setConsumerOffsetMetadata(groupId, meta, versionId) + setConsumerOffsetMetadata(groupId, meta, consumerApiVersion) } /** Requires Kafka >= 0.8.1.1 */ @@ -307,10 +307,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsetMetadata( groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { var result = Map[TopicAndPartition, Short]() - val req = OffsetCommitRequest(groupId, metadata, versionId) + val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion) val errs = 
new Err val topicAndPartitions = metadata.keySet withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 58774ee506eb..104f772fd76b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -229,6 +229,7 @@ private class KafkaTestUtils extends Logging { tryAgain(1) } + /** wait until the leader offset for the given topic / partition equals the specified offset */ def waitUntilLeaderOffset( kc: KafkaCluster, topic: String, From 2b92d3f919a045d20965ddc6b02465a7e5b2c64d Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 29 Apr 2015 11:47:04 -0500 Subject: [PATCH 10/18] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well --- .../org/apache/spark/streaming/kafka/KafkaCluster.scala | 6 ++++++ .../org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index 194fcafd6bd9..5425d338311c 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka import scala.util.control.NonFatal import scala.util.Random import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import java.util.Properties import kafka.api._ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} @@ -37,6 +38,11 @@ private[spark] class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig} + /** Constructor that takes a Java map */ + def this(kafkaParams: java.util.Map[String, String]) { + this(kafkaParams.asScala.toMap) + } + // ConsumerConfig isn't serializable @transient private var _config: SimpleConsumerConfig = null diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e50613c..d0b137a11b1d 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,6 +72,10 @@ public void testKafkaRDD() throws InterruptedException { HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + KafkaCluster kc = new KafkaCluster(kafkaParams); + kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length); + OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), OffsetRange.create(topic2, 0, 0, 1) From 2712649dfa1578d7b744244b6cb87206a441b1bb Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 29 Apr 2015 15:42:40 -0500 Subject: [PATCH 11/18] [SPARK-2808][Streaming][Kafka] add more logging to python test, see why its timing out in jenkins --- python/pyspark/streaming/tests.py | 8 ++++++-- python/run-tests | 2 +- 2 files 
changed, 7 insertions(+), 3 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 5fa1e5ef081a..9582a4c2de84 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -592,21 +592,25 @@ def tearDown(self): def test_kafka_stream(self): """Test the Python Kafka stream API.""" + print >> sys.stderr, "test_kafka_stream started" topic = "topic1" sendData = {"a": 3, "b": 5, "c": 10} self._kafkaTestUtils.createTopic(topic) + print >> sys.stderr, "test_kafka_stream created topic" self._kafkaTestUtils.sendMessages(topic, sendData) + print >> sys.stderr, "test_kafka_stream sent messages" + time.sleep(1) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, {"auto.offset.reset": "smallest"}) - + print >> sys.stderr, "test_kafka_stream created stream" result = {} for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]), sum(sendData.values()))): result[i] = result.get(i, 0) + 1 - + print >> sys.stderr, "test_kafka_stream got results" self.assertEqual(sendData, result) if __name__ == "__main__": diff --git a/python/run-tests b/python/run-tests index 88b63b84fdc2..624c2792e14e 100755 --- a/python/run-tests +++ b/python/run-tests @@ -38,7 +38,7 @@ rm -rf metastore warehouse function run_test() { echo -en "Running test: $1 ... " | tee -a $LOG_FILE start=$(date +"%s") - SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE FAILED=$((PIPESTATUS[0]||$FAILED)) From 4c4557f149e0d96b2f5ea8020079cd3159388dec Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Apr 2015 13:00:40 -0500 Subject: [PATCH 12/18] [SPARK-2808][Streaming][Kafka] add even more logging to python test --- python/pyspark/streaming/tests.py | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f9c5268f2118..87d1fc704945 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -142,6 +142,7 @@ class BasicOperationTests(PySparkStreamingTestCase): def test_map(self): """Basic operation test for DStream.map.""" + print >> sys.stderr, "test_map started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -151,6 +152,7 @@ def func(dstream): def test_flatMap(self): """Basic operation test for DStream.faltMap.""" + print >> sys.stderr, "test_flatMap started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -161,6 +163,7 @@ def func(dstream): def test_filter(self): """Basic operation test for DStream.filter.""" + print >> sys.stderr, "test_filter started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -170,6 +173,7 @@ def func(dstream): def test_count(self): """Basic operation test for DStream.count.""" + print >> sys.stderr, "test_count started" input = [range(5), range(10), range(20)] def func(dstream): @@ -179,6 +183,7 @@ def func(dstream): def test_reduce(self): """Basic operation test for DStream.reduce.""" + print >> sys.stderr, "test_reduce started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -188,6 +193,7 @@ def func(dstream): def test_reduceByKey(self): """Basic operation test for DStream.reduceByKey.""" + print >> sys.stderr, "test_reduceByKey started" input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)], [("", 1), ("", 1), ("", 1), ("", 1)], [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]] @@ -199,6 
+205,7 @@ def func(dstream): def test_mapValues(self): """Basic operation test for DStream.mapValues.""" + print >> sys.stderr, "test_mapValues started" input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [(0, 4), (1, 1), (2, 2), (3, 3)], [(1, 1), (2, 1), (3, 1), (4, 1)]] @@ -212,6 +219,7 @@ def func(dstream): def test_flatMapValues(self): """Basic operation test for DStream.flatMapValues.""" + print >> sys.stderr, "test_flatMapValues started" input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [(0, 4), (1, 1), (2, 1), (3, 1)], [(1, 1), (2, 1), (3, 1), (4, 1)]] @@ -226,6 +234,7 @@ def func(dstream): def test_glom(self): """Basic operation test for DStream.glom.""" + print >> sys.stderr, "test_glom started" input = [range(1, 5), range(5, 9), range(9, 13)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -236,6 +245,7 @@ def func(dstream): def test_mapPartitions(self): """Basic operation test for DStream.mapPartitions.""" + print >> sys.stderr, "test_mapPartitions started" input = [range(1, 5), range(5, 9), range(9, 13)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -248,6 +258,7 @@ def f(iterator): def test_countByValue(self): """Basic operation test for DStream.countByValue.""" + print >> sys.stderr, "test_countByValue started" input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]] def func(dstream): @@ -257,6 +268,7 @@ def func(dstream): def test_groupByKey(self): """Basic operation test for DStream.groupByKey.""" + print >> sys.stderr, "test_groupByKey started" input = [[(1, 1), (2, 1), (3, 1), (4, 1)], [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] @@ -271,6 +283,7 @@ def func(dstream): def test_combineByKey(self): """Basic operation test for DStream.combineByKey.""" + print >> sys.stderr, "test_combineByKey started" input = [[(1, 1), (2, 1), (3, 1), (4, 1)], [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] @@ -285,6 +298,7 @@ def add(a, b): self._test_func(input, func, expected, sort=True) def test_repartition(self): + print >> sys.stderr, "test_repartition started" input = [range(1, 5), range(5, 9)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -294,6 +308,7 @@ def func(dstream): self._test_func(rdds, func, expected) def test_union(self): + print >> sys.stderr, "test_union started" input1 = [range(3), range(5), range(6)] input2 = [range(3, 6), range(5, 6)] @@ -304,6 +319,7 @@ def func(d1, d2): self._test_func(input1, func, expected, input2=input2) def test_cogroup(self): + print >> sys.stderr, "test_cogroup started" input = [[(1, 1), (2, 1), (3, 1)], [(1, 1), (1, 1), (1, 1), (2, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1)]] @@ -320,6 +336,7 @@ def func(d1, d2): self._test_func(input, func, expected, sort=True, input2=input2) def test_join(self): + print >> sys.stderr, "test_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -330,6 +347,7 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_left_outer_join(self): + print >> sys.stderr, "test_left_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -340,6 +358,7 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_right_outer_join(self): + print >> sys.stderr, "test_right_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -350,6 +369,7 @@ def func(a, b): self._test_func(input, func, expected, True, 
input2) def test_full_outer_join(self): + print >> sys.stderr, "test_full_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -360,6 +380,7 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_update_state_by_key(self): + print >> sys.stderr, "test_update_state_by_key started" def updater(vs, s): if not s: @@ -382,6 +403,7 @@ class WindowFunctionTests(PySparkStreamingTestCase): timeout = 5 def test_window(self): + print >> sys.stderr, "test_window started" input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): @@ -391,6 +413,7 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_window(self): + print >> sys.stderr, "test_count_by_window started" input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): @@ -400,6 +423,7 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_window_large(self): + print >> sys.stderr, "test_count_by_window_large started" input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): @@ -409,6 +433,7 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_value_and_window(self): + print >> sys.stderr, "test_count_by_value_and_window started" input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): @@ -418,6 +443,7 @@ def func(dstream): self._test_func(input, func, expected) def test_group_by_key_and_window(self): + print >> sys.stderr, "test_group_by_key_and_window started" input = [[('a', i)] for i in range(5)] def func(dstream): @@ -428,6 +454,7 @@ def func(dstream): self._test_func(input, func, expected) def test_reduce_by_invalid_window(self): + print >> sys.stderr, "test_reduce_by_invalid_window started" input1 = [range(3), range(5), range(1), range(6)] d1 = self.ssc.queueStream(input1) self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1)) @@ -444,24 +471,28 @@ def _add_input_stream(self): self._collect(stream, 1, block=False) def test_stop_only_streaming_context(self): + print >> sys.stderr, "test_stop_only_streaming_context started" self._add_input_stream() self.ssc.start() self.ssc.stop(False) self.assertEqual(len(self.sc.parallelize(range(5), 5).glom().collect()), 5) def test_stop_multiple_times(self): + print >> sys.stderr, "test_stop_multiple_times started" self._add_input_stream() self.ssc.start() self.ssc.stop(False) self.ssc.stop(False) def test_queue_stream(self): + print >> sys.stderr, "test_queue_stream started" input = [list(range(i + 1)) for i in range(3)] dstream = self.ssc.queueStream(input) result = self._collect(dstream, 3) self.assertEqual(input, result) def test_text_file_stream(self): + print >> sys.stderr, "test_text_file_stream started" d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) dstream2 = self.ssc.textFileStream(d).map(int) @@ -475,6 +506,7 @@ def test_text_file_stream(self): self.assertEqual([list(range(10)), list(range(10))], result) def test_binary_records_stream(self): + print >> sys.stderr, "test_binary_records_stream started" d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) dstream = self.ssc.binaryRecordsStream(d, 10).map( @@ -489,6 +521,7 @@ def test_binary_records_stream(self): self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result]) def test_union(self): + print >> sys.stderr, "test_union started" input = [list(range(i + 1)) for i in range(3)] dstream = 
self.ssc.queueStream(input) dstream2 = self.ssc.queueStream(input) @@ -498,6 +531,7 @@ def test_union(self): self.assertEqual(expected, result) def test_transform(self): + print >> sys.stderr, "test_transform started" dstream1 = self.ssc.queueStream([[1]]) dstream2 = self.ssc.queueStream([[2]]) dstream3 = self.ssc.queueStream([[3]]) @@ -514,6 +548,7 @@ def func(rdds): class CheckpointTests(unittest.TestCase): def test_get_or_create(self): + print >> sys.stderr, "test_get_or_create started" inputd = tempfile.mkdtemp() outputd = tempfile.mkdtemp() + "/" @@ -577,6 +612,7 @@ class KafkaStreamTests(PySparkStreamingTestCase): duration = 1 def setUp(self): + print >> sys.stderr, "KafkaStreamTests setUp started" super(KafkaStreamTests, self).setUp() kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ @@ -629,6 +665,7 @@ def test_kafka_stream(self): def test_kafka_direct_stream(self): """Test the Python direct Kafka stream API.""" + print >> sys.stderr, "test_kafka_direct_stream started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), @@ -643,6 +680,7 @@ def test_kafka_direct_stream(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_direct_stream_from_offset(self): """Test the Python direct Kafka stream API with start offset specified.""" + print >> sys.stderr, "test_kafka_direct_stream_from_offset started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} fromOffsets = {TopicAndPartition(topic, 0): long(0)} @@ -657,6 +695,7 @@ def test_kafka_direct_stream_from_offset(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd(self): """Test the Python direct Kafka RDD API.""" + print >> sys.stderr, "test_kafka_rdd started" topic = self._randomTopic() sendData = {"a": 1, "b": 2} offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] @@ -671,6 +710,7 @@ def test_kafka_rdd(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd_with_leaders(self): """Test the Python direct Kafka RDD API with leaders.""" + print >> sys.stderr, "test_kafka_rdd_with_leaders started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] From 1d896e2a262569089b80b5cc018b3dff8121368e Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 30 Apr 2015 15:11:40 -0500 Subject: [PATCH 13/18] [SPARK-2808][Streaming][Kafka] add even even more logging to python test --- python/pyspark/streaming/tests.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 87d1fc704945..db7e70947250 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -612,7 +612,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): duration = 1 def setUp(self): - print >> sys.stderr, "KafkaStreamTests setUp started" super(KafkaStreamTests, self).setUp() kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ @@ -640,9 +639,9 @@ def _validateStreamResult(self, sendData, stream): def _validateRddResult(self, sendData, rdd): result = {} + print >> sys.stderr, "_validateRddResult started" for i in rdd.map(lambda x: x[1]).collect(): result[i] = result.get(i, 0) + 1 - self.assertEqual(sendData, result) def test_kafka_stream(self): @@ -655,7 +654,7 
@@ def test_kafka_stream(self): print >> sys.stderr, "test_kafka_stream created topic" self._kafkaTestUtils.sendMessages(topic, sendData) print >> sys.stderr, "test_kafka_stream sent messages" - time.sleep(1) + time.sleep(5) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, @@ -702,8 +701,10 @@ def test_kafka_rdd(self): kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} self._kafkaTestUtils.createTopic(topic) + print >> sys.stderr, "test_kafka_rdd created topic" self._kafkaTestUtils.sendMessages(topic, sendData) - + print >> sys.stderr, "test_kafka_rdd sent data" + time.sleep(5) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -719,9 +720,12 @@ def test_kafka_rdd_with_leaders(self): leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))} self._kafkaTestUtils.createTopic(topic) + print >> sys.stderr, "test_kafka_rdd_with_leaders created topic" self._kafkaTestUtils.sendMessages(topic, sendData) - + print >> sys.stderr, "test_kafka_rdd_with_leaders sent data" + time.sleep(5) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) + print >> sys.stderr, "test_kafka_rdd_with_leaders created rdd" self._validateRddResult(sendData, rdd) if __name__ == "__main__": From 30d991da695b700352e0250995367238b5127424 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 May 2015 08:55:29 -0500 Subject: [PATCH 14/18] [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks python 3 syntax --- python/pyspark/streaming/tests.py | 49 ------------------------------- python/run-tests | 2 +- 2 files changed, 1 insertion(+), 50 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index db7e70947250..ee1ede11cf24 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -142,7 +142,6 @@ class BasicOperationTests(PySparkStreamingTestCase): def test_map(self): """Basic operation test for DStream.map.""" - print >> sys.stderr, "test_map started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -152,7 +151,6 @@ def func(dstream): def test_flatMap(self): """Basic operation test for DStream.faltMap.""" - print >> sys.stderr, "test_flatMap started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -163,7 +161,6 @@ def func(dstream): def test_filter(self): """Basic operation test for DStream.filter.""" - print >> sys.stderr, "test_filter started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -173,7 +170,6 @@ def func(dstream): def test_count(self): """Basic operation test for DStream.count.""" - print >> sys.stderr, "test_count started" input = [range(5), range(10), range(20)] def func(dstream): @@ -183,7 +179,6 @@ def func(dstream): def test_reduce(self): """Basic operation test for DStream.reduce.""" - print >> sys.stderr, "test_reduce started" input = [range(1, 5), range(5, 9), range(9, 13)] def func(dstream): @@ -193,7 +188,6 @@ def func(dstream): def test_reduceByKey(self): """Basic operation test for DStream.reduceByKey.""" - print >> sys.stderr, "test_reduceByKey started" input = [[("a", 1), ("a", 1), ("b", 1), ("b", 1)], [("", 1), ("", 1), ("", 1), ("", 1)], [(1, 1), (1, 1), (2, 1), (2, 1), (3, 1)]] @@ -205,7 +199,6 @@ def func(dstream): def test_mapValues(self): """Basic operation test for DStream.mapValues.""" - print >> sys.stderr, "test_mapValues started" input = [[("a", 
2), ("b", 2), ("c", 1), ("d", 1)], [(0, 4), (1, 1), (2, 2), (3, 3)], [(1, 1), (2, 1), (3, 1), (4, 1)]] @@ -219,7 +212,6 @@ def func(dstream): def test_flatMapValues(self): """Basic operation test for DStream.flatMapValues.""" - print >> sys.stderr, "test_flatMapValues started" input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)], [(0, 4), (1, 1), (2, 1), (3, 1)], [(1, 1), (2, 1), (3, 1), (4, 1)]] @@ -234,7 +226,6 @@ def func(dstream): def test_glom(self): """Basic operation test for DStream.glom.""" - print >> sys.stderr, "test_glom started" input = [range(1, 5), range(5, 9), range(9, 13)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -245,7 +236,6 @@ def func(dstream): def test_mapPartitions(self): """Basic operation test for DStream.mapPartitions.""" - print >> sys.stderr, "test_mapPartitions started" input = [range(1, 5), range(5, 9), range(9, 13)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -258,7 +248,6 @@ def f(iterator): def test_countByValue(self): """Basic operation test for DStream.countByValue.""" - print >> sys.stderr, "test_countByValue started" input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]] def func(dstream): @@ -268,7 +257,6 @@ def func(dstream): def test_groupByKey(self): """Basic operation test for DStream.groupByKey.""" - print >> sys.stderr, "test_groupByKey started" input = [[(1, 1), (2, 1), (3, 1), (4, 1)], [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] @@ -283,7 +271,6 @@ def func(dstream): def test_combineByKey(self): """Basic operation test for DStream.combineByKey.""" - print >> sys.stderr, "test_combineByKey started" input = [[(1, 1), (2, 1), (3, 1), (4, 1)], [(1, 1), (1, 1), (1, 1), (2, 1), (2, 1), (3, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1), ("", 1)]] @@ -298,7 +285,6 @@ def add(a, b): self._test_func(input, func, expected, sort=True) def test_repartition(self): - print >> sys.stderr, "test_repartition started" input = [range(1, 5), range(5, 9)] rdds = [self.sc.parallelize(r, 2) for r in input] @@ -308,7 +294,6 @@ def func(dstream): self._test_func(rdds, func, expected) def test_union(self): - print >> sys.stderr, "test_union started" input1 = [range(3), range(5), range(6)] input2 = [range(3, 6), range(5, 6)] @@ -319,7 +304,6 @@ def func(d1, d2): self._test_func(input1, func, expected, input2=input2) def test_cogroup(self): - print >> sys.stderr, "test_cogroup started" input = [[(1, 1), (2, 1), (3, 1)], [(1, 1), (1, 1), (1, 1), (2, 1)], [("a", 1), ("a", 1), ("b", 1), ("", 1), ("", 1)]] @@ -336,7 +320,6 @@ def func(d1, d2): self._test_func(input, func, expected, sort=True, input2=input2) def test_join(self): - print >> sys.stderr, "test_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -347,7 +330,6 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_left_outer_join(self): - print >> sys.stderr, "test_left_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -358,7 +340,6 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_right_outer_join(self): - print >> sys.stderr, "test_right_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -369,7 +350,6 @@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_full_outer_join(self): - print >> sys.stderr, "test_full_outer_join started" input = [[('a', 1), ('b', 2)]] input2 = [[('b', 3), ('c', 4)]] @@ -380,7 +360,6 
@@ def func(a, b): self._test_func(input, func, expected, True, input2) def test_update_state_by_key(self): - print >> sys.stderr, "test_update_state_by_key started" def updater(vs, s): if not s: @@ -403,7 +382,6 @@ class WindowFunctionTests(PySparkStreamingTestCase): timeout = 5 def test_window(self): - print >> sys.stderr, "test_window started" input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): @@ -413,7 +391,6 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_window(self): - print >> sys.stderr, "test_count_by_window started" input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): @@ -423,7 +400,6 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_window_large(self): - print >> sys.stderr, "test_count_by_window_large started" input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): @@ -433,7 +409,6 @@ def func(dstream): self._test_func(input, func, expected) def test_count_by_value_and_window(self): - print >> sys.stderr, "test_count_by_value_and_window started" input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): @@ -443,7 +418,6 @@ def func(dstream): self._test_func(input, func, expected) def test_group_by_key_and_window(self): - print >> sys.stderr, "test_group_by_key_and_window started" input = [[('a', i)] for i in range(5)] def func(dstream): @@ -454,7 +428,6 @@ def func(dstream): self._test_func(input, func, expected) def test_reduce_by_invalid_window(self): - print >> sys.stderr, "test_reduce_by_invalid_window started" input1 = [range(3), range(5), range(1), range(6)] d1 = self.ssc.queueStream(input1) self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1)) @@ -471,28 +444,24 @@ def _add_input_stream(self): self._collect(stream, 1, block=False) def test_stop_only_streaming_context(self): - print >> sys.stderr, "test_stop_only_streaming_context started" self._add_input_stream() self.ssc.start() self.ssc.stop(False) self.assertEqual(len(self.sc.parallelize(range(5), 5).glom().collect()), 5) def test_stop_multiple_times(self): - print >> sys.stderr, "test_stop_multiple_times started" self._add_input_stream() self.ssc.start() self.ssc.stop(False) self.ssc.stop(False) def test_queue_stream(self): - print >> sys.stderr, "test_queue_stream started" input = [list(range(i + 1)) for i in range(3)] dstream = self.ssc.queueStream(input) result = self._collect(dstream, 3) self.assertEqual(input, result) def test_text_file_stream(self): - print >> sys.stderr, "test_text_file_stream started" d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) dstream2 = self.ssc.textFileStream(d).map(int) @@ -506,7 +475,6 @@ def test_text_file_stream(self): self.assertEqual([list(range(10)), list(range(10))], result) def test_binary_records_stream(self): - print >> sys.stderr, "test_binary_records_stream started" d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) dstream = self.ssc.binaryRecordsStream(d, 10).map( @@ -521,7 +489,6 @@ def test_binary_records_stream(self): self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result]) def test_union(self): - print >> sys.stderr, "test_union started" input = [list(range(i + 1)) for i in range(3)] dstream = self.ssc.queueStream(input) dstream2 = self.ssc.queueStream(input) @@ -531,7 +498,6 @@ def test_union(self): self.assertEqual(expected, result) def test_transform(self): - print >> sys.stderr, 
"test_transform started" dstream1 = self.ssc.queueStream([[1]]) dstream2 = self.ssc.queueStream([[2]]) dstream3 = self.ssc.queueStream([[3]]) @@ -548,7 +514,6 @@ def func(rdds): class CheckpointTests(unittest.TestCase): def test_get_or_create(self): - print >> sys.stderr, "test_get_or_create started" inputd = tempfile.mkdtemp() outputd = tempfile.mkdtemp() + "/" @@ -639,32 +604,26 @@ def _validateStreamResult(self, sendData, stream): def _validateRddResult(self, sendData, rdd): result = {} - print >> sys.stderr, "_validateRddResult started" for i in rdd.map(lambda x: x[1]).collect(): result[i] = result.get(i, 0) + 1 self.assertEqual(sendData, result) def test_kafka_stream(self): """Test the Python Kafka stream API.""" - print >> sys.stderr, "test_kafka_stream started" topic = self._randomTopic() sendData = {"a": 3, "b": 5, "c": 10} self._kafkaTestUtils.createTopic(topic) - print >> sys.stderr, "test_kafka_stream created topic" self._kafkaTestUtils.sendMessages(topic, sendData) - print >> sys.stderr, "test_kafka_stream sent messages" time.sleep(5) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, {"auto.offset.reset": "smallest"}) - print >> sys.stderr, "test_kafka_stream created stream" self._validateStreamResult(sendData, stream) def test_kafka_direct_stream(self): """Test the Python direct Kafka stream API.""" - print >> sys.stderr, "test_kafka_direct_stream started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), @@ -679,7 +638,6 @@ def test_kafka_direct_stream(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_direct_stream_from_offset(self): """Test the Python direct Kafka stream API with start offset specified.""" - print >> sys.stderr, "test_kafka_direct_stream_from_offset started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} fromOffsets = {TopicAndPartition(topic, 0): long(0)} @@ -694,16 +652,13 @@ def test_kafka_direct_stream_from_offset(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd(self): """Test the Python direct Kafka RDD API.""" - print >> sys.stderr, "test_kafka_rdd started" topic = self._randomTopic() sendData = {"a": 1, "b": 2} offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} self._kafkaTestUtils.createTopic(topic) - print >> sys.stderr, "test_kafka_rdd created topic" self._kafkaTestUtils.sendMessages(topic, sendData) - print >> sys.stderr, "test_kafka_rdd sent data" time.sleep(5) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -711,7 +666,6 @@ def test_kafka_rdd(self): @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd_with_leaders(self): """Test the Python direct Kafka RDD API with leaders.""" - print >> sys.stderr, "test_kafka_rdd_with_leaders started" topic = self._randomTopic() sendData = {"a": 1, "b": 2, "c": 3} offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] @@ -720,12 +674,9 @@ def test_kafka_rdd_with_leaders(self): leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))} self._kafkaTestUtils.createTopic(topic) - print >> sys.stderr, "test_kafka_rdd_with_leaders created topic" self._kafkaTestUtils.sendMessages(topic, sendData) - print >> sys.stderr, "test_kafka_rdd_with_leaders 
sent data" time.sleep(5) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) - print >> sys.stderr, "test_kafka_rdd_with_leaders created rdd" self._validateRddResult(sendData, rdd) if __name__ == "__main__": diff --git a/python/run-tests b/python/run-tests index 624c2792e14e..32692c8b8834 100755 --- a/python/run-tests +++ b/python/run-tests @@ -36,7 +36,7 @@ rm -f $LOG_FILE rm -rf metastore warehouse function run_test() { - echo -en "Running test: $1 ... " | tee -a $LOG_FILE + echo -en "Running test: $1 ... " | tee -a $LOG_FILE 2>&1 start=$(date +"%s") SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE From d4267e95ab7ed8a6e580d2123ba6ef25dbc392e2 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 May 2015 09:12:32 -0500 Subject: [PATCH 15/18] [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script --- python/run-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/run-tests b/python/run-tests index 32692c8b8834..88b63b84fdc2 100755 --- a/python/run-tests +++ b/python/run-tests @@ -36,9 +36,9 @@ rm -f $LOG_FILE rm -rf metastore warehouse function run_test() { - echo -en "Running test: $1 ... " | tee -a $LOG_FILE 2>&1 + echo -en "Running test: $1 ... " | tee -a $LOG_FILE start=$(date +"%s") - SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) From 1770abc64eaa8123301345f68e66aa2647bf2033 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 May 2015 14:15:09 -0500 Subject: [PATCH 16/18] [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well --- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 2 +- .../apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 5 ++--- .../org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 7 +++---- python/pyspark/streaming/tests.py | 6 +++--- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 104f772fd76b..b11a73317163 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -231,11 +231,11 @@ private class KafkaTestUtils extends Logging { /** wait until the leader offset for the given topic / partition equals the specified offset */ def waitUntilLeaderOffset( - kc: KafkaCluster, topic: String, partition: Int, offset: Long): Unit = { eventually(Time(10000), Time(100)) { + val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress)) val tp = TopicAndPartition(topic, partition) val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset assert( diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index d0b137a11b1d..5cf379635354 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,9 +72,8 @@ public void testKafkaRDD() throws InterruptedException { HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - KafkaCluster kc = new 
KafkaCluster(kafkaParams); - kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length); - kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length); OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 996da660b927..fbaed25ac217 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -61,8 +61,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt}") - val kc = new KafkaCluster(kafkaParams) - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, messages.size) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size) val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -87,7 +86,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { // this is the "lots of messages" case kafkaTestUtils.sendMessages(topic, sent) val sentCount = sent.values.sum - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount) // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) @@ -113,7 +112,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val sentOnlyOne = Map("d" -> 1) kafkaTestUtils.sendMessages(topic, sentOnlyOne) - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount + 1) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1) assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ee1ede11cf24..5f18e1dbb922 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -615,7 +615,7 @@ def test_kafka_stream(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, @@ -659,7 +659,7 @@ def test_kafka_rdd(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -675,7 +675,7 @@ def test_kafka_rdd_with_leaders(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) self._validateRddResult(sendData, rdd) From e6dfaf667c7dd3e8ea519b6bf37b282466ba7437 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Fri, 1 May 2015 15:23:30 -0500 Subject: [PATCH 17/18] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again --- 
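[Editor's note, not part of the patch series] Taken together, patches 05 through 16 converge on one test pattern for the Kafka RDD suites: create the topic, send the messages, block until the broker's leader offset reaches the expected count, and only then build the offset ranges and the RDD. The sketch below paraphrases the KafkaRDDSuite changes above under that reading; it is not an excerpt from any single patch, and it assumes the suite's existing scaffolding (kafkaTestUtils, sc, and the usual imports).

    // Hedged sketch of the test flow the patches converge on (paraphrased from KafkaRDDSuite).
    val topic = s"topicbasic-${Random.nextInt}"
    kafkaTestUtils.createTopic(topic)
    val messages = Set("the", "quick", "brown", "fox")
    kafkaTestUtils.sendMessages(topic, messages.toArray)

    // Block until the leader reports all sent messages, so untilOffset below is known to exist.
    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)

    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "group.id" -> s"test-consumer-${Random.nextInt}")
    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    assert(rdd.map(_._2).collect().toSet == messages)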
From e6dfaf667c7dd3e8ea519b6bf37b282466ba7437 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Fri, 1 May 2015 15:23:30 -0500
Subject: [PATCH 17/18] [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger jenkins again

---
 .../scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index b11a73317163..7972ae9103c5 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -229,7 +229,7 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }

-  /** wait until the leader offset for the given topic / partition equals the specified offset */
+  /** wait until the leader offset for the given topic/partition equals the specified offset */
   def waitUntilLeaderOffset(
       topic: String,
       partition: Int,

From 803aa2ccb31c909b046727c28eca2c08ce279e6f Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Fri, 1 May 2015 16:40:36 -0500
Subject: [PATCH 18/18] [SPARK-2808][Streaming][Kafka] code cleanup per TD

---
 .../spark/streaming/kafka/KafkaCluster.scala  |  7 +----
 .../streaming/kafka/KafkaTestUtils.scala      | 27 +++++++++----------
 .../spark/streaming/kafka/KafkaRDDSuite.scala |  1 +
 python/pyspark/streaming/tests.py             |  2 ++
 4 files changed, 17 insertions(+), 20 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
index 5425d338311c..6cf254a7b69c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -38,11 +38,6 @@ private[spark]
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}

-  /** Constructor that takes a Java map */
-  def this(kafkaParams: java.util.Map[String, String]) {
-    this(kafkaParams.asScala.toMap)
-  }
-
   // ConsumerConfig isn't serializable
   @transient private var _config: SimpleConsumerConfig = null

@@ -227,7 +222,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   // scalastyle:on

   // this 0 here indicates api version, in this case the original ZK backed api.
-  def defaultConsumerApiVersion: Short = 0
+  private def defaultConsumerApiVersion: Short = 0

   /** Requires Kafka >= 0.8.1.1 */
   def getConsumerOffsets(
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 7972ae9103c5..6dc4e9517d5a 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -229,7 +229,7 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }

-  /** wait until the leader offset for the given topic/partition equals the specified offset */
+  /** Wait until the leader offset for the given topic/partition equals the specified offset */
   def waitUntilLeaderOffset(
       topic: String,
       partition: Int,
@@ -245,20 +245,19 @@ private class KafkaTestUtils extends Logging {
   }

   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
     eventually(Time(10000), Time(100)) {
-      assert(
-        server.apis.metadataCache.getPartitionInfo(topic, partition) match {
-          case Some(partitionState) =>
-            val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-            ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
-              Request.isValidBrokerId(leaderAndIsr.leader) &&
-              leaderAndIsr.isr.size >= 1
-
-          case _ =>
-            false
-        },
-        s"Partition [$topic, $partition] metadata not propagated after timeout"
-      )
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index fbaed25ac217..39c3fb448ff5 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -101,6 +101,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap

+    // make sure consumer offsets are committed before the next getRdd call
     kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
       err => throw new Exception(err.mkString("\n")),
       _ => ()
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5f18e1dbb922..33ea8c9293d7 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -631,6 +631,7 @@ def test_kafka_direct_stream(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ def test_kafka_direct_stream_from_offset(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
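
After the final two hunks, the Kafka tests in Scala, Java and Python all gate on the broker rather than on wall-clock time. A condensed sketch of the flow test_kafka_direct_stream_from_offset ends up with; variable names follow tests.py, while the payload, the starting offset, and the TopicAndPartition helper from pyspark.streaming.kafka are assumptions for illustration only:

    # Inside the streaming test case: produce, wait until the leader reports the
    # data, then read the topic from an explicit starting offset.
    sendData = {"a": 1, "b": 2, "c": 3}
    fromOffsets = {TopicAndPartition(topic, 0): long(0)}
    kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}

    self._kafkaTestUtils.createTopic(topic)
    self._kafkaTestUtils.sendMessages(topic, sendData)
    self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

    stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
    self._validateStreamResult(sendData, stream)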