@@ -356,7 +356,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
- SlowTestReceiver.receivedAllRecords = false
// Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
@@ -368,6 +367,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
}
ssc.start()
ssc.awaitTerminationOrTimeout(500)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ assert(SlowTestReceiver.initialized)
+ }
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
assert(runningCount > 0)
@@ -959,6 +961,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

var receivingThreadOption: Option[Thread] = None
+ @volatile var receivedAllRecords = false

def onStart() {
val thread = new Thread() {
@@ -968,25 +971,26 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
Thread.sleep(1000 / recordsPerSecond)
store(i)
}
- SlowTestReceiver.receivedAllRecords = true
+ receivedAllRecords = true
logInfo(s"Received all $totalRecords records")
}
}
receivingThreadOption = Some(thread)
thread.start()
+ SlowTestReceiver.initialized = true
Contributor

Shouldn't this be in the Thread.run() implementation above?

Contributor Author

Considering only timing, it can technically be placed anywhere, even on the first line of onStart(), as whether the receiver is registered within time or not is the key. As verification: without the patch, the existing test does not fail even if we add Thread.sleep(1000) as the first line of onStart().

So I have no strong opinion on where to put it.

Btw, maybe we can apply a "more verbose but clearer" solution (without this flag) by adding the code below on the test side:

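    // tracks whether the receiver is started or not; @volatile because the
    // listener callback runs on a different thread than the test body
    @volatile var isReceiverStarted = false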
    val listener = new StreamingListener {
      override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
        isReceiverStarted = receiverStarted.receiverInfo.name.startsWith(
          input.getReceiver().getClass.getSimpleName)
      }
    }
    ssc.addStreamingListener(listener)

    ssc.start()
    ssc.awaitTerminationOrTimeout(500)

    eventually(timeout(10.seconds), interval(10.millis)) {
      assert(isReceiverStarted)
    }

Which one do you think is better?
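
Both options rest on the same idea: poll a flag that another thread sets, and only proceed to the graceful stop once it is observed. A minimal, self-contained sketch of that pattern is below. `PollingDemo`, `waitUntil`, and `receiverStarted` are hypothetical names for illustration; `waitUntil` is a hand-rolled stand-in and not ScalaTest's `eventually` implementation.

```scala
import java.util.concurrent.TimeoutException

object PollingDemo {
  // Hypothetical flag, written by a background thread and read by the test thread.
  // @volatile makes the write visible across threads.
  @volatile var receiverStarted = false

  // Re-evaluates `condition` every `intervalMs` until it holds,
  // or throws once `timeoutMs` has elapsed.
  def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Unit = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    while (!condition) {
      if (System.nanoTime() - deadline > 0) {
        throw new TimeoutException(s"condition not met within $timeoutMs ms")
      }
      Thread.sleep(intervalMs)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a receiver that takes a while to start.
    val starter = new Thread {
      override def run(): Unit = {
        Thread.sleep(200)
        receiverStarted = true
      }
    }
    starter.start()

    // Block until the flag is observed before moving on to the stop step.
    waitUntil(timeoutMs = 10000, intervalMs = 10)(receiverStarted)
    println("flag observed, safe to stop gracefully")
    starter.join()
  }
}
```

The generous timeout with a short interval mirrors the `timeout(10.seconds), interval(10.millis)` values in the patch: the wait returns soon after the flag flips, and the timeout only matters when something is genuinely wrong.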

Contributor

@vanzin Sep 11, 2019

> as whether the receiver is registered within time or not is the key

Ah ok. Was wondering if the thread that actually does stuff needed to run for this to work, but if it's just the registration that matters, this is enough.

But can't the flag be in the actual SlowTestReceiver instance (instead of the object)?

Contributor Author

@HeartSaVioR Sep 11, 2019

I guess we can't access these flags if we place them in the instance, as the receiver will be executed in an executor (say, a different JVM, or at least serialized and deserialized, even in tests). So exposing them in the object is really a hack only for testing.

Btw, receivedAllRecords doesn't actually need to be exposed outside, as we create a new receiver instance and can just set the default value to false. If we move it, we can either remove the hack entirely by adding the more verbose code, or keep the hack for a more concise code change.

I'll move receivedAllRecords into the instance for now; please let me know if you prefer to remove the hack, and I'll make the change.
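
To illustrate the serialization point, here is a minimal sketch that uses plain Java serialization rather than Spark. `DemoReceiver`, `SerializationDemo`, and `roundTrip` are hypothetical names and not part of Spark; the sketch assumes the "executor" copy runs in the same JVM as the test, which is the only case where a flag on the object is visible to the test.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a receiver; this is not Spark's Receiver class.
class DemoReceiver extends Serializable {
  // Per-instance flag: each deserialized copy gets its own.
  @volatile var initialized = false

  def onStart(): Unit = {
    initialized = true
    // Flag on the companion object: one per JVM (per class loader).
    DemoReceiver.initialized = true
  }
}

object DemoReceiver {
  @volatile var initialized = false
}

object SerializationDemo {
  // Round-trips a value through Java serialization, producing an independent copy.
  def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new DemoReceiver           // the instance the test holds
    val executorSide = roundTrip(driverSide)    // the copy that actually runs
    executorSide.onStart()

    println(s"instance flag seen by test: ${driverSide.initialized}")   // prints false
    println(s"object flag seen by test: ${DemoReceiver.initialized}")   // prints true
  }
}
```

The instance the test holds never sees the update made on the copy, while the object-level flag does, but only because both sides share one JVM here. With executors in separate JVMs the object flag would not be shared either, which is why this stays a test-only hack.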

Contributor

> as the receiver will be executed in an executor

ok, that makes sense; I was under the impression it was a driver thing since an instance of it was passed to the context, instead of one created somehow in the executor.

Anyway, what you have is fine.

}

def onStop() {
// Simulate slow receiver by waiting for all records to be produced
- while (!SlowTestReceiver.receivedAllRecords) {
+ while (!receivedAllRecords) {
Thread.sleep(100)
}
// no clean to be done, the receiving thread should stop on it own
}
}

object SlowTestReceiver {
- var receivedAllRecords = false
+ var initialized = false
}

/** Streaming application for testing DStream and RDD creation sites */