From 7b88be8e2898068e9dd2fba14a96e377e9ed219c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 25 Feb 2015 19:59:29 -0800
Subject: [PATCH 1/3] Fixed --jar not working for KafkaUtils and improved error message

---
 .../spark/streaming/kafka/KafkaUtils.scala | 21 +++++++++-
 python/pyspark/streaming/kafka.py          | 38 ++++++++++++-------
 2 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index af04bc6576148..64a5eeb7e5aca 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
 
 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, StringDecoder}
+import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
 
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,22 @@ object KafkaUtils {
     )
   }
 }
+
+private[kafka]
+class KafkaUtilsPythonHelper {
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc,
+      classOf[Array[Byte]],
+      classOf[Array[Byte]],
+      classOf[DefaultDecoder],
+      classOf[DefaultDecoder],
+      kafkaParams,
+      topics,
+      storageLevel)
+  }
+}
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 19ad71f99d4d5..96f2674fd35cd 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -16,7 +16,7 @@
 #
 
 from py4j.java_collections import MapConverter
-from py4j.java_gateway import java_import, Py4JError
+from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -63,20 +63,32 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
     jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
     jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
 
-    def getClassByName(name):
-        return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
-
     try:
-        array = getClassByName("[B")
-        decoder = getClassByName("kafka.serializer.DefaultDecoder")
-        jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
-                                                   jparam, jtopics, jlevel)
-    except Py4JError, e:
+        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+        helper = helperClass.newInstance()
+        jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
+    except Py4JJavaError, e:
         # TODO: use --jar once it also works on driver
-        if not e.message or 'call a package' in e.message:
-            print "No kafka package, please put the assembly jar into classpath:"
-            print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
-                  "scala-*/spark-streaming-kafka-assembly-*.jar"
+        if 'ClassNotFoundException' in str(e.java_exception):
+            print """
+________________________________________________________________________________________________
+
+  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
+
+  1. Include the Kafka library and its dependencies within the
+     spark-submit command as
+
+     $ bin/spark-submit --package org.apache.spark:spark-streaming-kafka:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars ...
+
+________________________________________________________________________________________________
+
+""" % (ssc.sparkContext.version, ssc.sparkContext.version)
         raise e
     ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
     stream = DStream(jstream, ssc, ser)

From c1fdf35a45584ebf9f330339405524f6753eb808 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 25 Feb 2015 21:48:09 -0800
Subject: [PATCH 2/3] Fixed long line and improved documentation

---
 .../org/apache/spark/streaming/kafka/KafkaUtils.scala | 9 +++++++++
 python/pyspark/streaming/kafka.py                     | 6 ++++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 64a5eeb7e5aca..2ef2c37c662e7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -533,6 +533,15 @@ object KafkaUtils {
   }
 }
 
+/**
+ * This is a helper class that wraps KafkaUtils.createStream() in a more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ *
+ * The zero-arg constructor lets this class be instantiated from its Class object
+ * via classOf[KafkaUtilsPythonHelper].newInstance(), and createStream()
+ * supplies the known type and decoder parameters instead of passing them from Python.
+ */
 private[kafka]
 class KafkaUtilsPythonHelper {
   def createStream(
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 96f2674fd35cd..d6428c1d6d3ca 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -64,7 +64,9 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
     jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
 
     try:
-        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+        # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
+        helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+            .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
         helper = helperClass.newInstance()
         jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
     except Py4JJavaError, e:
@@ -78,7 +80,7 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
   1. Include the Kafka library and its dependencies within the
      spark-submit command as
 
-     $ bin/spark-submit --package org.apache.spark:spark-streaming-kafka:%s ...
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
 
   2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
      Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
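The reflective lookup above is the heart of the fix: per the first commit's subject, --jars was not working because classes resolved through plain ssc._jvm attribute access were not finding the Kafka assembly, while the thread context class loader on the driver does see it. The following is a minimal sketch of the lookup the patched createStream performs, not part of the series itself; the app name is illustrative, and it assumes Spark 1.3-era PySpark with the Kafka assembly on the classpath.

    # Sketch of the class-loader lookup introduced by this series; assumes a
    # running StreamingContext `ssc` and spark-streaming-kafka on the classpath.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="helper-lookup-demo")   # illustrative name
    ssc = StreamingContext(sc, 2)

    # The context class loader sees jars added via --jars/--packages, while
    # plain ssc._jvm attribute access may fail to resolve them.
    loader = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    helper_class = loader.loadClass(
        "org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helper_class.newInstance()  # relies on the zero-arg constructor

This mirrors exactly what the patched kafka.py does internally, which is why the helper's documentation (added in patch 2) calls out the zero-arg constructor.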
From fb16b0410e4b078c00382089ad7e6b0227d8d13c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 25 Feb 2015 21:57:52 -0800
Subject: [PATCH 3/3] Removed import

---
 python/pyspark/streaming/kafka.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index d6428c1d6d3ca..0002dc10e8a17 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -50,8 +50,6 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
     :param valueDecoder: A function used to decode value (default is utf8_decoder)
     :return: A DStream object
     """
-    java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
-
     kafkaParams.update({
         "zookeeper.connect": zkQuorum,
         "group.id": groupId,
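Taken together, the series makes end-to-end Kafka streaming from PySpark work with --jars or --packages. The sketch below is a hedged usage example against the Spark 1.3-era API, not code from the patches: the Zookeeper address, consumer group, topic map, and the version in --packages are placeholders that must match a real deployment.

    # Submit with one of the options from the improved error message, e.g.:
    #   bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.3.0 app.py
    # (the version string is illustrative and must match the running Spark)
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="kafka-stream-demo")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # zkQuorum, groupId, and the topic->partitions map are placeholders.
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "demo-group",
                                     {"demo-topic": 1})
    # Keys and values arrive utf8-decoded by default (see the docstring above).
    stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    ssc.awaitTermination()

If the assembly is missing from the classpath, this is precisely the path that now raises the descriptive banner from patch 1 instead of an opaque Py4J error.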