-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval #17525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #75504 has started for PR 17525 at commit |
| */ | ||
| def nextBatchTime(now: Long): Long = { | ||
| now / intervalMs * intervalMs + intervalMs | ||
| if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the doc seems wrong btw, mind fixing it? nextBatchTime(nextBatchTime(0)) = 100 or am I understanding it wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spoken offline, this isnt wrong.
| } | ||
| } | ||
|
|
||
| def isStreamWaitingAt(time: Long): Boolean = synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mind adding docs on when these should be used?
|
Test build #75514 has finished for PR 17525 at commit
|
|
Test build #75520 has finished for PR 17525 at commit
|
|
Test build #75521 has finished for PR 17525 at commit
|
| testStream(mapped, OutputMode.Complete)( | ||
| StartStream(ProcessingTime(100), triggerClock = clock), | ||
| AssertStreamExecThreadToWaitForClock(), | ||
| StartStream(ProcessingTime(1000), triggerClock = clock), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test needed fixing because this manual clock test was configured such that first batch takes > 100 ms even though the trigger interval was 100 ms. This caused additional batch to be automatically executed without waiting for the manual clock to be advance, thus breaking certain assumptions in the test.
|
Test build #75524 has finished for PR 17525 at commit
|
| clockIncrementInTrigger = 1500 | ||
| manualClock.setTime(2000) | ||
| eventually { | ||
| assert(lastTriggerTime === 3500) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was hard to understand that the test is actually testing that this value is 3500 instead of 2000. Could you add a quick comment?
|
left one minor comment, otherwise LGTM |
|
Thanks for the change. It is easier to understand when things are being triggered now. LGTM. |
|
Test build #75527 has finished for PR 17525 at commit
|
…took longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <[email protected]> Closes apache#17525 from tdas/SPARK-20209. (cherry picked from commit dad499f)
…took longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <[email protected]> Closes apache#17525 from tdas/SPARK-20209. (cherry picked from commit dad499f)
…took longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <[email protected]> Closes apache#17525 from tdas/SPARK-20209. (cherry picked from commit dad499f)
…took longer than trigger interval ## What changes were proposed in this pull request? For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval. In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock. ## How was this patch tested? Added new unit tests to comprehensively test this behavior. Author: Tathagata Das <[email protected]> Closes apache#17525 from tdas/SPARK-20209. (cherry picked from commit dad499f)
What changes were proposed in this pull request?
For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, then it will wait for 9 mins before starting the next batch. This does not make sense. The processing time based trigger policy should be to do process batches as fast as possible, but no faster than 1 in every trigger interval. If batches are taking longer than trigger interval anyways, then no point waiting extra trigger interval.
In this PR, I modified the ProcessingTimeExecutor to do so. Another minor change I did was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing, just needs the StreamManualClock.
How was this patch tested?
Added new unit tests to comprehensively test this behavior.