Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Sep 20, 2019

What changes were proposed in this pull request?

This patch changes ReceiverSuite."receiver_life_cycle" to record actual calls with timestamp in FakeReceiver/FakeReceiverSupervisor, which doesn't rely on timing of stopping and starting receiver in restarting receiver. It enables us to give enough huge timeout on verification of restart as we can verify both stopping and starting together.

Why are the changes needed?

The test is flaky without this patch. We increased timeout to fix flakyness of this test (15adcc8) but even with longer timeout it has been still failing intermittently.

Does this PR introduce any user-facing change?

No

How was this patch tested?

I've reproduced test failure artificially via below diff:

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index faf6db82d5..d8977543c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -191,9 +191,11 @@ private[streaming] abstract class ReceiverSupervisor(
       // thread pool.
       logWarning("Restarting receiver with delay " + delay + " ms: " + message,
         error.getOrElse(null))
+      Thread.sleep(1000)
       stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
       logDebug("Sleeping for " + delay)
       Thread.sleep(delay)
+      Thread.sleep(1000)
       logInfo("Starting receiver again")
       startReceiver()
       logInfo("Receiver started again")

and confirmed this patch doesn't fail with the change.

@HeartSaVioR HeartSaVioR changed the title [SPARK-23197][STREAMING] Fix ReceiverSuite."receiver_life_cycle" to not rely on timing [SPARK-23197][STREAMING][TESTS] Fix ReceiverSuite."receiver_life_cycle" to not rely on timing Sep 20, 2019
@SparkQA
Copy link

SparkQA commented Sep 20, 2019

Test build #111033 has finished for PR 25862 at commit 4ff9f7e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class MethodsCallRecorder

@SparkQA
Copy link

SparkQA commented Sep 20, 2019

Test build #111035 has finished for PR 25862 at commit b4462dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 20, 2019

Test build #111039 has finished for PR 25862 at commit f70b3c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

executor.callsRecorder.reset()
receiver.callsRecorder.reset()
receiver.restart("restarting", null, 100)
eventually(timeout(10.seconds), interval(10.milliseconds)) {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, 10.seconds is enough? Or, do you need to re-trigger this PR to validate more?
BTW, thank you so much for taking care of this case! This is really an long standing issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that was actually 1.3 seconds (300ms + 1s) and it hasn't been failing for high probability so it should be pretty enough.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Sep 20, 2019

cc. @tdas @sameeragarwal as they've authored and reviewed 15adcc8
also cc. to @zsxwing @jose-torres @gaborgsomogyi

assert(executor.isReceiverStarted)
executor.callsRecorder.reset()
receiver.callsRecorder.reset()
receiver.restart("restarting", null, 100)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goes down from 600 to 100?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes we no longer need so long delay as we don't rely on timing.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also verified this PR with the given test method. New test suite is logically robust and doesn't fail.
+1, LGTM. Merged to master.

Thank you so much, @HeartSaVioR !

@dongjoon-hyun
Copy link
Member

Could you make a backporting PR, @HeartSaVioR ?

@HeartSaVioR
Copy link
Contributor Author

Thanks for reviewing and merging, @dongjoon-hyun ! I'll submit a PR for branch-2.4 as well.

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Sep 25, 2019
…e" to not rely on timing

This patch changes ReceiverSuite."receiver_life_cycle" to record actual calls with timestamp in FakeReceiver/FakeReceiverSupervisor, which doesn't rely on timing of stopping and starting receiver in restarting receiver. It enables us to give enough huge timeout on verification of restart as we can verify both stopping and starting together.

The test is flaky without this patch. We increased timeout to fix flakyness of this test (apache@15adcc8) but even with longer timeout it has been still failing intermittently.

No

I've reproduced test failure artificially via below diff:

```
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index faf6db8..d8977543c0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 -191,9 +191,11  private[streaming] abstract class ReceiverSupervisor(
       // thread pool.
       logWarning("Restarting receiver with delay " + delay + " ms: " + message,
         error.getOrElse(null))
+      Thread.sleep(1000)
       stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
       logDebug("Sleeping for " + delay)
       Thread.sleep(delay)
+      Thread.sleep(1000)
       logInfo("Starting receiver again")
       startReceiver()
       logInfo("Receiver started again")
```

and confirmed this patch doesn't fail with the change.

Closes apache#25862 from HeartSaVioR/SPARK-23197-v2.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@HeartSaVioR
Copy link
Contributor Author

#25930

@HeartSaVioR HeartSaVioR deleted the SPARK-23197-v2 branch September 25, 2019 20:35
@dongjoon-hyun
Copy link
Member

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants