Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Sep 6, 2019

What changes were proposed in this pull request?

This patch ensures accessing recorded values in listener is always after letting listeners fully process all events. To ensure this, this patch adds new class to hide these values and access with methods which will ensure above condition. Without this guard, two threads are running concurrently - 1) listeners process thread 2) test main thread - and race condition would occur.

That's why we also see very odd thing, error message saying condition is met but test failed:

- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)

which means verification failed, and condition is met just before constructing error message.

The guard is properly placed in many spots, but missed in some places. This patch enforces that it can't be missed.

Why are the changes needed?

UT fails intermittently and this patch will address the flakyness.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Modified UT.

Also made the flaky tests artificially failing via applying 50ms of sleep on each onXXX method.

Screen Shot 2019-09-07 at 7 44 15 AM

I found 3 methods being failed. (They've marked as X. Just ignore ! as they failed on waiting listener in given timeout and these tests don't deal with these recorded values - it uses other timeout value 1000ms than 10000ms for this listener so affected via side-effect.)

When I applied same in this patch all tests marked as X passed.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Sep 6, 2019

I'm also seeing inconsistency among this test suite how to verify failedStages. Some places access it directly, while other places access it via scheduler.failedStages. Some places convert it to Set, some other places use contains and length separately, some other places directly compare with Seq. Ideally it seems to be better to deal with this as well, but not sure we would like to deal with this here or another minor PR.

EDIT: failedStages and scheduler.failedStages are different references. My bad.

@HeartSaVioR
Copy link
Contributor Author

Maybe even better to extract the logic about "ensuring listener has no event to process" and "access failedStages" into method and always call this. It should help us not missing to wait for listener thread.

@HeartSaVioR HeartSaVioR changed the title [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking failedStages [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values Sep 6, 2019
@HeartSaVioR
Copy link
Contributor Author

There're so many authors in the file, but let me cc. to couple of committers who authored related code or reported this flakyness issue.

cc. @jiangxb1987 @vanzin @tgravescs @dongjoon-hyun

@SparkQA
Copy link

SparkQA commented Sep 6, 2019

Test build #110209 has finished for PR 25706 at commit 7808a01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 6, 2019

Test build #110212 has finished for PR 25706 at commit ea3bc10.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EventInfoRecordingListener extends SparkListener

Copy link
Contributor

@vanzin vanzin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed it, but was there any test that was actually missing a waitUntilEmpty call? All the ones I noticed that needed it had it, and you've just moved it to shared code.

That isn't bad, but it also makes me question whether you're actually fixing the bug.

As with most timing bugs, you could potentially reproduce it by adding a sleep somewhere (e.g. in the listener that should process the event), and the test should pass even with the sleep in place.

_endedTasks.clear()
}

private def withWaitingListenerUntilEmpty[T](fn: => T): T = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a weird method because fn doesn't need to run in the context of this method, just after all the events have been processes. So in the getters you could just:

    def endedTask: Set[Long] = {
      waitForListeners()
      _endedTasks.toSet
    }

   def waitForListenerts(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

That makes each getter a little longer but looks less weird.

_stageByOrderOfExecution.toList
}

def endedTask: Set[Long] = withWaitingListenerUntilEmpty {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endedTasks

}

private def withWaitingListenerUntilEmpty[T](fn: => T): T = {
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a side note, the timeout for this method is hardcoded to a bunch of different arbitrary values in so many different places, that it may be good at some point to just have a default value in LiveListenerBus. I doubt any test code actually depends on a specific timeout here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're suggesting to have non-param version of waitUntilEmpty with proper default timeout value. Sounds great - I'll deal with another PR as this PR is intended to fix the test flakiness.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_endedTasks.toSet
}

def clear(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this you could just instantiate a new listener for each test.

null))

assert(failure == null, "job should not fail")
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused about this one. What is it waiting on? runEvent (and failedStages) is unrelated to the listener bus.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh my bad I thought I removed it but missed it. Will remove.

TaskKilled("test"),
null))
assert(failedStages === Seq(0))
assert(sparkListener.failedStages === Seq(0))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actual place of bug. The code is newly added (for "newly" I meant after the pattern of sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) is applied) and doesn't follow the existing pattern. That is easily missed and here we can enforce it by disallowing access variables directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There're some other places accessing failedStages without waiting. Search with assert(failedStages === Seq(0))

_endedTasks += taskEnd.taskInfo.taskId
}

def submittedStageInfos: Set[StageInfo] = withWaitingListenerUntilEmpty {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move this out of the EventInfoRecordingListener class ?

Copy link
Contributor Author

@HeartSaVioR HeartSaVioR Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I get it. Could you please elaborate? The recorded values are in listener and these methods expose them in safe way.

Copy link
Contributor Author

@HeartSaVioR HeartSaVioR Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand correctly, but if you're suggesting withWaitingListenerUntilEmpty out of EventInfoRecordingListener class, I guess it makes sense, though in another review comment I got feedback to unroll this, so maybe I'm going to remove this. Please let me know if we still want to have this for other places of waiting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense, thank you !

@HeartSaVioR
Copy link
Contributor Author

Just commented where the waiting is missed.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Sep 6, 2019

Screen Shot 2019-09-07 at 7 44 15 AM

Forgot to thank you for providing brilliant idea to artificially fail the problematic tests. I've just applied 50ms of sleep on each onXXX method, and I've found 3 methods being failed. (They've marked as X. Just ignore ! as they failed on waiting listener in given timeout and they don't deal with these values - it uses other timeout value 1000ms than 10000ms for this listener so affected via side-effect.)

When I applied same in this patch all tests marked as X passed.

Also updated to the description of PR as well.

@SparkQA
Copy link

SparkQA commented Sep 7, 2019

Test build #110266 has finished for PR 25706 at commit 3d999e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@dongjoon-hyun
Copy link
Member

For me, this PR looks fine. Could you review this once more, @vanzin and @jiangxb1987 ? Thanks~

If there is no further comments, I'm going to merge this tomorrow.

@SparkQA
Copy link

SparkQA commented Sep 11, 2019

Test build #110442 has finished for PR 25706 at commit 3d999e9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Sep 11, 2019

Test build #110447 has finished for PR 25706 at commit 3d999e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Copy link
Contributor

vanzin commented Sep 11, 2019

Merging to master.

@vanzin vanzin closed this in 2736efa Sep 11, 2019
@HeartSaVioR
Copy link
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-26989 branch September 11, 2019 20:45
@dongjoon-hyun
Copy link
Member

Thank you, @HeartSaVioR , @vanzin , @jiangxb1987 , @HyukjinKwon .

As @HeartSaVioR pointed here, the root cause is introduced at 2.4.0 via e3486e1#diff-f3b410b16818d8f34bb1eb4120a60d51R1102 .

Since branch-2.4 is our LTS branch, can we have this in branch-2.4?
@HeartSaVioR . Could you make a backporting PR against branch-2.4? I guess that will be a reduced backport PR compared with this.

PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019
…ly processed before checking recorded values

### What changes were proposed in this pull request?

This patch ensures accessing recorded values in listener is always after letting listeners fully process all events. To ensure this, this patch adds new class to hide these values and access with methods which will ensure above condition. Without this guard, two threads are running concurrently - 1) listeners process thread 2) test main thread - and race condition would occur.

That's why we also see very odd thing, error message saying condition is met but test failed:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means verification failed, and condition is met just before constructing error message.

The guard is properly placed in many spots, but missed in some places. This patch enforces that it can't be missed.

### Why are the changes needed?

UT fails intermittently and this patch will address the flakyness.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified UT.

Also made the flaky tests artificially failing via applying 50ms of sleep on each onXXX method.

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 methods being failed. (They've marked as X. Just ignore ! as they failed on waiting listener in given timeout and these tests don't deal with these recorded values - it uses other timeout value 1000ms than 10000ms for this listener so affected via side-effect.)

When I applied same in this patch all tests marked as X passed.

Closes apache#25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Sep 15, 2019
…ly processed before checking recorded values

This patch ensures accessing recorded values in listener is always after letting listeners fully process all events. To ensure this, this patch adds new class to hide these values and access with methods which will ensure above condition. Without this guard, two threads are running concurrently - 1) listeners process thread 2) test main thread - and race condition would occur.

That's why we also see very odd thing, error message saying condition is met but test failed:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means verification failed, and condition is met just before constructing error message.

The guard is properly placed in many spots, but missed in some places. This patch enforces that it can't be missed.

UT fails intermittently and this patch will address the flakyness.

No

Modified UT.

Also made the flaky tests artificially failing via applying 50ms of sleep on each onXXX method.

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 methods being failed. (They've marked as X. Just ignore ! as they failed on waiting listener in given timeout and these tests don't deal with these recorded values - it uses other timeout value 1000ms than 10000ms for this listener so affected via side-effect.)

When I applied same in this patch all tests marked as X passed.

Closes apache#25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
@HeartSaVioR
Copy link
Contributor Author

#25794 for branch-2.4

dongjoon-hyun pushed a commit that referenced this pull request Sep 20, 2019
…ntilEmpty

### What changes were proposed in this pull request?

This is a follow-up of the [review comment](#25706 (comment)).

This patch unifies the default wait time to be 10 seconds as it would fit most of UTs (as they have smaller timeouts) and doesn't bring additional latency since it will return if the condition is met.

This patch doesn't touch the one which waits 100000 milliseconds (100 seconds), to not break anything unintentionally, though I'd rather questionable that we really need to wait for 100 seconds.

### Why are the changes needed?

It simplifies the test code and get rid of various heuristic values on timeout.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

CI build will test the patch, as it would be the best environment to test the patch (builds are running there).

Closes #25837 from HeartSaVioR/MINOR-unify-default-wait-time-for-wait-until-empty.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants