Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion external/flume-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
Expand Down
1 change: 0 additions & 1 deletion external/kafka-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
Expand Down
1 change: 0 additions & 1 deletion external/mqtt-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
Expand Down
1 change: 0 additions & 1 deletion extras/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
Expand Down
47 changes: 26 additions & 21 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,56 +1133,63 @@ def get_output(_, rdd):
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)


# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because
# the artifact jars are in different directories.
def search_jar(dir, name_prefix):
    """Return candidate jars under *dir* whose name starts with *name_prefix*.

    Both build layouts are probed: sbt places artifacts under
    ``target/scala-<version>/`` while maven places them directly under
    ``target/``.  Auxiliary artifacts (javadoc, sources, test jars) are
    filtered out.
    """
    sbt_pattern = os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")
    maven_pattern = os.path.join(dir, "target/" + name_prefix + "_*.jar")
    candidates = glob.glob(sbt_pattern) + glob.glob(maven_pattern)
    # These companion artifacts sit next to the real assembly jar and must
    # never be put on the classpath.
    excluded_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
    return [jar for jar in candidates if not jar.endswith(excluded_suffixes)]


def search_kafka_assembly_jar():
    """Return the path to the single Spark Streaming Kafka assembly jar.

    Raises:
        Exception: if no assembly jar was built, or if more than one
            candidate jar is found (ambiguous classpath).
    """
    SPARK_HOME = os.environ["SPARK_HOME"]
    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
    # search_jar handles both the sbt and maven output layouts.
    jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly")
    if not jars:
        raise Exception(
            ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
            "You need to build Spark with "
            "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
            "'build/mvn package' before running this test.")
    elif len(jars) > 1:
        # Listing the jars themselves (not just the directory) tells the user
        # exactly which files to remove.
        raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
                         "remove all but one") % (", ".join(jars)))
    else:
        return jars[0]


def search_flume_assembly_jar():
    """Return the path to the single Spark Streaming Flume assembly jar.

    Raises:
        Exception: if no assembly jar was built, or if more than one
            candidate jar is found (ambiguous classpath).
    """
    SPARK_HOME = os.environ["SPARK_HOME"]
    flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
    # search_jar handles both the sbt and maven output layouts.
    jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
    if not jars:
        raise Exception(
            ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
            "You need to build Spark with "
            "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
            "'build/mvn package' before running this test.")
    elif len(jars) > 1:
        # Listing the jars themselves (not just the directory) tells the user
        # exactly which files to remove.
        raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
                         "remove all but one") % (", ".join(jars)))
    else:
        return jars[0]


def search_mqtt_assembly_jar():
    """Return the path to the single Spark Streaming MQTT assembly jar.

    Raises:
        Exception: if no assembly jar was built, or if more than one
            candidate jar is found (ambiguous classpath).
    """
    SPARK_HOME = os.environ["SPARK_HOME"]
    mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
    # search_jar handles both the sbt and maven output layouts.
    jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
    if not jars:
        raise Exception(
            ("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
            "You need to build Spark with "
            "'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
            "'build/mvn package' before running this test")
    elif len(jars) > 1:
        # Listing the jars themselves (not just the directory) tells the user
        # exactly which files to remove.
        raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
                         "remove all but one") % (", ".join(jars)))
    else:
        return jars[0]

Expand All @@ -1198,23 +1205,21 @@ def search_mqtt_test_jar():
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please "
"remove all but one") % mqtt_test_dir)
raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
"remove all but one") % (", ".join(jars)))
else:
return jars[0]


def search_kinesis_asl_assembly_jar():
    """Return the Spark Streaming Kinesis ASL assembly jar, or None.

    Unlike the other search_* helpers, a missing jar is not an error:
    the Kinesis module is optional, so callers use None to skip the
    Kinesis test cases.

    Raises:
        Exception: if more than one candidate jar is found.
    """
    SPARK_HOME = os.environ["SPARK_HOME"]
    kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
    # search_jar handles both the sbt and maven output layouts.
    jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly")
    if not jars:
        return None
    elif len(jars) > 1:
        # Listing the jars themselves (not just the directory) tells the user
        # exactly which files to remove.
        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
                         "remove all but one") % (", ".join(jars)))
    else:
        return jars[0]

Expand All @@ -1240,8 +1245,8 @@ def search_kinesis_asl_assembly_jar():
mqtt_test_jar, kinesis_asl_assembly_jar)

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch that this was missing! Thanks @zsxwing


if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
Expand Down