Skip to content

Commit b6e9986

Browse files
author
Andrew Or
committed
[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions
*Note: this is for master branch only.* The fix for branch-1.5 is at #8721. The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to `IllegalArgumentException: spark.sql.execution.id is already set` when running queries in parallel, e.g.: ``` (1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() } ``` The cause is `SparkContext`'s local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path. Author: Andrew Or <[email protected]> Closes #8710 from andrewor14/concurrent-sql-executions.
1 parent be52faa commit b6e9986

File tree

3 files changed

+132
-43
lines changed

3 files changed

+132
-43
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import scala.collection.mutable.HashMap
3333
import scala.reflect.{ClassTag, classTag}
3434
import scala.util.control.NonFatal
3535

36+
import org.apache.commons.lang.SerializationUtils
3637
import org.apache.hadoop.conf.Configuration
3738
import org.apache.hadoop.fs.Path
3839
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -347,8 +348,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
347348
private[spark] var checkpointDir: Option[String] = None
348349

349350
// Thread Local variable that can be used by users to pass information down the stack
350-
private val localProperties = new InheritableThreadLocal[Properties] {
351-
override protected def childValue(parent: Properties): Properties = new Properties(parent)
351+
protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
352+
override protected def childValue(parent: Properties): Properties = {
353+
// Note: make a clone such that changes in the parent properties aren't reflected in
354+
// the those of the children threads, which has confusing semantics (SPARK-10563).
355+
SerializationUtils.clone(parent).asInstanceOf[Properties]
356+
}
352357
override protected def initialValue(): Properties = new Properties()
353358
}
354359

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
147147
}.start()
148148
}
149149
sem.acquire(2)
150-
throwable.foreach { t => throw t }
150+
throwable.foreach { t => throw improveStackTrace(t) }
151151
if (ThreadingSuiteState.failed.get()) {
152152
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
153153
ThreadingSuiteState.runningThreads.get() + "); failing test")
@@ -178,7 +178,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
178178
threads.foreach(_.start())
179179

180180
sem.acquire(5)
181-
throwable.foreach { t => throw t }
181+
throwable.foreach { t => throw improveStackTrace(t) }
182182
assert(sc.getLocalProperty("test") === null)
183183
}
184184

@@ -207,58 +207,41 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
207207
threads.foreach(_.start())
208208

209209
sem.acquire(5)
210-
throwable.foreach { t => throw t }
210+
throwable.foreach { t => throw improveStackTrace(t) }
211211
assert(sc.getLocalProperty("test") === "parent")
212212
assert(sc.getLocalProperty("Foo") === null)
213213
}
214214

215-
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
216-
val jobStarted = new Semaphore(0)
217-
val jobEnded = new Semaphore(0)
218-
@volatile var jobResult: JobResult = null
219-
var throwable: Option[Throwable] = None
220-
215+
test("mutation in parent local property does not affect child (SPARK-10563)") {
221216
sc = new SparkContext("local", "test")
222-
sc.setJobGroup("originalJobGroupId", "description")
223-
sc.addSparkListener(new SparkListener {
224-
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
225-
jobStarted.release()
226-
}
227-
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
228-
jobResult = jobEnd.jobResult
229-
jobEnded.release()
230-
}
231-
})
232-
233-
// Create a new thread which will inherit the current thread's properties
234-
val thread = new Thread() {
217+
val originalTestValue: String = "original-value"
218+
var threadTestValue: String = null
219+
sc.setLocalProperty("test", originalTestValue)
220+
var throwable: Option[Throwable] = None
221+
val thread = new Thread {
235222
override def run(): Unit = {
236223
try {
237-
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
238-
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
239-
try {
240-
sc.parallelize(1 to 100).foreach { x =>
241-
Thread.sleep(100)
242-
}
243-
} catch {
244-
case s: SparkException => // ignored so that we don't print noise in test logs
245-
}
224+
threadTestValue = sc.getLocalProperty("test")
246225
} catch {
247226
case t: Throwable =>
248227
throwable = Some(t)
249228
}
250229
}
251230
}
231+
sc.setLocalProperty("test", "this-should-not-be-inherited")
252232
thread.start()
253-
// Wait for the job to start, then mutate the original properties, which should have been
254-
// inherited by the running job but hopefully defensively copied or snapshotted:
255-
jobStarted.tryAcquire(10, TimeUnit.SECONDS)
256-
sc.setJobGroup("modifiedJobGroupId", "description")
257-
// Canceling the original job group should cancel the running job. In other words, the
258-
// modification of the properties object should not affect the properties of running jobs
259-
sc.cancelJobGroup("originalJobGroupId")
260-
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
261-
throwable.foreach { t => throw t }
262-
assert(jobResult.isInstanceOf[JobFailed])
233+
thread.join()
234+
throwable.foreach { t => throw improveStackTrace(t) }
235+
assert(threadTestValue === originalTestValue)
263236
}
237+
238+
/**
239+
* Improve the stack trace of an error thrown from within a thread.
240+
* Otherwise it's difficult to tell which line in the test the error came from.
241+
*/
242+
private def improveStackTrace(t: Throwable): Throwable = {
243+
t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace)
244+
t
245+
}
246+
264247
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution
19+
20+
import java.util.Properties
21+
22+
import scala.collection.parallel.CompositeThrowable
23+
24+
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
25+
import org.apache.spark.sql.SQLContext
26+
27+
class SQLExecutionSuite extends SparkFunSuite {
28+
29+
test("concurrent query execution (SPARK-10548)") {
30+
// Try to reproduce the issue with the old SparkContext
31+
val conf = new SparkConf()
32+
.setMaster("local[*]")
33+
.setAppName("test")
34+
val badSparkContext = new BadSparkContext(conf)
35+
try {
36+
testConcurrentQueryExecution(badSparkContext)
37+
fail("unable to reproduce SPARK-10548")
38+
} catch {
39+
case e: IllegalArgumentException =>
40+
assert(e.getMessage.contains(SQLExecution.EXECUTION_ID_KEY))
41+
} finally {
42+
badSparkContext.stop()
43+
}
44+
45+
// Verify that the issue is fixed with the latest SparkContext
46+
val goodSparkContext = new SparkContext(conf)
47+
try {
48+
testConcurrentQueryExecution(goodSparkContext)
49+
} finally {
50+
goodSparkContext.stop()
51+
}
52+
}
53+
54+
/**
55+
* Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
56+
*/
57+
private def testConcurrentQueryExecution(sc: SparkContext): Unit = {
58+
val sqlContext = new SQLContext(sc)
59+
import sqlContext.implicits._
60+
61+
// Initialize local properties. This is necessary for the test to pass.
62+
sc.getLocalProperties
63+
64+
// Set up a thread that runs executes a simple SQL query.
65+
// Before starting the thread, mutate the execution ID in the parent.
66+
// The child thread should not see the effect of this change.
67+
var throwable: Option[Throwable] = None
68+
val child = new Thread {
69+
override def run(): Unit = {
70+
try {
71+
sc.parallelize(1 to 100).map { i => (i, i) }.toDF("a", "b").collect()
72+
} catch {
73+
case t: Throwable =>
74+
throwable = Some(t)
75+
}
76+
77+
}
78+
}
79+
sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "anything")
80+
child.start()
81+
child.join()
82+
83+
// The throwable is thrown from the child thread so it doesn't have a helpful stack trace
84+
throwable.foreach { t =>
85+
t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace)
86+
throw t
87+
}
88+
}
89+
90+
}
91+
92+
/**
93+
* A bad [[SparkContext]] that does not clone the inheritable thread local properties
94+
* when passing them to children threads.
95+
*/
96+
private class BadSparkContext(conf: SparkConf) extends SparkContext(conf) {
97+
protected[spark] override val localProperties = new InheritableThreadLocal[Properties] {
98+
override protected def childValue(parent: Properties): Properties = new Properties(parent)
99+
override protected def initialValue(): Properties = new Properties()
100+
}
101+
}

0 commit comments

Comments
 (0)