external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
 
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,31 @@ object KafkaUtils {
     )
   }
 }
+
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor allows this class to be instantiated from its Class
+ * object via classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
+ * fills in the known parameters so they do not have to be passed from Python.
+ */
+private[kafka]

Contributor:
does it still work if this is strictly private? Just wondering

Contributor (Author):
I am not sure if I will be able to call it from Python if this is strictly private. I haven't tested it, though. I don't think that's a big deal, as long as it's hidden from public view.
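A note for readers: Scala's private[kafka] restricts access only at the Scala level; package-private qualifiers have no JVM bytecode equivalent, so the compiled class is emitted as public and Py4J can still load and instantiate it reflectively. A minimal sketch of how one could confirm this from a PySpark shell, assuming a running StreamingContext ssc and the Kafka assembly jar on the driver class path:

# Sketch: check the JVM-level visibility of the package-private Scala class.
loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
cls = loader.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
Modifier = ssc._jvm.java.lang.reflect.Modifier
print Modifier.isPublic(cls.getModifiers())  # True: reflection (and Py4J) can reach it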

+class KafkaUtilsPythonHelper {
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics,
+      storageLevel)
+  }
+}
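A note on the decoder choice: by pinning both type parameters to DefaultDecoder, the helper hands raw (Array[Byte], Array[Byte]) pairs to Python and leaves all decoding on the Python side. A sketch of how the Python wrapper consumes the returned JVM stream, mirroring the tail of createStream() in kafka.py later in this diff; jstream, keyDecoder, and valueDecoder are the names used in that function:

from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming.dstream import DStream

# Wrap the JVM stream of raw byte-array pairs with no-op deserializers,
# then apply the key/value decoders (utf8_decoder by default) in Python.
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
raw = DStream(jstream, ssc, ser)
decoded = raw.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))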
python/pyspark/streaming/kafka.py (27 additions, 15 deletions)

@@ -16,7 +16,7 @@
 #
 
 from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -50,8 +50,6 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
     :param valueDecoder: A function used to decode value (default is utf8_decoder)
     :return: A DStream object
     """
-    java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
     kafkaParams.update({
         "zookeeper.connect": zkQuorum,
         "group.id": groupId,
@@ -63,20 +61,34 @@
     jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)

Contributor:
remove java_import()

     jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
 
-    def getClassByName(name):
-        return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
     try:
-        array = getClassByName("[B")
-        decoder = getClassByName("kafka.serializer.DefaultDecoder")
-        jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
-                                                   jparam, jtopics, jlevel)
-    except Py4JError, e:
+        # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+            .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+        helper = helperClass.newInstance()
+        jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+    except Py4JJavaError, e:
         # TODO: use --jars once it also works on the driver
-        if not e.message or 'call a package' in e.message:
-            print "No kafka package, please put the assembly jar into classpath:"
-            print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
-                  "scala-*/spark-streaming-kafka-assembly-*.jar"
+        if 'ClassNotFoundException' in str(e.java_exception):
+            print """
+________________________________________________________________________________________________
+
+  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies within the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
         raise e
     ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
     stream = DStream(jstream, ssc, ser)
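
For context, the end-user API this change serves is unchanged. A usage sketch, assuming an existing SparkContext sc, a ZooKeeper quorum at localhost:2181, and a topic named "test" read with one consumer thread:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 2)  # 2-second batches
# topics maps each topic name to the number of consumer threads to use
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group", {"test": 1})
kvs.map(lambda (k, v): v).pprint()  # keys and values arrive utf8-decoded by default
ssc.start()
ssc.awaitTermination()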