
Conversation

@JoshRosen
Contributor

Background

The AsyncRDDActions methods were introduced in e33b183, the first pull request to add support for job cancellation. A follow-up pull request, 599dcb0, added cancellation on a per-job-group basis. Quoting from that PR:

This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.

An example:

    sc.setJobGroup("this_is_the_group_id", "some job description")
    sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()

In a separate thread:

    sc.cancelJobGroup("this_is_the_group_id")

In its current form, AsyncRDDActions seems to serve no purpose other than enabling job cancellation. AsyncRDDActions is marked as @Experimental, so users may be reluctant to depend on it. Worse, every new action we add would also need an asynchronous counterpart, creating an ongoing maintenance burden.

Proposal

I propose that we remove AsyncRDDActions and use job groups as the only user-facing API for job cancellation. To make job groups more convenient, this pull request adds a runAsync method to SparkContext that makes it easy to run an asynchronous computation in a particular Spark job group. For example:

// Instead of countAsync(), we call the regular actions from a runAsync block:
val futureCount: RunAsyncResult[Long] = sc.runAsync {
  sc.parallelize(...).map(...).count()
}

// This returns a Future (note that Future.onSuccess takes a partial function):
futureCount.onSuccess { case c => println(s"Got count $c!") }

// The future also supports cancellation:
futureCount.cancel()

// This also works with blocks that call multiple actions (e.g. an iterative ML algorithm):
val futureResult = sc.runAsync {
  val rdd = sc.parallelize(...)
  val count = rdd.count()
  val first = rdd.first()
  first  // this is the result of the block
}
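To make the proposal concrete, here is a Spark-free sketch of the cancellable-handle pattern behind runAsync. RunAsyncSketch, AsyncHandle, and cancelFn are hypothetical names for illustration only; in the real implementation, each block would run under a fresh job group and cancel() would call sc.cancelJobGroup(groupId):

```scala
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, Spark-free sketch of the runAsync pattern proposed above:
// each call gets a fresh group id, the block runs asynchronously, and
// cancel() delegates to a supplied callback (standing in for
// sc.cancelJobGroup). Names here are illustrative, not Spark's API.
final class AsyncHandle[T](val groupId: String,
                           val future: Future[T],
                           cancelFn: String => Unit) {
  def cancel(): Unit = cancelFn(groupId)
}

object RunAsyncSketch {
  def runAsync[T](cancelFn: String => Unit)(body: => T)
                 (implicit ec: ExecutionContext): AsyncHandle[T] = {
    val groupId = "runAsync-" + UUID.randomUUID()
    new AsyncHandle(groupId, Future(body), cancelFn)
  }
}
```

The key design point is that the handle carries the group id, so cancellation does not need to track individual job ids.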

I refactored JobCancellationSuite to use this new API instead of AsyncRDDActions; see that file for more examples.

TODOs / tasks to finish before merging:

This is marked as [WIP] since there are still a number of tasks that need to be finished before this is merge-worthy:

  • Add a Java interface for this; it can probably be an extension of Runnable.
  • Extend RunAsyncResult to expose the job ids of jobs launched from its computation (see SPARK-3446 / PR #2337).
  • The thread-safety concerns here are somewhat complicated; add more comments.
  • Explain the caveats with job-group-based cancellation:
    • Having multiple threads submitting jobs in the same job group may lead to confusing behavior.
    • Recursive runAsync() calls may have undefined / confusing behavior.
  • Look through AsyncRDDActionsSuite to see whether there are any tests that need to be re-added for these APIs.
  • Add example uses in the Spark examples subproject, since it may not be obvious how to use this.
  • Fix the tests in CancellationSuite to fail for more subtly-incorrect cases. While working on this PR, I found a few cases where the suite could still pass even though cancellation was broken: if cancellation doesn't work, the current suite runs very slowly but still passes. I need to extend it to ensure that cancelled jobs never actually complete and that jobs scheduled after a cancellation launch within a short period of time.
  • Currently we run computations in new threads; should we use execution contexts for this? How does this interact with thread locals? We don't want to accidentally leave thread locals set on threads that will be re-used, since this could lead to the wrong job groups being set.
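To make the thread-local concern in the last bullet concrete, here is a minimal, Spark-free demonstration (ThreadLocalLeakDemo and jobGroup are illustrative names): on a reused pool thread, a ThreadLocal set by one task remains visible to the next task unless it is explicitly cleared.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Minimal demonstration of the thread-local hazard described above: on a
// single-thread pool, a ThreadLocal set by one task (e.g. a job group id)
// is still visible to the next task unless it is explicitly cleared.
object ThreadLocalLeakDemo {
  private val jobGroup = new ThreadLocal[String]

  def demo(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // First task sets the "job group" but never clears it.
    val a = Await.result(Future { jobGroup.set("group-A"); jobGroup.get() }, 10.seconds)
    // Second task never sets it, yet observes the stale value from task one.
    val b = Await.result(Future { jobGroup.get() }, 10.seconds)
    pool.shutdown()
    (a, b)
  }
}
```

Running fresh threads avoids this entirely, which is one argument for the current approach over a shared execution context.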

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have started for PR 2482 at commit c715511.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have finished for PR 2482 at commit c715511.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Sep 21, 2014

I don't think we can just wipe the old one out. At the very least, we need to "deprecate" it. Even that is debatable because some applications might prefer this async model.

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have started for PR 2482 at commit 3171939.

  • This patch merges cleanly.

@markhamstra
Contributor

+1 @rxin

Just scanned through the code quickly, and I didn't immediately see anything that would preclude retaining and deprecating the old code while introducing the new (which looks pretty good!)

@JoshRosen
Contributor Author

Fair enough, although the AsyncRDDActions class was marked as @Experimental and the documentation for that annotation explicitly warns that experimental APIs might change or be removed even in minor releases:

/**
 * An experimental user-facing API.
 *
 * Experimental API's might change or be removed in minor versions of Spark, or be adopted as
 * first-class Spark API's.
 */

On the other hand, the individual methods weren't marked as @Experimental and we did provide an implicit conversion, so it's possible that users might have started relying on these APIs without realizing that they were experimental.
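To illustrate why the implicit conversion makes the dependency easy to miss, here is a Spark-free sketch of the enrichment pattern (FakeRDD, AsyncActions, and countAsync are illustrative stand-ins, not Spark's classes): the implicit makes the async methods appear directly on the base type, so callers never name the wrapper class.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Spark-free sketch of how AsyncRDDActions is surfaced to users: an implicit
// enrichment makes the *Async methods appear directly on the base type, so
// callers can depend on them without ever naming the (experimental) wrapper.
// FakeRDD and countAsync are illustrative stand-ins, not Spark's classes.
final class FakeRDD[T](val data: Seq[T])

object AsyncActions {
  implicit class AsyncOps[T](rdd: FakeRDD[T]) {
    def countAsync()(implicit ec: ExecutionContext): Future[Long] =
      Future(rdd.data.size.toLong)
  }
}
```

A caller writes `rdd.countAsync()` and may never see the @Experimental annotation on the wrapper.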

@markhamstra
Contributor

Yes, I know they are Experimental now, but they weren't always, since we didn't have the Experimental designation/policy when AsyncRDDActions was introduced. And even though we can remove the old code without violating policy, we should recognize that any user code relying on it will likely require non-trivial changes to move confidently to the new-style code. Deprecation makes sense even if it's not strictly required.

@JoshRosen
Contributor Author

@rxin Do you have any examples of why a user might prefer the old model, besides backwards-compatibility? I'd like to understand if the old model (in its current form) provides any features that this proposal is missing (so that I can add them).

Contributor

Note that this is not a reliable (actually very unreliable) way of cancelling a thread.

It only stops the thread if the thread is waiting on I/O or sleeping; if the user thread is actually executing work (or busy-looping), this does nothing.
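This distinction can be demonstrated with plain JVM threads (a hedged, Spark-free sketch; InterruptDemo and both method names are illustrative): interrupt() wakes a thread blocked in sleep() via InterruptedException, but a busy loop that never checks the interrupt flag keeps running.

```scala
// Plain-JVM demonstration of the point above: interrupt() reliably wakes a
// thread blocked in sleep()/IO, but has no effect on a busy loop that never
// checks the interrupt flag.
object InterruptDemo {
  def sleepingThreadSeesInterrupt(): Boolean = {
    var sawInterrupt = false
    val t = new Thread(() =>
      try Thread.sleep(60000)
      catch { case _: InterruptedException => sawInterrupt = true })
    t.start()
    Thread.sleep(100)  // give the thread time to reach sleep(); if interrupt()
                       // lands first, sleep() still throws immediately
    t.interrupt()
    t.join(10000)      // join() gives us a happens-before on sawInterrupt
    sawInterrupt
  }

  def busyLoopSurvivesInterrupt(): Boolean = {
    val stop = new java.util.concurrent.atomic.AtomicBoolean(false)
    val t = new Thread(() => while (!stop.get()) ())  // never checks interrupt
    t.start()
    t.interrupt()
    Thread.sleep(200)
    val stillRunning = t.isAlive  // interrupt() did not stop the busy loop
    stop.set(true)
    t.join(10000)
    stillRunning
  }
}
```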

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have finished for PR 2482 at commit 3171939.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen JoshRosen force-pushed the remove-asyncrddactions branch from 3171939 to 4882082 Compare September 21, 2014 22:16
@SparkQA

SparkQA commented Sep 21, 2014

QA tests have started for PR 2482 at commit 4882082.

  • This patch merges cleanly.

@JoshRosen
Contributor Author

I've taken another pass at this. This time, I kept AsyncRDDActions but re-implemented it using runAsync, though I'm actually on the fence about that change. The one difference is that the asynchronous jobs will now be submitted with anonymous job groups rather than as part of the calling thread's job group. This change might be observable by a user who fires off multiple asynchronous actions from a single driver control thread and then attempts to cancel that thread's job group: because job groups don't have any hierarchy / nesting, this would break the cancellation of those jobs.

I'm beginning to get the sense that we might not have much room to change anything about the implementation of AsyncRDDActions, so maybe we should just let them be.

@rxin Based on our discussion, I added a check in DAGScheduler to reject jobs submitted by cancelled threads.
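A hedged sketch of what such a check could look like (names and structure are hypothetical, not the actual DAGScheduler code, and the real check keys on cancelled threads rather than groups): the scheduler remembers what was cancelled and rejects later submissions tagged with it.

```scala
// Illustrative sketch of the "reject jobs submitted after cancellation" check
// mentioned above; names and structure are hypothetical, not Spark's actual
// DAGScheduler implementation.
object CancelledGroupRegistry {
  private val cancelledGroups = scala.collection.mutable.Set.empty[String]
  private var nextJobId = 0

  def cancelJobGroup(groupId: String): Unit = synchronized {
    cancelledGroups += groupId
  }

  // Returns Left(error) for a cancelled group, Right(jobId) otherwise.
  def submitJob(groupId: String): Either[String, Int] = synchronized {
    if (cancelledGroups.contains(groupId)) {
      Left(s"Job group $groupId has been cancelled; rejecting new job")
    } else {
      nextJobId += 1
      Right(nextJobId)
    }
  }
}
```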

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have finished for PR 2482 at commit 4882082.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

I'm going to close this for now. My approach has some confusing semantics and may be more general than what most users need.

@JoshRosen JoshRosen closed this Oct 2, 2014
