From 02157be0d38468db8fd231c9f8bc5d5f3ed2fd90 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 26 Aug 2022 14:33:04 -0700
Subject: [PATCH 1/4] Changes.

---
 .../org/apache/spark/executor/Executor.scala  | 22 ++++++--
 .../apache/spark/executor/ExecutorSuite.scala | 53 +++++++++++++++++++
 2 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d01de3b9ed08..53e39c6af2b0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 import javax.ws.rs.core.UriBuilder
 
@@ -85,6 +86,11 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword
+  // so that tasks can exit quickly if they are interrupted while waiting on another task to
+  // finish downloading dependencies.
+  private val updateDependenciesLock = new ReentrantLock()
+
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname)
   // must not have port specified.
@@ -969,13 +975,19 @@
   /**
    * Download any missing dependencies if we receive a new set of files and JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
+   * Visible for testing.
    */
-  private def updateDependencies(
+  private[spark] def updateDependencies(
       newFiles: Map[String, Long],
       newJars: Map[String, Long],
-      newArchives: Map[String, Long]): Unit = {
+      newArchives: Map[String, Long],
+      testStartLatch: Option[CountDownLatch] = None,
+      testEndLatch: Option[CountDownLatch] = None): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-    synchronized {
+    updateDependenciesLock.lockInterruptibly()
+    try {
+      // For testing, so we can simulate a slow library install:
+      testStartLatch.foreach(_.countDown())
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
@@ -1018,6 +1030,10 @@
           }
         }
       }
+      // For testing, so we can simulate a slow library download:
+      testEndLatch.foreach(_.await())
+    } finally {
+      updateDependenciesLock.unlock()
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 14871efac5bc..35f2d0a07358 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -514,6 +514,59 @@ class ExecutorSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-40235: updateDependencies is interruptible when waiting on lock") {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    withExecutor("id", "localhost", env) { executor =>
+      val startLatch = new CountDownLatch(1)
+      val endLatch = new CountDownLatch(1)
+
+      // Start a thread to simulate a task that begins executing updateDependencies()
+      // and takes a long time to finish because library download or installation is slow:
+      val slowLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty,
+          Some(startLatch),
+          Some(endLatch))
+      })
+      slowLibraryDownloadThread.start()
+
+      // Wait for that thread to acquire the lock:
+      startLatch.await()
+
+      // Start a second thread to simulate a task that blocks on the other task's
+      // library download:
+      val blockedLibraryDownloadThread = new Thread(() => {
+        executor.updateDependencies(
+          Map.empty,
+          Map.empty,
+          Map.empty)
+      })
+      blockedLibraryDownloadThread.start()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        val threadState = blockedLibraryDownloadThread.getState
+        assert(Set(Thread.State.BLOCKED, Thread.State.WAITING).contains(threadState))
+      }
+
+      // Interrupt the blocked thread:
+      blockedLibraryDownloadThread.interrupt()
+
+      // The thread should exit:
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(blockedLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+
+      // Allow the first thread to finish and exit:
+      endLatch.countDown()
+      eventually(timeout(10.seconds), interval(100.millis)) {
+        assert(slowLibraryDownloadThread.getState == Thread.State.TERMINATED)
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
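[Reviewer note, not part of the patch] The fix in PATCH 1 rests on a single JDK behavior: a thread queued on ReentrantLock.lockInterruptibly() receives an InterruptedException the moment it is interrupted, while a thread parked at the entry of a `synchronized` block does not. The sketch below reproduces the patched handshake outside of Spark; every name in it (InterruptibleUpdateDemo, update, started, finish) is illustrative only, not from the patch.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock

// Minimal model of the patched updateDependencies(): acquire the shared
// lock interruptibly, signal that it is held, then hold it until released.
object InterruptibleUpdateDemo {
  private val lock = new ReentrantLock()

  private def update(started: CountDownLatch, finish: CountDownLatch): Unit = {
    lock.lockInterruptibly() // throws InterruptedException if interrupted while queued
    try {
      started.countDown() // like testStartLatch: "the lock is now held"
      finish.await()      // like testEndLatch: simulates a slow download
    } finally {
      lock.unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    val started = new CountDownLatch(1)
    val finish = new CountDownLatch(1)

    val slow = new Thread(() => update(started, finish))
    slow.start()
    started.await() // wait until `slow` owns the lock

    val blocked = new Thread(() => {
      try update(new CountDownLatch(1), new CountDownLatch(0))
      catch { case _: InterruptedException => println("blocked thread exited promptly") }
    })
    blocked.start()

    // Poor man's `eventually`: a thread parked in lockInterruptibly() is WAITING.
    while (blocked.getState != Thread.State.WAITING) Thread.sleep(10)
    blocked.interrupt() // with `synchronized`, this interrupt would be ignored

    blocked.join()     // returns almost immediately via InterruptedException
    finish.countDown() // now let the slow "download" complete
    slow.join()
  }
}

The latch handshake is the same trick the new test uses: `started` plays the role of testStartLatch (proof that the lock is held) and `finish` the role of testEndLatch (an arbitrarily slow download).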
From e50e99971db973108313b75dde5cb31c327138e2 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 26 Aug 2022 16:15:49 -0700
Subject: [PATCH 2/4] Update Executor.scala

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 53e39c6af2b0..d856bee68ca3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -986,7 +986,7 @@ private[spark] class Executor(
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     updateDependenciesLock.lockInterruptibly()
     try {
-      // For testing, so we can simulate a slow library install:
+      // For testing, so we can simulate a slow file download:
       testStartLatch.foreach(_.countDown())
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
@@ -1030,7 +1030,7 @@
           }
         }
       }
-      // For testing, so we can simulate a slow library download:
+      // For testing, so we can simulate a slow file download:
       testEndLatch.foreach(_.await())
     } finally {
       updateDependenciesLock.unlock()

From 2f12ec5e0686fc07f5a0cb7f310bf9538f76fb32 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 26 Aug 2022 16:17:11 -0700
Subject: [PATCH 3/4] Update ExecutorSuite.scala

---
 .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 35f2d0a07358..bef36d08e8ae 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -523,7 +523,7 @@ class ExecutorSuite extends SparkFunSuite
       val endLatch = new CountDownLatch(1)
 
       // Start a thread to simulate a task that begins executing updateDependencies()
-      // and takes a long time to finish because library download or installation is slow:
+      // and takes a long time to finish because file download is slow:
       val slowLibraryDownloadThread = new Thread(() => {
         executor.updateDependencies(
           Map.empty,
@@ -538,7 +538,7 @@ class ExecutorSuite extends SparkFunSuite
       startLatch.await()
 
       // Start a second thread to simulate a task that blocks on the other task's
-      // library download:
+      // dependency update:
       val blockedLibraryDownloadThread = new Thread(() => {
         executor.updateDependencies(
           Map.empty,
From db01b6d207aa70c9cdc0db5487079089bebd992f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 28 Aug 2022 13:05:13 -0700
Subject: [PATCH 4/4] private[executor]

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d856bee68ca3..138e7da9569d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -977,7 +977,7 @@ private[spark] class Executor(
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    * Visible for testing.
    */
-  private[spark] def updateDependencies(
+  private[executor] def updateDependencies(
       newFiles: Map[String, Long],
       newJars: Map[String, Long],
       newArchives: Map[String, Long],
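[Reviewer note, not part of the patch] For completeness, the hazard that motivated replacing `synchronized` can be reproduced in a few lines: interrupting a thread that is blocked entering a monitor only sets its interrupt flag, so before this change a killed task could sit out another task's entire dependency download. The names below are illustrative only.

// Counter-example for the pre-patch behavior: a thread blocked entering a
// `synchronized` monitor ignores interrupt() until the monitor is released.
object SynchronizedHazardDemo {
  private val monitor = new Object

  def main(args: Array[String]): Unit = {
    // Holds the monitor for five seconds, standing in for a slow download:
    val holder = new Thread(() => monitor.synchronized { Thread.sleep(5000) })
    holder.start()
    Thread.sleep(100) // crude handoff: give `holder` time to take the monitor

    val waiter = new Thread(() => monitor.synchronized {})
    waiter.start()
    Thread.sleep(100) // give `waiter` time to block at the monitor entry

    waiter.interrupt()
    Thread.sleep(100)
    println(s"state after interrupt: ${waiter.getState}")       // still BLOCKED
    println(s"interrupt flag pending: ${waiter.isInterrupted}") // true

    holder.join() // only now, about five seconds later, can `waiter` proceed
    waiter.join()
  }
}

Here `waiter` stays BLOCKED for the full five seconds despite the interrupt; under the patched code the equivalent thread unblocks as soon as interrupt() is delivered, which is exactly what the new ExecutorSuite test asserts.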