Skip to content

Commit 7b88be8

Browse files
committed
Fixed --jar not working for KafkaUtils and improved error message
1 parent d20559b commit 7b88be8

File tree

2 files changed

+45
-14
lines changed

2 files changed

+45
-14
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
2727

2828
import kafka.common.TopicAndPartition
2929
import kafka.message.MessageAndMetadata
30-
import kafka.serializer.{Decoder, StringDecoder}
30+
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
3131

3232
import org.apache.spark.api.java.function.{Function => JFunction}
3333
import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,22 @@ object KafkaUtils {
532532
)
533533
}
534534
}
535+
536+
private[kafka]
537+
class KafkaUtilsPythonHelper {
538+
def createStream(
539+
jssc: JavaStreamingContext,
540+
kafkaParams: JMap[String, String],
541+
topics: JMap[String, JInt],
542+
storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
543+
KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
544+
jssc,
545+
classOf[Array[Byte]],
546+
classOf[Array[Byte]],
547+
classOf[DefaultDecoder],
548+
classOf[DefaultDecoder],
549+
kafkaParams,
550+
topics,
551+
storageLevel)
552+
}
553+
}

python/pyspark/streaming/kafka.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#
1717

1818
from py4j.java_collections import MapConverter
19-
from py4j.java_gateway import java_import, Py4JError
19+
from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
2020

2121
from pyspark.storagelevel import StorageLevel
2222
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -63,20 +63,32 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
6363
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
6464
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
6565

66-
def getClassByName(name):
67-
return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
68-
6966
try:
70-
array = getClassByName("[B")
71-
decoder = getClassByName("kafka.serializer.DefaultDecoder")
72-
jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
73-
jparam, jtopics, jlevel)
74-
except Py4JError, e:
67+
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
68+
helper = helperClass.newInstance()
69+
jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
70+
except Py4JJavaError, e:
7571
# TODO: use --jar once it also works on the driver
76-
if not e.message or 'call a package' in e.message:
77-
print "No kafka package, please put the assembly jar into classpath:"
78-
print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
79-
"scala-*/spark-streaming-kafka-assembly-*.jar"
72+
if 'ClassNotFoundException' in str(e.java_exception):
73+
print """
74+
________________________________________________________________________________________________
75+
76+
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
77+
78+
1. Include the Kafka library and its dependencies within the
79+
spark-submit command as
80+
81+
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
82+
83+
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
84+
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
85+
Then, include the jar in the spark-submit command as
86+
87+
$ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
88+
89+
________________________________________________________________________________________________
90+
91+
""" % (ssc.sparkContext.version, ssc.sparkContext.version)
8092
raise e
8193
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
8294
stream = DStream(jstream, ssc, ser)

0 commit comments

Comments
 (0)