[SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage #3801
Hopefully this will fix SPARK-1600.
|
These changes are split off from #3687, a larger PR of mine which tried to remove all uses of Thread.sleep() in the streaming tests. It may look like there are a lot of changes here, but most of that is due to indentation changes when I modified tests to use the /cc @tdas for review. |
I factored some of the common code out here.
|
Test build #24820 has started for PR 3801 at commit
|
This Thread.sleep call was to ensure that we're actually inside a fileStream.map task when the context shuts down. This might not be necessary given the waitForTotalBatchesStarted call on the previous line. AFAIK, the reason why I had this here was that waitForTotalBatchesStarted might fire after streaming starts the batch but before the underlying Spark jobs start. If we do need to block until Spark itself actually begins the processing, then that could be a little trickier. I suppose I could just use a SparkListener for that.
I toyed around with the idea of using a semaphore, but that's tricky because you want to ensure that the task and the caller have the same Semaphore object in the JVM. Since the task is serialized, you effectively have to have a global object with a hashmap that holds the semaphore, which is a bit messy.
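To illustrate why the semaphore approach gets messy, here is a minimal sketch of the "global object with a hashmap" idea. The names `SemaphoreRegistry` and `SignallingTask` are hypothetical and not part of this PR; the sketch assumes the task and the caller run in the same JVM (as in local mode), and the task carries only a string key through serialization.

```scala
import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit}

// Hypothetical JVM-wide registry: tasks carry only a String key through
// serialization and look the semaphore up again on the other side.
object SemaphoreRegistry {
  private val semaphores = new ConcurrentHashMap[String, Semaphore]()

  // Returns the semaphore for `key`, creating it with zero permits if absent.
  def get(key: String): Semaphore = {
    val fresh = new Semaphore(0)
    val existing = semaphores.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }

  def remove(key: String): Unit = {
    semaphores.remove(key)
    ()
  }
}

// Stand-in for a serialized task: it holds the key, not the Semaphore itself.
case class SignallingTask(key: String) extends Serializable {
  def run(): Unit = SemaphoreRegistry.get(key).release()
}

object SemaphoreRegistryDemo {
  // Runs the task on another thread and reports whether the caller saw the signal.
  def signalled(key: String): Boolean = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = SignallingTask(key).run()
    })
    worker.start()
    try SemaphoreRegistry.get(key).tryAcquire(5L, TimeUnit.SECONDS)
    finally {
      worker.join()
      SemaphoreRegistry.remove(key)
    }
  }
}
```

The global mutable state and the need to clean up keys after each test are the messy parts referred to above.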
Since the earlier unit test was running in real time, this Thread.sleep was there to ensure that time had progressed since ssc.start(), so that the first generated file has modtime > context start time. That is no longer necessary, since you are using a manual clock and explicitly setting the mod time of the file. So I believe this can be removed.
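A minimal sketch of that idea, assuming a hypothetical `SketchManualClock` (Spark's own test clock has a different API): the file's modification time is taken from the clock, so the test does not need to wait for real time to pass. File systems may round modification times, so the sketch only claims the value that was requested.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical minimal manual clock; reads and writes are both synchronized.
class SketchManualClock(private var now: Long) {
  def currentTime(): Long = synchronized { now }
  def advance(ms: Long): Unit = synchronized { now += ms }
}

object ModTimeDemo {
  // Creates a temp file whose modification time is taken from the manual
  // clock, so no real time has to pass. Returns (clock time, file mod time).
  def createFileAt(clock: SketchManualClock): (Long, Long) = {
    val file = File.createTempFile("sketch-input", ".txt")
    try {
      // Write first: writing content would otherwise overwrite the mod time.
      Files.write(file.toPath, "1\n".getBytes("UTF-8"))
      val ok = file.setLastModified(clock.currentTime())
      require(ok, "could not set modification time")
      (clock.currentTime(), file.lastModified())
    } finally {
      file.delete()
    }
  }
}
```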
Since we want to make sure that the stop is made before the 3rd batch has completed, let's put an assert after the stop that only 2 batches have been completed (maybe via another method in the waiter) and that the output has only two values (1 and 3).
I think that my original thought here was that it somehow mattered whether the underlying Spark job had started, with the idea that the batch start event could happen before the corresponding Spark core events. But from Streaming's perspective, I guess this doesn't really make sense since batch-level metadata and bookkeeping operations would be tied to the Streaming start events and not the Spark ones.
As you've suggested, I can just replace this with an assert on the number of completed batches.
I tried removing this sleep and it broke the test; I guess that maybe there is a delay between the time that we get the batch started event and the time that the file is recorded.
|
Test build #24820 has finished for PR 3801 at commit
|
|
Test PASSed. |
I don't see the tests from the earlier lines 334-336 here. Those test whether the files are still remembered after recovering from the checkpoint. Although this eventually gets checked via the output, these tests fail fast and pinpoint the problem. It would be good to put them back.
If you're referring to 334-336 in the original diff, that got rewritten to the more compact
assert(recordedFiles(ssc) === Seq(1, 2, 3))
which appears below. It looks like the test on line 334 checks right after creating the restarted streaming context and I think I've preserved that here on line 363 in the new file. Am I overlooking something?
Yeah, you are right! Mind adding a comment on the line saying that so that I don't get confused again :)
|
@JoshRosen I did a pass and left some comments. Overall it looks pretty good; addressing those comments should make it good enough. Could you tell me how much of a speedup we get by converting the unit tests from the real clock to the manual clock? |
It occurred to me that this might be misleadingly-named since it waits until at least that many batches have been processed. To avoid this naming issue, plus a proliferation of similar methods, I might be able to just introduce a helper class that encapsulates this "synchronize on an object and wait for a condition involving it to become true" pattern.
I'm imagining that it could look something vaguely like
```scala
def waitUntil[T <: AnyRef](obj: T, condition: T => Boolean, timeout: Long): Unit = {
  obj.synchronized {
    while (!condition(obj)) {
      // [...] do the wait() logic here
    }
  }
}
```
that encapsulates this wait / notify pattern, so we could write something like
```scala
waitUntil(waiter, _.completedBatches > 2, timeout = seconds(10))
```
Or, with an implicit conversion, something like
```scala
waiter.waitUntil(_.completedBatches > 2, timeout = seconds(10))
```
which is a nice-looking syntax and avoids those issues of having to name inequalities.
Similar to your suggestion on another PR, we could add a pollUntil method that works for objects that don't support monitor notification / synchronization.
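For reference, a runnable sketch of the wait / notify variant under a few assumptions: the names `WaitSketch` and `CompletedBatches` are hypothetical, the timeout is in milliseconds, the helper returns a Boolean instead of throwing on timeout, and the condition sits in a second parameter list so that `_.completedBatches` type-checks without annotations. Whoever mutates the object must call `notifyAll()` while holding its monitor.

```scala
object WaitSketch {
  // Blocks on obj's monitor until condition(obj) holds or timeoutMs elapses.
  // Returns true if the condition became true, false on timeout.
  def waitUntil[T <: AnyRef](obj: T, timeoutMs: Long)(condition: T => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    obj.synchronized {
      var remainingMs = timeoutMs
      // Loop guards against spurious wakeups; never calls wait(0), which blocks forever.
      while (!condition(obj) && remainingMs > 0) {
        obj.wait(remainingMs)
        remainingMs = (deadline - System.nanoTime()) / 1000000L
      }
      condition(obj)
    }
  }
}

// Hypothetical counter standing in for the waiter discussed above.
class CompletedBatches {
  private var count = 0
  def completedBatches: Int = synchronized { count }
  def batchCompleted(): Unit = synchronized {
    count += 1
    notifyAll()
  }
}

object WaitSketchDemo {
  // Completes three "batches" on another thread and waits for more than two.
  def demo(): Boolean = {
    val counter = new CompletedBatches
    val producer = new Thread(new Runnable {
      override def run(): Unit = (1 to 3).foreach(_ => counter.batchCompleted())
    })
    producer.start()
    val ok = WaitSketch.waitUntil(counter, 5000L)(_.completedBatches > 2)
    producer.join()
    ok
  }
}
```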
Actually, something even simpler (since this is just for test code): I can just copy ScalaTest's eventually and modify it to use synchronize / notify instead of polling.
Actually, even this might be overkill: I can just use vanilla eventually since we're still blocking on a condition to occur and not relying on real-clock time; I guess using wait instead of sleep is just an optimization that might save a small amount of test time, but it's not related to flakiness (the goal of this PR). Therefore, I'll just remove all of this in favor of eventually since then I'll benefit from ScalaTest's nice assertion macros.
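The polling behaviour relied on here can be sketched without any test framework. This is a hand-rolled stand-in, not ScalaTest's implementation: `eventuallySketch` and its millisecond parameters are hypothetical names chosen for illustration. It re-runs a block of assertions until they pass or a deadline expires, so the test blocks on a condition rather than on a fixed amount of wall-clock time.

```scala
import java.util.concurrent.atomic.AtomicInteger

object PollingSketch {
  // Re-runs `check` until it stops throwing or timeoutMs elapses, sleeping
  // intervalMs between attempts; the last failure propagates on timeout.
  def eventuallySketch[A](timeoutMs: Long, intervalMs: Long)(check: => A): A = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var result: Option[A] = None
    while (result.isEmpty) {
      try {
        result = Some(check)
      } catch {
        case _: Exception | _: AssertionError if System.nanoTime() < deadline =>
          Thread.sleep(intervalMs)
      }
    }
    result.get
  }
}

object PollingSketchDemo {
  // A worker completes three units of work; the caller polls until it sees all three.
  def demo(): Int = {
    val completed = new AtomicInteger(0)
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        (1 to 3).foreach { _ => Thread.sleep(10); completed.incrementAndGet() }
    })
    worker.start()
    val seen = PollingSketch.eventuallySketch(5000L, 5L) {
      assert(completed.get() == 3)
      completed.get()
    }
    worker.join()
    seen
  }
}
```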
|
Test build #25052 has started for PR 3801 at commit
|
|
Pushed some commits addressing most of the feedback, but I'm still struggling to remove that last |
|
Test build #25052 has finished for PR 3801 at commit
|
|
Test FAILed. |
|
Right, that is probably the case. The checkpointing is asynchronous, and there isn't any callback hook that we can use to wait on it. We could wait for the expected checkpoint file to appear in the directory. That will make it deterministic. |
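A minimal sketch of waiting for a file to appear, assuming hypothetical names (`CheckpointWaitSketch`, `waitForFile`) and an illustrative file name; the actual checkpoint naming scheme and directory layout are not specified in this thread. Note that a file existing does not by itself guarantee that it has been fully written.

```scala
import java.io.File
import java.nio.file.Files

object CheckpointWaitSketch {
  // Polls `dir` until a file whose name starts with `prefix` exists or
  // timeoutMs elapses. Returns the matching file, if any.
  def waitForFile(dir: File, prefix: String, timeoutMs: Long, intervalMs: Long = 10L): Option[File] = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    // listFiles() returns null if `dir` does not exist or is not a directory.
    def find(): Option[File] =
      Option(dir.listFiles()).getOrElse(Array.empty[File]).find(_.getName.startsWith(prefix))
    var found = find()
    while (found.isEmpty && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      found = find()
    }
    found
  }
}

object CheckpointWaitDemo {
  // A writer thread creates the file asynchronously; the caller waits for it.
  def demo(): Boolean = {
    val dir = Files.createTempDirectory("sketch-checkpoints").toFile
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(50)
        new File(dir, "checkpoint-3000").createNewFile()
        ()
      }
    })
    writer.start()
    val found = CheckpointWaitSketch.waitForFile(dir, "checkpoint-3000", 5000L)
    writer.join()
    found.foreach(_.delete())
    dir.delete()
    found.isDefined
  }
}
```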
Can use toSet and set comparison methods here.
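As a small illustration of the suggestion, assuming a hypothetical output shape of one `Seq` per batch: flattening and comparing as sets makes the assertion independent of the order in which batches were recorded. The trade-off is that a set comparison also ignores duplicates, so it only fits when duplicates are not what the test is checking.

```scala
object SetComparisonSketch {
  // Hypothetical output: one Seq per batch, batches possibly recorded out of order.
  val output: Seq[Seq[Int]] = Seq(Seq(3), Seq(1))

  // Order-insensitive check: flatten the batches and compare as sets.
  def matches(expected: Set[Int]): Boolean = output.flatten.toSet == expected
}
```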
|
Test build #25071 has started for PR 3801 at commit
|
|
Test build #25071 has finished for PR 3801 at commit
|
|
Test PASSed. |
|
Test build #25086 has started for PR 3801 at commit
|
|
Test build #25086 has finished for PR 3801 at commit
|
|
Test PASSed. |
|
I am merging this. Thanks @JoshRosen for this humongous effort! |
…calls and SystemClock usage

This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600).

Key changes:
- Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock.
- Fix a synchronization issue in ManualClock's `currentTime` method.
- Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished.
- Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls.
- Update these tests to use the withStreamingContext fixture.

Author: Josh Rosen <[email protected]>

Closes apache#3801 from JoshRosen/SPARK-1600 and squashes the following commits:

e4494f4 [Josh Rosen] Address a potential race when setting file modification times
8340bd0 [Josh Rosen] Use set comparisons for output.
0b9c252 [Josh Rosen] Fix some ManualClock usage problems.
1cc689f [Josh Rosen] ConcurrentHashMap -> SynchronizedMap
db26c3a [Josh Rosen] Use standard timeout in ScalaTest `eventually` blocks.
3939432 [Josh Rosen] Rename StreamingTestWaiter to BatchCounter
0b9c3a1 [Josh Rosen] Wait for checkpoint to complete
863d71a [Josh Rosen] Remove Thread.sleep that was used to make task run slowly
b4442c3 [Josh Rosen] batchTimeToSelectedFiles should be thread-safe
15b48ee [Josh Rosen] Replace several TestWaiter methods w/ ScalaTest eventually.
fffc51c [Josh Rosen] Revert "Remove last remaining sleep() call"
dbb8247 [Josh Rosen] Remove last remaining sleep() call
566a63f [Josh Rosen] Fix log message and comment typos
da32f3f [Josh Rosen] Fix log message and comment typos
3689214 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-1600
c8f06b1 [Josh Rosen] Remove Thread.sleep calls in FileInputStream CheckpointSuite test.
d4f2d87 [Josh Rosen] Refactor file input stream tests to not rely on SystemClock.
dda1403 [Josh Rosen] Add StreamingTestWaiter class.
3c3efc3 [Josh Rosen] Synchronize `currentTime` in ManualClock
a95ddc4 [Josh Rosen] Modify FileInputDStream to use Clock class.
…calls and SystemClock usage (branch-1.2 backport)

(This PR backports #3801 into `branch-1.2` (1.2.2))

This patch refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600).

Key changes:
- Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock.
- Fix a synchronization issue in ManualClock's `currentTime` method.
- Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished.
- Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls.
- Update these tests to use the withStreamingContext fixture.

Author: Josh Rosen <[email protected]>

Closes #4633 from JoshRosen/spark-1600-b12-backport and squashes the following commits:

e5d3dc4 [Josh Rosen] [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage