Skip to content

Commit 1fed2bc

Browse files
author
Marcelo Vanzin
committed
[SPARK-3446] Expose underlying job ids in FutureAction.
FutureAction is the only type exposed through the async APIs, so for job IDs to be useful they need to be exposed there. The complication is that some async jobs run more than one job (e.g. takeAsync), so the exposed ID has to actually be a list of IDs that can actually change over time. So the interface doesn't look very nice, but... Change is actually small, I just added a basic test to make sure it works.
1 parent 2686233 commit 1fed2bc

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
8383
*/
8484
@throws(classOf[Exception])
8585
def get(): T = Await.result(this, Duration.Inf)
86+
87+
/**
88+
* Returns the job IDs run by the underlying async operation.
89+
*
90+
* This returns the current snapshot of the job list. Certain operations may run multiple
91+
* job, so multiple calls to this method may return different lists.
92+
*/
93+
def jobIds: Seq[Int]
94+
8695
}
8796

8897

@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
150159
}
151160
}
152161

153-
/** Get the corresponding job id for this action. */
154-
def jobId = jobWaiter.jobId
162+
def jobIds = Seq(jobWaiter.jobId)
155163
}
156164

157165

@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
171179
// is cancelled before the action was even run (and thus we have no thread to interrupt).
172180
@volatile private var _cancelled: Boolean = false
173181

182+
@volatile private var jobs: Seq[Int] = Nil
183+
174184
// A promise used to signal the future.
175185
private val p = promise[T]()
176186

@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
219229
}
220230
}
221231

232+
this.jobs = jobs ++ job.jobIds
233+
222234
// Wait for the job to complete. If the action is cancelled (with an interrupt),
223235
// cancel the job and stop the execution. This is not in a synchronized block because
224236
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
255267
override def isCompleted: Boolean = p.isCompleted
256268

257269
override def value: Option[Try[T]] = p.future.value
270+
271+
def jobIds = jobs
272+
258273
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark
19+
20+
import scala.concurrent.Await
21+
import scala.concurrent.duration.Duration
22+
23+
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
24+
25+
import org.apache.spark.SparkContext._
26+
27+
class FutureActionSuite extends FunSuite with BeforeAndAfter with Matchers with LocalSparkContext {
28+
29+
before {
30+
sc = new SparkContext("local", "FutureActionSuite")
31+
}
32+
33+
test("simple async action") {
34+
val rdd = sc.parallelize(1 to 10, 2)
35+
val job = rdd.countAsync()
36+
val res = Await.result(job, Duration.Inf)
37+
res should be (10)
38+
job.jobIds.size should be (1)
39+
}
40+
41+
test("complex async action") {
42+
val rdd = sc.parallelize(1 to 15, 3)
43+
val job = rdd.takeAsync(10)
44+
val res = Await.result(job, Duration.Inf)
45+
res should be (1 to 10)
46+
job.jobIds.size should be (2)
47+
}
48+
49+
}

0 commit comments

Comments
 (0)