@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting offset") {
+  test("basic stream receiving with multiple topics and smallest starting offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
       // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()
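The substantive change above is in the first test: instead of bumping a running counter by `collected.size` (which, from the surrounding context, is a per-partition summary of each batch rather than one entry per record), the test now gathers every received record into an `ArrayBuffer` and asserts that the total matches the number of messages sent. Below is a minimal standalone sketch of that verification pattern. It is not part of the patch: the broker address, topic names, and expected count are placeholder assumptions, and it presumes a running Kafka 0.8 broker with pre-populated topics plus the spark-streaming-kafka artifact on the classpath.

// Sketch only: mirrors the test's "collect everything, then assert" pattern.
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

import kafka.serializer.StringDecoder
import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamCountCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("DirectStreamCountCheck")
    val ssc = new StreamingContext(conf, Milliseconds(200))

    // Placeholder assumptions: broker address, topics, and how many records they hold.
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest")
    val topics = Set("topicA", "topicB")
    val totalSent = 100

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Accumulate every record the direct stream delivers (runs on the driver).
    val allReceived = new ArrayBuffer[(String, String)]
    stream.foreachRDD { rdd =>
      // Each batch RDD of a direct stream carries the Kafka offset ranges it read.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val rangeTotal = offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
      assert(rdd.count() == rangeTotal, "batch size does not match its offset ranges")
      allReceived ++= rdd.collect()
    }

    ssc.start()
    // Poll until every produced record has shown up, or fail after the timeout.
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size == totalSent,
        s"expected $totalSent records, got ${allReceived.size}")
    }
    ssc.stop()
  }
}

Comparing the accumulated records against a precomputed `totalSent`, rather than incrementing a counter inside `foreachRDD`, keeps the assertion independent of how the batches happen to be partitioned, which is what makes the re-enabled test deterministic.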