Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.streaming.kafka

import java.io.OutputStream
import java.lang.{Integer => JInt, Long => JLong}
import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap, Set => JSet}

Expand Down Expand Up @@ -682,7 +682,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[(Array[Byte], Array[Byte])] = {
val messageHandler =
(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
Expand All @@ -692,7 +692,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] = {
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
Expand All @@ -704,7 +704,7 @@ private[kafka] class KafkaUtilsPythonHelper {
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
topics: JSet[String],
fromOffsets: JMap[TopicAndPartition, JLong],
fromOffsets: JMap[TopicAndPartition, JNumber],
messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {

val currentFromOffsets = if (!fromOffsets.isEmpty) {
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ def __eq__(self, other):
def __ne__(self, other):
return not self.__eq__(other)

def __hash__(self):
return (self._topic, self._partition).__hash__()


class Broker(object):
"""
Expand Down
12 changes: 3 additions & 9 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
else:
import unittest

if sys.version >= "3":
long = int

from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
Expand Down Expand Up @@ -1058,7 +1061,6 @@ def test_kafka_direct_stream(self):
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
self._validateStreamResult(sendData, stream)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_direct_stream_from_offset(self):
"""Test the Python direct Kafka stream API with start offset specified."""
topic = self._randomTopic()
Expand All @@ -1072,7 +1074,6 @@ def test_kafka_direct_stream_from_offset(self):
stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
self._validateStreamResult(sendData, stream)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_rdd(self):
"""Test the Python direct Kafka RDD API."""
topic = self._randomTopic()
Expand All @@ -1085,7 +1086,6 @@ def test_kafka_rdd(self):
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self._validateRddResult(sendData, rdd)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_rdd_with_leaders(self):
"""Test the Python direct Kafka RDD API with leaders."""
topic = self._randomTopic()
Expand All @@ -1100,7 +1100,6 @@ def test_kafka_rdd_with_leaders(self):
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_rdd_get_offsetRanges(self):
"""Test Python direct Kafka RDD get OffsetRanges."""
topic = self._randomTopic()
Expand All @@ -1113,7 +1112,6 @@ def test_kafka_rdd_get_offsetRanges(self):
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
self.assertEqual(offsetRanges, rdd.offsetRanges())

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_direct_stream_foreach_get_offsetRanges(self):
"""Test the Python direct Kafka stream foreachRDD get offsetRanges."""
topic = self._randomTopic()
Expand All @@ -1138,7 +1136,6 @@ def getOffsetRanges(_, rdd):

self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_direct_stream_transform_get_offsetRanges(self):
"""Test the Python direct Kafka stream transform get offsetRanges."""
topic = self._randomTopic()
Expand Down Expand Up @@ -1176,7 +1173,6 @@ def test_topic_and_partition_equality(self):
self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_direct_stream_transform_with_checkpoint(self):
"""Test the Python direct Kafka stream transform with checkpoint correctly recovered."""
topic = self._randomTopic()
Expand Down Expand Up @@ -1225,7 +1221,6 @@ def setup():
finally:
shutil.rmtree(tmpdir)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_rdd_message_handler(self):
"""Test Python direct Kafka RDD MessageHandler."""
topic = self._randomTopic()
Expand All @@ -1242,7 +1237,6 @@ def getKeyAndDoubleMessage(m):
messageHandler=getKeyAndDoubleMessage)
self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)

@unittest.skipIf(sys.version >= "3", "long type not support")
def test_kafka_direct_stream_message_handler(self):
"""Test the Python direct Kafka stream MessageHandler."""
topic = self._randomTopic()
Expand Down