Skip to content

Conversation

@ted-yu
Copy link

@ted-yu ted-yu commented Nov 8, 2015

As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages SparkListenerExecutorMetricsUpdate from the DAGScheduler

@ted-yu ted-yu changed the title Exit AsynchronousListenerBus thread when stop() is called [SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called Nov 8, 2015
@SparkQA
Copy link

SparkQA commented Nov 8, 2015

Test build #45302 has finished for PR 9546 at commit c60b860.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vonnagy
Copy link

vonnagy commented Nov 8, 2015

@ted-yu I have found that the call to listenerThread.join (line 168 in class AsynchronousListenerBus) sometimes still locks up. I have changed the code slightly to use something like listenerThread.join(20) (line 168) so that if the thread does not exit it will give it a quick break before trying again.

Would you mind adding something like that to your PR?

@vonnagy
Copy link

vonnagy commented Nov 9, 2015

I was unable to duplicate the issue I had with the listenerThread.join so it seems that your change solved the issue.

If I encounter the issue again then I can create a PR to adjust the listenerThread.join() accordingly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing this here, can you simplify the try block as follows?

try {
  if (stopped.get()) {
    // Get out of the while loop and shutdown the daemon thread
    return
  }
  val event = eventQueue.poll()
  assert(event != null, "event queue was empty but the listener bus was not stopped")
  postToAll(event)
}

I believe this will also fix the issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this into the try? If stopped.get throws an exception we still want to set processingEvent to false

@SparkQA
Copy link

SparkQA commented Nov 10, 2015

Test build #45460 has finished for PR 9546 at commit 9a6f9ff.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Copy link
Author

ted-yu commented Nov 10, 2015

I see several errors in the following form (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45460/consoleFull):

[error] SERVER ERROR: Service Temporarily Unavailable url=http://repository.mapr.com/maven/org/apache/hadoop/hadoop-yarn-server/2.2.0/hadoop-yarn-server-2.2.0.jar

Not related to the PR.

@SparkQA
Copy link

SparkQA commented Nov 10, 2015

Test build #45464 has finished for PR 9546 at commit 9acce70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ted-yu
Copy link
Author

ted-yu commented Nov 10, 2015

@andrewor14
See if all comments have been addressed

asfgit pushed a commit that referenced this pull request Nov 11, 2015
As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler

Author: tedyu <[email protected]>

Closes #9546 from ted-yu/master.

(cherry picked from commit 3e0a6cf)
Signed-off-by: Andrew Or <[email protected]>
@asfgit asfgit closed this in 3e0a6cf Nov 11, 2015
@JoshRosen
Copy link
Contributor

I think that this has caused the "org.apache.spark.scheduler.EventLoggingListenerSuite.End-to-end event logging" test to become flaky in Jenkins. For example: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/4014/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.3,label=spark-test/testReport/junit/org.apache.spark.scheduler/EventLoggingListenerSuite/End_to_end_event_logging/

I believe that this patch may have changed the behavior of the listener bus during shutdown. According to the stop() method's Scaladoc:

  /**
   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
   * new events after stopping.
   */

It looks like this patch just changes things so that we halt immediately once the stopped flag has been set rather than waiting for the queue to drain.

@ted-yu
Copy link
Author

ted-yu commented Nov 15, 2015

Planning to send out a PR to fix the regression by keeping count of queued events first time seeing the stop flag.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants