Skip to content

Commit d74c6a1

Browse files
author
Andrew Or
committed
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fail the test
This commit ensures if an assertion fails within a thread, it will ultimately fail the test. Otherwise we end up potentially masking real bugs by not propagating assertion failures properly. Author: Andrew Or <[email protected]> Closes #8723 from andrewor14/fix-threading-suite.
1 parent c2af42b commit d74c6a1

File tree

1 file changed

+45
-23
lines changed

1 file changed

+45
-23
lines changed

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
119119
val nums = sc.parallelize(1 to 2, 2)
120120
val sem = new Semaphore(0)
121121
ThreadingSuiteState.clear()
122+
var throwable: Option[Throwable] = None
122123
for (i <- 0 until 2) {
123124
new Thread {
124125
override def run() {
125-
val ans = nums.map(number => {
126-
val running = ThreadingSuiteState.runningThreads
127-
running.getAndIncrement()
128-
val time = System.currentTimeMillis()
129-
while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
130-
Thread.sleep(100)
131-
}
132-
if (running.get() != 4) {
133-
ThreadingSuiteState.failed.set(true)
134-
}
135-
number
136-
}).collect()
137-
assert(ans.toList === List(1, 2))
138-
sem.release()
126+
try {
127+
val ans = nums.map(number => {
128+
val running = ThreadingSuiteState.runningThreads
129+
running.getAndIncrement()
130+
val time = System.currentTimeMillis()
131+
while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
132+
Thread.sleep(100)
133+
}
134+
if (running.get() != 4) {
135+
ThreadingSuiteState.failed.set(true)
136+
}
137+
number
138+
}).collect()
139+
assert(ans.toList === List(1, 2))
140+
} catch {
141+
case t: Throwable =>
142+
throwable = Some(t)
143+
} finally {
144+
sem.release()
145+
}
139146
}
140147
}.start()
141148
}
@@ -145,18 +152,25 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
145152
ThreadingSuiteState.runningThreads.get() + "); failing test")
146153
fail("One or more threads didn't see runningThreads = 4")
147154
}
155+
throwable.foreach { t => throw t }
148156
}
149157

150158
test("set local properties in different thread") {
151159
sc = new SparkContext("local", "test")
152160
val sem = new Semaphore(0)
153-
161+
var throwable: Option[Throwable] = None
154162
val threads = (1 to 5).map { i =>
155163
new Thread() {
156164
override def run() {
157-
sc.setLocalProperty("test", i.toString)
158-
assert(sc.getLocalProperty("test") === i.toString)
159-
sem.release()
165+
try {
166+
sc.setLocalProperty("test", i.toString)
167+
assert(sc.getLocalProperty("test") === i.toString)
168+
} catch {
169+
case t: Throwable =>
170+
throwable = Some(t)
171+
} finally {
172+
sem.release()
173+
}
160174
}
161175
}
162176
}
@@ -165,20 +179,27 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
165179

166180
sem.acquire(5)
167181
assert(sc.getLocalProperty("test") === null)
182+
throwable.foreach { t => throw t }
168183
}
169184

170185
test("set and get local properties in parent-children thread") {
171186
sc = new SparkContext("local", "test")
172187
sc.setLocalProperty("test", "parent")
173188
val sem = new Semaphore(0)
174-
189+
var throwable: Option[Throwable] = None
175190
val threads = (1 to 5).map { i =>
176191
new Thread() {
177192
override def run() {
178-
assert(sc.getLocalProperty("test") === "parent")
179-
sc.setLocalProperty("test", i.toString)
180-
assert(sc.getLocalProperty("test") === i.toString)
181-
sem.release()
193+
try {
194+
assert(sc.getLocalProperty("test") === "parent")
195+
sc.setLocalProperty("test", i.toString)
196+
assert(sc.getLocalProperty("test") === i.toString)
197+
} catch {
198+
case t: Throwable =>
199+
throwable = Some(t)
200+
} finally {
201+
sem.release()
202+
}
182203
}
183204
}
184205
}
@@ -188,6 +209,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
188209
sem.acquire(5)
189210
assert(sc.getLocalProperty("test") === "parent")
190211
assert(sc.getLocalProperty("Foo") === null)
212+
throwable.foreach { t => throw t }
191213
}
192214

193215
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {

0 commit comments

Comments
 (0)