@@ -19,7 +19,7 @@ package org.apache.spark.sql.connect.service
1919
2020import java .util .UUID
2121import java .util .concurrent .{ConcurrentHashMap , ConcurrentMap , Executors , ScheduledExecutorService , TimeUnit }
22- import javax . annotation .concurrent .GuardedBy
22+ import java . util .concurrent .atomic .{ AtomicLong , AtomicReference }
2323
2424import scala .collection .mutable
2525import scala .concurrent .duration .FiniteDuration
@@ -66,21 +66,19 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
6666 /** Concurrent hash table containing all the current executions. */
6767 private val executions : ConcurrentMap [ExecuteKey , ExecuteHolder ] =
6868 new ConcurrentHashMap [ExecuteKey , ExecuteHolder ]()
69- private val executionsLock = new Object
7069
7170 /** Graveyard of tombstones of executions that were abandoned and removed. */
7271 private val abandonedTombstones = CacheBuilder
7372 .newBuilder()
7473 .maximumSize(SparkEnv .get.conf.get(CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE ))
7574 .build[ExecuteKey , ExecuteInfo ]()
7675
77- /** None if there are no executions. Otherwise, the time when the last execution was removed. */
78- @ GuardedBy (" executionsLock" )
79- private var lastExecutionTimeMs : Option [Long ] = Some (System .currentTimeMillis())
76+ /** The time when the last execution was removed. */
77+ private var lastExecutionTimeMs : AtomicLong = new AtomicLong (System .currentTimeMillis())
8078
8179 /** Executor for the periodic maintenance */
82- @ GuardedBy ( " executionsLock " )
83- private var scheduledExecutor : Option [ScheduledExecutorService ] = None
80+ private var scheduledExecutor : AtomicReference [ ScheduledExecutorService ] =
81+ new AtomicReference [ScheduledExecutorService ]()
8482
8583 /**
8684 * Create a new ExecuteHolder and register it with this global manager and with its session.
@@ -118,11 +116,6 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
118116
119117 sessionHolder.addExecuteHolder(executeHolder)
120118
121- executionsLock.synchronized {
122- if (! executions.isEmpty()) {
123- lastExecutionTimeMs = None
124- }
125- }
126119 logInfo(log " ExecuteHolder ${MDC (LogKeys .EXECUTE_KEY , executeHolder.key)} is created. " )
127120
128121 schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started.
@@ -151,11 +144,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
151144 executions.remove(key)
152145 executeHolder.sessionHolder.removeExecuteHolder(executeHolder.operationId)
153146
154- executionsLock.synchronized {
155- if (executions.isEmpty) {
156- lastExecutionTimeMs = Some (System .currentTimeMillis())
157- }
158- }
147+ updateLastExecutionTime()
159148
160149 logInfo(log " ExecuteHolder ${MDC (LogKeys .EXECUTE_KEY , key)} is removed. " )
161150
@@ -197,7 +186,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
197186 */
198187 def listActiveExecutions : Either [Long , Seq [ExecuteInfo ]] = {
199188 if (executions.isEmpty) {
200- Left (lastExecutionTimeMs.get )
189+ Left (lastExecutionTimeMs.getAcquire() )
201190 } else {
202191 Right (executions.values().asScala.map(_.getExecuteInfo).toBuffer.toSeq)
203192 }
@@ -212,39 +201,40 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
212201 }
213202
214203 private [connect] def shutdown (): Unit = {
215- executionsLock.synchronized {
216- scheduledExecutor.foreach { executor =>
217- ThreadUtils .shutdown(executor, FiniteDuration (1 , TimeUnit .MINUTES ))
218- }
219- scheduledExecutor = None
204+ val executor = scheduledExecutor.getAndSet(null )
205+ if (executor != null ) {
206+ ThreadUtils .shutdown(executor, FiniteDuration (1 , TimeUnit .MINUTES ))
220207 }
221208
222209 // note: this does not cleanly shut down the executions, but the server is shutting down.
223210 executions.clear()
224211 abandonedTombstones.invalidateAll()
225212
226- executionsLock.synchronized {
227- if (lastExecutionTimeMs.isEmpty) {
228- lastExecutionTimeMs = Some (System .currentTimeMillis())
229- }
230- }
213+ updateLastExecutionTime()
214+ }
215+
216+ /**
217+ * Updates the last execution time after the last execution has been removed.
218+ */
219+ private def updateLastExecutionTime (): Unit = {
220+ lastExecutionTimeMs.getAndUpdate(prev => prev.max(System .currentTimeMillis()))
231221 }
232222
233223 /**
234224 * Schedules periodic maintenance checks if it is not already scheduled. The checks are looking
235225 * for executions that have not been closed, but are left with no RPC attached to them, and
236226 * removes them after a timeout.
237227 */
238- private def schedulePeriodicChecks (): Unit = executionsLock.synchronized {
239- scheduledExecutor match {
240- case Some (_) => // Already running.
241- case None =>
228+ private def schedulePeriodicChecks (): Unit = {
229+ var executor = scheduledExecutor.getAcquire()
230+ if (executor == null ) {
231+ executor = Executors .newSingleThreadScheduledExecutor()
232+ if (scheduledExecutor.compareAndExchangeRelease(null , executor) == null ) {
242233 val interval = SparkEnv .get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL )
243234 logInfo(
244235 log " Starting thread for cleanup of abandoned executions every " +
245236 log " ${MDC (LogKeys .INTERVAL , interval)} ms " )
246- scheduledExecutor = Some (Executors .newSingleThreadScheduledExecutor())
247- scheduledExecutor.get.scheduleAtFixedRate(
237+ executor.scheduleAtFixedRate(
248238 () => {
249239 try {
250240 val timeout = SparkEnv .get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT )
@@ -256,6 +246,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
256246 interval,
257247 interval,
258248 TimeUnit .MILLISECONDS )
249+ }
259250 }
260251 }
261252
0 commit comments