@@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
147147 }.start()
148148 }
149149 sem.acquire(2 )
150+ throwable.foreach { t => throw t }
150151 if (ThreadingSuiteState .failed.get()) {
151152 logError(" Waited 1 second without seeing runningThreads = 4 (it was " +
152153 ThreadingSuiteState .runningThreads.get() + " ); failing test" )
153154 fail(" One or more threads didn't see runningThreads = 4" )
154155 }
155- throwable.foreach { t => throw t }
156156 }
157157
158158 test(" set local properties in different thread" ) {
@@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
178178 threads.foreach(_.start())
179179
180180 sem.acquire(5 )
181- assert(sc.getLocalProperty(" test" ) === null )
182181 throwable.foreach { t => throw t }
182+ assert(sc.getLocalProperty(" test" ) === null )
183183 }
184184
185185 test(" set and get local properties in parent-children thread" ) {
@@ -207,15 +207,16 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
207207 threads.foreach(_.start())
208208
209209 sem.acquire(5 )
210+ throwable.foreach { t => throw t }
210211 assert(sc.getLocalProperty(" test" ) === " parent" )
211212 assert(sc.getLocalProperty(" Foo" ) === null )
212- throwable.foreach { t => throw t }
213213 }
214214
215215 test(" mutations to local properties should not affect submitted jobs (SPARK-6629)" ) {
216216 val jobStarted = new Semaphore (0 )
217217 val jobEnded = new Semaphore (0 )
218218 @ volatile var jobResult : JobResult = null
219+ var throwable : Option [Throwable ] = None
219220
220221 sc = new SparkContext (" local" , " test" )
221222 sc.setJobGroup(" originalJobGroupId" , " description" )
@@ -232,14 +233,19 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
232233 // Create a new thread which will inherit the current thread's properties
233234 val thread = new Thread () {
234235 override def run (): Unit = {
235- assert(sc.getLocalProperty(SparkContext .SPARK_JOB_GROUP_ID ) === " originalJobGroupId" )
236- // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
237236 try {
238- sc.parallelize(1 to 100 ).foreach { x =>
239- Thread .sleep(100 )
237+ assert(sc.getLocalProperty(SparkContext .SPARK_JOB_GROUP_ID ) === " originalJobGroupId" )
238+ // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
239+ try {
240+ sc.parallelize(1 to 100 ).foreach { x =>
241+ Thread .sleep(100 )
242+ }
243+ } catch {
244+ case s : SparkException => // ignored so that we don't print noise in test logs
240245 }
241246 } catch {
242- case s : SparkException => // ignored so that we don't print noise in test logs
247+ case t : Throwable =>
248+ throwable = Some (t)
243249 }
244250 }
245251 }
@@ -252,6 +258,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
252258 // modification of the properties object should not affect the properties of running jobs
253259 sc.cancelJobGroup(" originalJobGroupId" )
254260 jobEnded.tryAcquire(10 , TimeUnit .SECONDS )
261+ throwable.foreach { t => throw t }
255262 assert(jobResult.isInstanceOf [JobFailed ])
256263 }
257264}
0 commit comments