[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1 (Closed)

Commits (23)
e768164  #2808 update kafka to version 0.8.2 (Dec 7, 2014)
d9dc2bc  Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.… (Dec 23, 2014)
2e67c66  #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta. (Feb 5, 2015)
6953429  [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2 (koeninger, Feb 11, 2015)
77de6c2  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Mar 18, 2015)
407382e  [SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1 (koeninger, Mar 18, 2015)
ed02d2c  [SPARK-2808][Streaming][Kafka] move default argument for api version … (koeninger, Apr 15, 2015)
1d10751  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Apr 27, 2015)
c70ee43  [SPARK-2808][Streaming][Kafka] add more asserts to test, try to figur… (koeninger, Apr 28, 2015)
9edab4c  [SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins fail… (koeninger, Apr 28, 2015)
af6f3ec  [SPARK-2808][Streaming][Kafka] delay test until latest leader offset … (koeninger, Apr 29, 2015)
61b3464  [SPARK-2808][Streaming][Kafka] delay for second send in boundary cond… (koeninger, Apr 29, 2015)
3824ce3  [SPARK-2808][Streaming][Kafka] naming / comments per tdas (koeninger, Apr 29, 2015)
2b92d3f  [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java te… (koeninger, Apr 29, 2015)
2712649  [SPARK-2808][Streaming][Kafka] add more logging to python test, see w… (koeninger, Apr 29, 2015)
115aeee  Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade (koeninger, Apr 29, 2015)
4c4557f  [SPARK-2808][Streaming][Kafka] add even more logging to python test (koeninger, Apr 30, 2015)
1d896e2  [SPARK-2808][Streaming][Kafka] add even even more logging to python test (koeninger, Apr 30, 2015)
30d991d  [SPARK-2808][Streaming][Kafka] remove stderr prints since it breaks p… (koeninger, May 1, 2015)
d4267e9  [SPARK-2808][Streaming][Kafka] fix stderr redirect in python test script (koeninger, May 1, 2015)
1770abc  [SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to c… (koeninger, May 1, 2015)
e6dfaf6  [SPARK-2808][Streaming][Kafka] pointless whitespace change to trigger… (koeninger, May 1, 2015)
803aa2c  [SPARK-2808][Streaming][Kafka] code cleanup per TD (koeninger, May 1, 2015)
Files changed
external/kafka/pom.xml (1 addition, 1 deletion)

@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.1.1</version>
+      <version>0.8.2.1</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

@@ -20,9 +20,10 @@ package org.apache.spark.streaming.kafka
 import scala.util.control.NonFatal
 import scala.util.Random
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
 import java.util.Properties
 import kafka.api._
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import org.apache.spark.SparkException
 
@@ -220,12 +221,22 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
   // scalastyle:on
 
+  // this 0 here indicates api version, in this case the original ZK backed api.
+  private def defaultConsumerApiVersion: Short = 0
+
   /** Requires Kafka >= 0.8.1.1 */
   def getConsumerOffsets(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, Long]] =
+    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+  def getConsumerOffsets(

Review thread:
Contributor: Nit: These two can be merged with default arguments.
Contributor (author): The code was originally using default arguments. That's what was causing the MiMa binary compatibility errors.
Contributor: That is frigging weird. This is in an internal class and should not be throwing MiMa compatibility errors. :/ Anyways, it's fine as is. Better than fighting with MiMa right now.

+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Long]] = {
-    getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
       r.map { kv =>
         kv._1 -> kv._2.offset
       }
@@ -236,9 +247,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def getConsumerOffsetMetadata(
       groupId: String,
       topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
     var result = Map[TopicAndPartition, OffsetMetadataAndError]()
-    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
     val errs = new Err
     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
       val resp = consumer.fetchOffsets(req)
@@ -266,24 +284,39 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   def setConsumerOffsets(
       groupId: String,
       offsets: Map[TopicAndPartition, Long]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+  def setConsumerOffsets(
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Short]] = {
-    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
-      kv._1 -> OffsetMetadataAndError(kv._2)
-    })
+    val meta = offsets.map { kv =>
+      kv._1 -> OffsetAndMetadata(kv._2)
+    }
+    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
   }
 
   /** Requires Kafka >= 0.8.1.1 */
   def setConsumerOffsetMetadata(
       groupId: String,
-      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+      metadata: Map[TopicAndPartition, OffsetAndMetadata]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetAndMetadata],
+      consumerApiVersion: Short
     ): Either[Err, Map[TopicAndPartition, Short]] = {
     var result = Map[TopicAndPartition, Short]()
-    val req = OffsetCommitRequest(groupId, metadata)
+    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
     val errs = new Err
     val topicAndPartitions = metadata.keySet
     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
       val resp = consumer.commitOffsets(req)
-      val respMap = resp.requestInfo
+      val respMap = resp.commitStatus
       val needed = topicAndPartitions.diff(result.keySet)
       needed.foreach { tp: TopicAndPartition =>
         respMap.get(tp).foreach { err: Short =>
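
The paired methods above, rather than a single method with a default argument, are deliberate: per the review thread, a Scala default argument is compiled into a synthetic `name$default$N` accessor and changes the existing method's binary signature, which is what tripped MiMa. A minimal sketch of the two styles, using hypothetical classes rather than the real KafkaCluster:

// Hypothetical illustration of the binary-compatibility trade-off; not part of this PR's diff.

// Style 1: default argument. scalac rewrites the old offsets(String) entry point
// into offsets(String, Short) and emits a synthetic offsets$default$2() accessor,
// so previously compiled callers of offsets(String) no longer link.
class ClusterWithDefault {
  def offsets(groupId: String, consumerApiVersion: Short = 0): Long = 0L  // dummy body
}

// Style 2 (what the PR does): keep the old signature intact and forward it to a
// new overload that carries the api version.
class ClusterWithOverload {
  private def defaultConsumerApiVersion: Short = 0

  def offsets(groupId: String): Long =
    offsets(groupId, defaultConsumerApiVersion)

  def offsets(groupId: String, consumerApiVersion: Short): Long = 0L  // dummy body
}
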
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

@@ -29,10 +29,12 @@ import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import kafka.admin.AdminUtils
+import kafka.api.Request
 import kafka.common.TopicAndPartition
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZKStringSerializer
+import kafka.utils.{ZKStringSerializer, ZkUtils}
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.I0Itec.zkclient.ZkClient
 
@@ -227,12 +229,35 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+  /** Wait until the leader offset for the given topic/partition equals the specified offset */
+  def waitUntilLeaderOffset(

Review comment:
Contributor: What does this function do? Either make the name more meaningful (like waitUntilMetadataIsPropagated) or add scala docs.

+      topic: String,
+      partition: Int,
+      offset: Long): Unit = {
     eventually(Time(10000), Time(100)) {
+      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
+      val tp = TopicAndPartition(topic, partition)
+      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
       assert(
-        server.apis.metadataCache.containsTopicAndPartition(topic, partition),
-        s"Partition [$topic, $partition] metadata not propagated after timeout"
-      )
+        llo == offset,
+        s"$topic $partition $offset not reached after timeout")
     }
   }
 
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+    }
+  }

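Both helpers run their assertion inside the suite's private `eventually(Time(10000), Time(100))` wrapper, i.e. they retry the block every 100 ms for up to 10 seconds instead of failing on the first attempt. A rough sketch of that retry semantics, as an illustration of the pattern rather than the helper's actual source:

import scala.annotation.tailrec

// Re-evaluate `body` every `intervalMs` until it stops throwing an
// AssertionError, giving up once `timeoutMs` have elapsed (assumed semantics).
def eventuallySketch[T](timeoutMs: Long, intervalMs: Long)(body: => T): T = {
  val deadline = System.currentTimeMillis + timeoutMs
  @tailrec
  def attempt(): T =
    (try Right(body) catch { case e: AssertionError => Left(e) }) match {
      case Right(v) => v
      case Left(_) if System.currentTimeMillis < deadline =>
        Thread.sleep(intervalMs)
        attempt()
      case Left(e) => throw e
    }
  attempt()
}

waitUntilLeaderOffset simply asserts `llo == offset` inside this loop, so a test can send messages and block until the leader's log end offset confirms they are all readable, which is what the suites below rely on.
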
external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java

@@ -72,6 +72,9 @@ public void testKafkaRDD() throws InterruptedException {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
 
+    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
+    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);
+
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

@@ -53,14 +53,15 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("basic usage") {
-    val topic = "topicbasic"
+    val topic = s"topicbasic-${Random.nextInt}"

Review comment:
Contributor: Why was this change necessary?

     kafkaTestUtils.createTopic(topic)
     val messages = Set("the", "quick", "brown", "fox")
     kafkaTestUtils.sendMessages(topic, messages.toArray)
 
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+      "group.id" -> s"test-consumer-${Random.nextInt}")
 
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)
 
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
@@ -73,34 +74,47 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {

test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = "topic1"
val topic = s"topicboundary-${Random.nextInt}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)

val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}")
"group.id" -> s"test-consumer-${Random.nextInt}")

val kc = new KafkaCluster(kafkaParams)

// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
val sentCount = sent.values.sum
kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)

// rdd defined from leaders after sending messages, should get the number sent
val rdd = getRdd(kc, Set(topic))

assert(rdd.isDefined)
assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")

val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum

kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
assert(rangeCount === sentCount, "offset range didn't include all sent messages")
assert(rdd.get.count === sentCount, "didn't get all sent messages")

val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap

// make sure consumer offsets are committed before the next getRdd call
kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(

Review comment:
Contributor: What is this for? It's not obvious, could you add a comment?

+      err => throw new Exception(err.mkString("\n")),
+      _ => ()
+    )
 
     // this is the "0 messages" case
     val rdd2 = getRdd(kc, Set(topic))
     // shouldn't get anything, since message is sent after rdd was defined
     val sentOnlyOne = Map("d" -> 1)
 
     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)
 
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
 
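On the reviewer's question about the `.fold(...)`: KafkaCluster methods return `Either[Err, T]`, where `Err` accumulates the exceptions hit while trying each seed broker, so the test turns a `Left` into a thrown exception and treats `Right` as success. A compact, self-contained sketch of that convention, with a hypothetical `commitSketch` standing in for `setConsumerOffsets` (and assuming `Err` is an ArrayBuffer of Throwables, as in KafkaCluster):

import scala.collection.mutable.ArrayBuffer

object EitherFoldSketch {
  // KafkaCluster-style result: Left accumulates errors, Right is success.
  type Err = ArrayBuffer[Throwable]

  def commitSketch(ok: Boolean): Either[Err, Unit] =
    if (ok) Right(()) else Left(ArrayBuffer(new Exception("no brokers reachable")))

  def main(args: Array[String]): Unit = {
    // The test's pattern: fail loudly on Left so a silently dropped offset
    // commit can't let the following "0 messages" assertion pass by accident.
    commitSketch(ok = true).fold(
      err => throw new Exception(err.mkString("\n")),
      _ => ()
    )
  }
}
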
python/pyspark/streaming/tests.py (5 additions, 3 deletions)

@@ -606,7 +606,6 @@ def _validateRddResult(self, sendData, rdd):
         result = {}
         for i in rdd.map(lambda x: x[1]).collect():
             result[i] = result.get(i, 0) + 1
-
         self.assertEqual(sendData, result)
 
     def test_kafka_stream(self):
@@ -616,6 +615,7 @@ def test_kafka_stream(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},
@@ -631,6 +631,7 @@ def test_kafka_direct_stream(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ def test_kafka_direct_stream_from_offset(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
@@ -659,7 +661,7 @@ def test_kafka_rdd(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

Review comment:
Contributor: Should be added for all the kafka unit tests, wherever there is a sendMessages. It's missing in test_kafka_direct_stream***.

rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)

@@ -675,7 +677,7 @@ def test_kafka_rdd_with_leaders(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
 