Skip to content

Commit 08d0f60

Browse files
committed
Fix Streaming Python unit tests
1 parent 1ea7a52 commit 08d0f60

File tree

5 files changed

+13
-14
lines changed

5 files changed

+13
-14
lines changed

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.flume
1919

2020
import java.net.{InetSocketAddress, ServerSocket}
2121
import java.nio.ByteBuffer
22+
import java.util.{List => JList}
2223
import java.util.Collections
2324

2425
import scala.collection.JavaConverters._
@@ -59,10 +60,10 @@ private[flume] class FlumeTestUtils {
5960
}
6061

6162
/** Send data to the flume receiver */
62-
def writeInput(input: Seq[String], enableCompression: Boolean): Unit = {
63+
def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
6364
val testAddress = new InetSocketAddress("localhost", testPort)
6465

65-
val inputEvents = input.map { item =>
66+
val inputEvents = input.asScala.map { item =>
6667
val event = new AvroFlumeEvent
6768
event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
6869
event.setHeaders(Collections.singletonMap("test", "header"))

external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.streaming.flume
1919

2020
import java.util.concurrent._
21-
import java.util.{Map => JMap, Collections}
21+
import java.util.{Collections, List => JList, Map => JMap}
2222

2323
import scala.collection.mutable.ArrayBuffer
2424

@@ -137,7 +137,7 @@ private[flume] class PollingFlumeTestUtils {
137137
/**
138138
* A Python-friendly method to assert the output
139139
*/
140-
def assertOutput(outputHeaders: Seq[JMap[String, String]], outputBodies: Seq[String]): Unit = {
140+
def assertOutput(outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
141141
require(outputHeaders.size == outputBodies.size)
142142
val eventSize = outputHeaders.size
143143
if (eventSize != totalEventsPerChannel * channels.size) {
@@ -151,8 +151,8 @@ private[flume] class PollingFlumeTestUtils {
151151
var found = false
152152
var j = 0
153153
while (j < eventSize && !found) {
154-
if (eventBodyToVerify == outputBodies(j) &&
155-
eventHeaderToVerify == outputHeaders(j)) {
154+
if (eventBodyToVerify == outputBodies.get(j) &&
155+
eventHeaderToVerify == outputHeaders.get(j)) {
156156
found = true
157157
counter += 1
158158
}

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
120120
case (key, value) => (key.toString, value.toString)
121121
}).map(_.asJava)
122122
val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
123-
utils.assertOutput(headers, bodies)
123+
utils.assertOutput(headers.asJava, bodies.asJava)
124124
}
125125
} finally {
126126
ssc.stop()

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
5454
val outputBuffer = startContext(utils.getTestPort(), testCompression)
5555

5656
eventually(timeout(10 seconds), interval(100 milliseconds)) {
57-
utils.writeInput(input, testCompression)
57+
utils.writeInput(input.asJava, testCompression)
5858
}
5959

6060
eventually(timeout(10 seconds), interval(100 milliseconds)) {

python/pyspark/streaming/tests.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,12 @@ class CheckpointTests(unittest.TestCase):
611611
@staticmethod
612612
def tearDownClass():
613613
# Clean up in the JVM just in case there has been some issues in Python API
614-
jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
614+
jStreamingContextOption = \
615+
SparkContext._jvm.org.apache.spark.streaming.StreamingContext.getActive()
615616
if jStreamingContextOption.nonEmpty():
616617
jStreamingContextOption.get().stop()
617-
jSparkContextOption = SparkContext._jvm.SparkContext.get()
618-
if jSparkContextOption.nonEmpty():
619-
jSparkContextOption.get().stop()
620618

621-
def __init__(self):
619+
def setUp(self):
622620
self.ssc = None
623621
self.sc = None
624622
self.cpd = None
@@ -653,7 +651,7 @@ def setup():
653651
self.cpd = tempfile.mkdtemp("test_streaming_cps")
654652
self.setupCalled = False
655653
self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
656-
self.assertFalse(self.setupCalled)
654+
self.assertTrue(self.setupCalled)
657655

658656
self.ssc.start()
659657

0 commit comments

Comments
 (0)