
Conversation

@vanzin (Contributor) commented Sep 9, 2014

FutureAction is the only type exposed through the async APIs, so for job IDs to be useful they need to be exposed there. The complication is that some async actions run more than one job (e.g. takeAsync), so the exposed ID actually has to be a list of IDs that can change over time. As a result the interface doesn't look very nice, but...

The change itself is small; I just added a basic test to make sure it works.

@markhamstra (Contributor)

I don't understand this claim: "...for job IDs to be useful they need to be exposed there." Could you clarify, please?

@vanzin (Contributor, Author) commented Sep 9, 2014

The point of adding the "jobId" method to SimpleFutureAction was so that code calling these async methods would know the IDs of the jobs it was triggering (see SPARK-2636). Except the job ID is not really exposed at all, since SimpleFutureAction is not exposed through the async APIs.

(Sure you could cast the result, but that's ugly, and that also does not cover ComplexFutureAction.)
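
For illustration, a minimal sketch of the cast-based workaround being described, assuming a Spark 1.x-style RDD and the implicits that bring countAsync into scope; jobIdOf is a hypothetical helper name, not an existing API:

import scala.reflect.ClassTag

import org.apache.spark.{FutureAction, SimpleFutureAction}
import org.apache.spark.SparkContext._   // rddToAsyncRDDActions implicit (Spark 1.x)
import org.apache.spark.rdd.RDD

// The async API only declares FutureAction, so reading the job ID added by
// SPARK-2636 forces the caller to downcast to the concrete SimpleFutureAction.
def jobIdOf[T: ClassTag](rdd: RDD[T]): Int = {
  val future: FutureAction[Long] = rdd.countAsync()
  future.asInstanceOf[SimpleFutureAction[Long]].jobId
}

// The cast breaks for actions like takeAsync, which return a ComplexFutureAction
// that can run several jobs and does not expose their IDs.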

@SparkQA commented Sep 9, 2014

QA tests have started for PR 2337 at commit 1fed2bc.

  • This patch merges cleanly.

@SparkQA commented Sep 9, 2014

QA tests have finished for PR 2337 at commit 1fed2bc.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Sep 18, 2014

Ping.

Contributor (inline review comment)

multiple jobs

@pwendell (Contributor)

It would be good to test the complex case with multiple job ids, but overall looks good. @rxin you added this interface - can you take a look (this is a very small patch)?

@SparkQA commented Sep 19, 2014

QA tests have started for PR 2337 at commit e166a68.

  • This patch merges cleanly.

@SparkQA commented Sep 19, 2014

QA tests have finished for PR 2337 at commit e166a68.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Sep 20, 2014

The API is slightly awkward, as you suggested. Is this intended for getting job progress? If so, maybe we can do that through the "job group" and get the list of job IDs that way?

@JoshRosen (Contributor)

My initial thought was that a "job group"-based approach might be a bit cleaner, but there are a few subtleties with that proposal that we need to consider.

What if we had an API that accepts a "job group" and returns the ids of jobs in that group? e.g.

// in SparkContext:
def getJobsForGroup(jobGroup: String): Seq[Int]

What should we return here? The list of active jobs? All jobs, including failed and completed ones?

It looks like this pull request addresses a case where you run some asynchronous action and want to retrieve the IDs of all jobs associated with that action. If getJobsForGroup returned all jobs, then it would be the caller's responsibility to "diff" the results before and after invoking the action to figure out which jobs were created as a result of that action. This sort of works if you only have one thread launching jobs in that job group; if you had multiple threads, then you wouldn't be able to separate the jobs added by each thread (short of using a unique job group per thread). There might be use cases where I want to fire off a bunch of asynchronous actions (using these async APIs) and then periodically poll the status of all my FutureActions, and it seems like this could get really messy if I had to switch the job group before launching each action.
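
To make the concern concrete, here is a rough sketch of the diff a caller would be forced into under the hypothetical getJobsForGroup API above (sc and rdd are assumed to exist; the method itself does not):

// Hypothetical API -- not an existing SparkContext method.
val before = sc.getJobsForGroup("my-group").toSet
val action = rdd.takeAsync(100)                  // may launch one or more jobs
val after  = sc.getJobsForGroup("my-group").toSet
val jobsFromThisAction = after -- before         // wrong if another thread submitted
                                                 // jobs in the same group concurrently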

Another subtle problem with this getJobsForGroup method: eventually, we'll have to garbage-collect entries from the jobGroup to jobIds mapping, which could lead to weird results when diffing two return values.

@JoshRosen (Contributor)

@rxin @pwendell Since we have job groups and the ability to cancel all jobs running in a job group (sc.cancelJobGroup()), why do we need FutureAction? It looks like the only benefit it offers over a regular Scala Future is cancellation.

I imagine that many developers would like to be able to fire off an entire workflow, potentially comprising multiple actions, monitor its overall progress, and cancel the whole thing. It seems like job groups offer a strictly more powerful set of features that allows users to perform progress-monitoring and cancellation on entire workflows, not just individual actions.

If the motivation for FutureAction is that job groups are inconvenient for simple things, then I think we can address that by adding convenience wrappers that act like Python context managers and make it easy to run a block of code inside of a particular job group. Or, we could add an API that executes an arbitrary user-defined code block using a specified job group and returns a cancelable future.
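
As a sketch of that second idea, something along these lines would do it. runInJobGroup is a hypothetical name (setJobGroup, clearJobGroup, and cancelJobGroup are existing SparkContext methods); this is only an illustration, not a proposed implementation:

import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.SparkContext

// Hypothetical convenience wrapper: run a block of code under a job group and
// return a future whose jobs can all be cancelled via the group.
def runInJobGroup[T](sc: SparkContext, groupId: String, description: String)
                    (body: => T)(implicit ec: ExecutionContext): Future[T] =
  Future {
    // setJobGroup is thread-local, so it must be set on the thread running the actions.
    sc.setJobGroup(groupId, description, interruptOnCancel = true)
    try body finally sc.clearJobGroup()
  }

// Usage sketch:
//   val f = runInJobGroup(sc, "query-42", "my multi-job workflow") { rdd.collect() }
//   sc.cancelJobGroup("query-42")   // cancels every job in the workflow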

@pwendell (Contributor)

@vanzin it would be helpful to hear what the needs are for Hive on Spark. Other applications I've seen have been using the job group for this purpose, and that approach works even if a query involves multiple jobs (something the Future interface would make much harder). At the beginning of each query you set the job group before calling any Spark actions; then, in another thread, you can read the job IDs associated with the group for progress tracking.

@JoshRosen (Contributor)

I've opened #2482, a WIP pull request illustrating my proposal to remove AsyncRDDActions and replace it with a more general mechanism for asynchronously launching and monitoring Spark jobs.

@vanzin (Contributor, Author) commented Sep 22, 2014

Lots of questions, let's go one by one.

Motivation

This is discussed in SPARK-2636 (and probably a couple of others), but I'll try to summarize it quickly here. Hive-on-Spark generates multiple jobs for a single query, and needs to monitor and collect metrics for each of those jobs separately. The way to do this in Spark is through a SparkListener. The missing piece is that when you call an action such as collect() or saveAsHadoopFile(), it does not return a job ID in any way. So HoS was using the async API, since that was the recommended workaround, and the fix for SPARK-2636 added the job's ID to the FutureAction API. The problem is that it did not expose the job IDs correctly, which is why I filed this bug and sent this PR.

Job Groups

I was not familiar with the API, and it sounds great to me. It would make monitoring jobs in my remote API prototype (SPARK-3215) much cleaner. The only missing piece, from looking at the API, is that I don't see the job group anywhere in the events sent to listeners, e.g.:

case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
  extends SparkListenerEvent

Unless properties contains the job group info somehow (which would look a little brittle to me, but I could work with it), that's something that would need to be fixed for HoS to be able to gather the information it needs.

Async API vs. Something Else

I'm not sold on using the async API, and in fact its use in my remote client prototype looks sort of hacky and ugly. But currently that's the only way to gather the information HoS needs. Any substitute needs to allow the caller to match events to the job that was submitted, which is not possible via other means today (or, at least, not that I can see).

I assume that job groups still work OK with the current async API, since the thread local data is using an InheritableThreadLocal.

@JoshRosen (Contributor)

Unless properties contains the job group info somehow.

It does, actually; the property is named SparkContext.SPARK_JOB_GROUP_ID. Since properties can technically be null, you could use something like

val jobGroupId = Option(properties).map(_.get(SparkContext.SPARK_JOB_GROUP_ID)).orNull

to check the job group. This is kind of messy (this isn't a documented / stable API, though).
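
For illustration, a minimal sketch of that listener-side workaround, under the assumption that the private constant resolves to the string "spark.jobGroup.id" (which is exactly the fragility discussed next):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Records job IDs per job group by reading the (undocumented) property that
// SparkContext.setJobGroup attaches to each job submission.
class JobGroupListener extends SparkListener {
  private val jobGroupKey = "spark.jobGroup.id"  // assumed value of SparkContext.SPARK_JOB_GROUP_ID
  private val jobsByGroup = mutable.Map.empty[String, Seq[Int]].withDefaultValue(Seq.empty)

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val group = Option(jobStart.properties).map(_.getProperty(jobGroupKey)).orNull
    if (group != null) {
      jobsByGroup.synchronized { jobsByGroup(group) = jobsByGroup(group) :+ jobStart.jobId }
    }
  }

  def jobsFor(group: String): Seq[Int] = jobsByGroup.synchronized(jobsByGroup(group))
}

// Usage sketch: sc.addSparkListener(new JobGroupListener)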

@vanzin (Contributor, Author) commented Sep 22, 2014

This is kind of messy (this isn't a documented / stable API, though).

More than that, it's private[spark], which means I have to hardcode the string in my code and hope it never changes...

@vanzin (Contributor, Author) commented Sep 22, 2014

Just to be clear, I'm ok with switching to using job groups to achieve what HoS needs (and close this PR/bug), but even that path seems like it could use some changes to make the lives of people using the API easier.

@JoshRosen (Contributor)

More than that, it's private[spark], which means I have to hardcode the string in my code and hope it never changes...

Yeah, I wasn't suggesting that as a substitute for a real public API.

Just to be clear, I'm ok with switching to using job groups to achieve what HoS needs (and close this PR/bug), but even that path seems like it could use some changes to make the lives of people using the API easier.

I think there are two separate design issues here:

  1. How do I get the jobId associated with a job that I've launched?
  2. Given that I know the jobId, how do I find out more information about the status of that job?

Let's keep this open for now, since this PR sounds like an okay way to address 1) and these two concerns are largely orthogonal.

@JoshRosen (Contributor)

I've given it some thought, and I don't think we should merge the more general async mechanism that I described in #2482. It had some confusing semantics surrounding cancellation (see the discussion of Thread.interrupt) and was probably more general than what most users need.

Given that we should probably keep the current async APIs, this PR's change looks good. I'm going to merge this into master. Thanks for this commit and sorry for the long review delay!

asfgit closed this in 29c3513 on Oct 2, 2014
@vanzin (Contributor, Author) commented Oct 2, 2014

Thanks Josh. I meant to comment on your other PR (also about the weird cancellation semantics), but life got in the way. :-)
