diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index e05e4318969c..561ed4babe5d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -115,7 +115,6 @@
maven-shade-plugin
false
- ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar
*:*
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 36342f37bb2e..6f4e2a89e9af 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -142,7 +142,6 @@
maven-shade-plugin
false
- ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar
*:*
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
index f3e3f93e7ed5..841260063373 100644
--- a/external/mqtt-assembly/pom.xml
+++ b/external/mqtt-assembly/pom.xml
@@ -132,7 +132,6 @@
maven-shade-plugin
false
- ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar
*:*
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
index 3ca538608f69..51af3e6f2225 100644
--- a/extras/kinesis-asl-assembly/pom.xml
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -137,7 +137,6 @@
maven-shade-plugin
false
- ${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar
*:*
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 214d5be43900..e687412a7cfe 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1133,11 +1133,20 @@ def get_output(_, rdd):
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+# Search the project directory for a jar whose name starts with name_prefix. We check both
+# the sbt build and the maven build because they place the artifact jars in different directories.
+def search_jar(dir, name_prefix):
+ # We should ignore the following jars
+ ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+ jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) + # sbt build
+ glob.glob(os.path.join(dir, "target/" + name_prefix + "_*.jar"))) # maven build
+ return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)]
+
+
def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
- jars = glob.glob(
- os.path.join(kafka_assembly_dir, "target/scala-*/spark-streaming-kafka-assembly-*.jar"))
+ jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
@@ -1145,8 +1154,8 @@ def search_kafka_assembly_jar():
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
- "remove all but one") % kafka_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1154,8 +1163,7 @@ def search_kafka_assembly_jar():
def search_flume_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
- jars = glob.glob(
- os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+ jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
@@ -1163,8 +1171,8 @@ def search_flume_assembly_jar():
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
- "remove all but one") % flume_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1172,8 +1180,7 @@ def search_flume_assembly_jar():
def search_mqtt_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
- jars = glob.glob(
- os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar"))
+ jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
@@ -1181,8 +1188,8 @@ def search_mqtt_assembly_jar():
"'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
"'build/mvn package' before running this test")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in %s; please "
- "remove all but one") % mqtt_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1198,8 +1205,8 @@ def search_mqtt_test_jar():
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please "
- "remove all but one") % mqtt_test_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1207,14 +1214,12 @@ def search_mqtt_test_jar():
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
- jars = glob.glob(
- os.path.join(kinesis_asl_assembly_dir,
- "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+ jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly")
if not jars:
return None
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
- "remove all but one") % kinesis_asl_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1240,8 +1245,8 @@ def search_kinesis_asl_assembly_jar():
mqtt_test_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
- testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
- CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
+ testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
+ KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)