From 1cbeb7be37df92345990bc4e0156d5226de9e639 Mon Sep 17 00:00:00 2001
From: JoonPark1
Date: Tue, 21 Oct 2025 11:42:15 -0500
Subject: [PATCH 1/5] fix KubernetesApplicationOperation so that metadata
 records are updated once the engine submit timeout is reached - prevent
 subsequent Kyuubi OOM

---
 .../spark/SparkOnKubernetesTestsSuite.scala   | 49 +++++++++++++++++++
 .../KubernetesApplicationOperation.scala      | 22 ++++++++-
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 562ee63799a..15e1ffd4510 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -254,4 +254,53 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       sessionHandle.identifier.toString)
     assert(!failKillResponse._1)
   }
+  test("If a Spark batch reaches the submit timeout, the associated Kyuubi application operation should be in TIMEOUT state and the Spark driver engine in NOT_FOUND state") {
+    import scala.collection.JavaConverters._
+    // Configure a very small submit timeout (1000 ms) so the timeout path is triggered
+    val originalTimeout = conf.get(ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
+    conf.set(ENGINE_KUBERNETES_SUBMIT_TIMEOUT, 1000L)
+
+    try {
+      // Prepare a metadata row only (INITIALIZED), without actually launching a Spark driver
+      val batchId = UUID.randomUUID().toString
+      val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
+        KYUUBI_BATCH_ID_KEY -> batchId))
+
+      val user = "test-user"
+      val ipAddress = "test-ip"
+
+      // Insert the metadata so that the subsequent update can find this record
+      sessionManager.initializeBatchState(
+        user,
+        ipAddress,
+        batchRequest.getConf.asScala.toMap,
+        batchRequest)
+
+      // Create a fresh KubernetesApplicationOperation that can trigger the metadata update upon timeout
+      val operation = new KubernetesApplicationOperation
+      operation.initialize(conf, sessionManager.metadataManager)
+
+      // Use a submitTime far enough in the past to exceed the timeout
+      val submitTime = Some(System.currentTimeMillis() - 10000L)
+
+      // No driver pod exists for this random batch id, so this should hit the timeout path
+      operation.getApplicationInfoByTag(
+        appMgrInfo,
+        batchId,
+        Some(user),
+        submitTime)
+
+      eventually(timeout(30.seconds), interval(200.milliseconds)) {
+        val mdOpt = sessionManager.getBatchMetadata(batchId)
+        assert(mdOpt.isDefined)
+        val md = mdOpt.get
+        // Verify the metadata reflects TIMEOUT and NOT_FOUND as set by the timeout handling
+        assert(md.state == org.apache.kyuubi.operation.OperationState.TIMEOUT.toString)
+        assert(md.engineState == NOT_FOUND.toString)
+      }
+    } finally {
+      // Restore the original engine submit timeout for Kyuubi batch job submission
+      conf.set(ENGINE_KUBERNETES_SUBMIT_TIMEOUT, originalTimeout)
+    }
+  }
 }

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 703d223d5bc..251d088f782 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -322,7 +322,27 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
         if (elapsedTime > submitTimeout) {
           error(s"Can't find target driver pod by ${toLabel(tag)}, " +
             s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
-          ApplicationInfo.NOT_FOUND
+          val errorMsg =
+            s"Driver pod not found for job with kyuubi-unique-tag: $tag after $elapsedTime ms " +
+              s"(submit-timeout: $submitTimeout ms)"
+          /* update the metadata store => have current op
+             that handles given spark app as timed out
+             & associated spark-sql driver engine as
+             not found... test if this will prevent future polling */
+          try {
+            metadataManager.foreach(_.updateMetadata(
+              org.apache.kyuubi.server.metadata.api.Metadata(
+                identifier = tag,
+                state = org.apache.kyuubi.operation.OperationState.TIMEOUT.toString,
+                engineState = ApplicationState.NOT_FOUND.toString,
+                engineError = Some(errorMsg),
+                endTime = System.currentTimeMillis())))
+          } catch {
+            case NonFatal(e) =>
+              warn(s"Failed to update metadata for Spark job with kyuubi-unique-tag label: " +
+                s"$tag after the submit timeout was reached: ${e.getMessage}")
+          }
+          appInfo
         } else {
           warn(s"Waiting for driver pod with ${toLabel(tag)} to be created, " +
             s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")

From de98cf8a8fc79477c12998338e803a35196e6f7b Mon Sep 17 00:00:00 2001
From: JoonPark1
Date: Tue, 21 Oct 2025 14:43:07 -0500
Subject: [PATCH 2/5] fix styling in SparkOnKubernetesTestsSuite

---
 .../test/spark/SparkOnKubernetesTestsSuite.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 15e1ffd4510..5518246f62c 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -254,7 +254,9 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       sessionHandle.identifier.toString)
     assert(!failKillResponse._1)
   }
-  test("If a Spark batch reaches the submit timeout, the associated Kyuubi application operation should be in TIMEOUT state and the Spark driver engine in NOT_FOUND state") {
+  test(
+    "If a Spark batch reaches the submit timeout, the associated Kyuubi application " +
+      "operation should be in TIMEOUT state and the Spark driver engine in NOT_FOUND state") {
     import scala.collection.JavaConverters._
     // Configure a very small submit timeout (1000 ms) so the timeout path is triggered
     val originalTimeout = conf.get(ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
@@ -276,7 +278,8 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
         batchRequest.getConf.asScala.toMap,
         batchRequest)
 
-      // Create a fresh KubernetesApplicationOperation that can trigger the metadata update upon timeout
+      // Create a fresh KubernetesApplicationOperation that can trigger the metadata
+      // update upon timeout
       val operation = new KubernetesApplicationOperation
       operation.initialize(conf, sessionManager.metadataManager)

From 783954da88675721d64f9c51bdf3ac96afd55d61 Mon Sep 17 00:00:00 2001
From: Fei Wang
Date: Fri, 24 Oct 2025 13:02:45 -0700
Subject: [PATCH 3/5] Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 .../kyuubi/engine/KubernetesApplicationOperation.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 251d088f782..29f5db9b8b7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -325,10 +325,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
           val errorMsg =
             s"Driver pod not found for job with kyuubi-unique-tag: $tag after $elapsedTime ms " +
               s"(submit-timeout: $submitTimeout ms)"
-          /* update the metadata store => have current op
-             that handles given spark app as timed out
-             & associated spark-sql driver engine as
-             not found... test if this will prevent future polling */
+          /* Update the metadata store to mark this operation as timed out and the Spark driver engine as not found.
+           * This prevents the restarted Kyuubi server from repeatedly polling for this batch job's status.
+           */
           try {
             metadataManager.foreach(_.updateMetadata(
               org.apache.kyuubi.server.metadata.api.Metadata(

From b952a1584042e9637eb90d9153cb5765bfba7a5a Mon Sep 17 00:00:00 2001
From: JoonPark1
Date: Mon, 27 Oct 2025 10:17:02 -0500
Subject: [PATCH 4/5] Refactor comment in KubernetesApplicationOperation

Reformat the comment for better readability and fix scalastyle char-limit
violations.
---
 .../kyuubi/engine/KubernetesApplicationOperation.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 29f5db9b8b7..b5a40d6b53d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -325,8 +325,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
           val errorMsg =
             s"Driver pod not found for job with kyuubi-unique-tag: $tag after $elapsedTime ms " +
               s"(submit-timeout: $submitTimeout ms)"
-          /* Update the metadata store to mark this operation as timed out and the Spark driver engine as not found.
-           * This prevents the restarted Kyuubi server from repeatedly polling for this batch job's status.
+          /*
+           * Update the metadata store to mark this operation as timed out
+           * and the Spark driver engine as not found.
+           *
+           * This prevents the restarted Kyuubi server from repeatedly
+           * polling for this batch job's status.
            */
           try {
             metadataManager.foreach(_.updateMetadata(
               org.apache.kyuubi.server.metadata.api.Metadata(

From eb3430173677bdb3261d7445098e72eb8a784789 Mon Sep 17 00:00:00 2001
From: sungpark
Date: Mon, 10 Nov 2025 20:52:44 -0600
Subject: [PATCH 5/5] add new Kyuubi server-side config to batch the recovery
 of batch jobs from the metadata store upon server restart - prevent the
 restarted server from being overwhelmed and hitting OOM

---
 .../org/apache/kyuubi/config/KyuubiConf.scala |  9 ++++
 .../server/KyuubiRestFrontendService.scala    | 63 ++++++++++++-------
 .../kyuubi/session/KyuubiSessionManager.scala | 11 +++++++----
 3 files changed, 56 insertions(+), 27 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 65864986552..4f2efc7d9b2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1910,6 +1910,15 @@ object KyuubiConf {
         " Kyuubi instances.")
       .timeConf
       .createWithDefault(Duration.ofSeconds(20).toMillis)
+
+  val BATCH_SESSIONS_RECOVERY_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.batch.sessions.recovery.size")
+      .serverOnly
+      .internal
+      .doc("The number of batch metadata records to fetch per round when recovering " +
+        "batch sessions upon Kyuubi server restart.")
+      .intConf
+      .createWithDefault(10)
 
   val BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT: ConfigEntry[Long] =
     buildConf("kyuubi.batch.internal.rest.client.connect.timeout")

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 787ac0b0473..f153b3fc115 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -181,36 +181,51 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
   @VisibleForTesting
   private[kyuubi] def recoverBatchSessions(): Unit = withBatchRecoveryLockRequired {
     val recoveryNumThreads = conf.get(METADATA_RECOVERY_THREADS)
+    val recoveryBatchSize = conf.get(BATCH_SESSIONS_RECOVERY_SIZE)
     val batchRecoveryExecutor =
       ThreadUtils.newDaemonFixedThreadPool(recoveryNumThreads, "batch-recovery-executor")
     try {
-      val batchSessionsToRecover = sessionManager.getBatchSessionsToRecover(connectionUrl)
-      val pendingRecoveryTasksCount = new AtomicInteger(0)
-      val tasks = batchSessionsToRecover.flatMap { batchSession =>
-        val batchId = batchSession.batchJobSubmissionOp.batchId
-        try {
-          val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
-            Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
-          Some(task -> batchId)
-        } catch {
-          case e: Throwable =>
-            error(s"Error while submitting batch[$batchId] for recovery", e)
-            None
-        }
-      }
+      var offset = 0 // advanced after each recovered page, hence a `var`
+      var shouldFetchRemainingBatchSessions = true
+      var totalBatchRecovered = 0
+      while (shouldFetchRemainingBatchSessions) {
+        val batchSessionsToRecover =
+          sessionManager.getBatchSessionsToRecover(connectionUrl, offset, recoveryBatchSize)
+        if (batchSessionsToRecover.nonEmpty) {
+          val pendingRecoveryTasksCount = new AtomicInteger(0)
+          val tasks = batchSessionsToRecover.flatMap { batchSession =>
+            val batchId = batchSession.batchJobSubmissionOp.batchId
+            try {
+              val task: Future[Unit] = batchRecoveryExecutor.submit(() =>
+                Utils.tryLogNonFatalError(sessionManager.openBatchSession(batchSession)))
+              Some(task -> batchId)
+            } catch {
+              case e: Throwable =>
+                error(s"Error while submitting batch[$batchId] for recovery", e)
+                None
+            }
+          }
 
-      pendingRecoveryTasksCount.addAndGet(tasks.size)
+          pendingRecoveryTasksCount.addAndGet(tasks.size)
 
-      tasks.foreach { case (task, batchId) =>
-        try {
-          task.get()
-        } catch {
-          case e: Throwable =>
-            error(s"Error while recovering batch[$batchId]", e)
-        } finally {
-          val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
-          info(s"Batch[$batchId] recovery task terminated, current pending tasks $pendingTasks")
+          tasks.foreach { case (task, batchId) =>
+            try {
+              task.get()
+            } catch {
+              case e: Throwable =>
+                error(s"Error while recovering batch[$batchId]", e)
+            } finally {
+              val pendingTasks = pendingRecoveryTasksCount.decrementAndGet()
+              info(s"Batch[$batchId] recovery task terminated, current pending tasks $pendingTasks")
+            }
+          }
+          totalBatchRecovered += batchSessionsToRecover.length
+          offset += batchSessionsToRecover.length
+        } else {
+          shouldFetchRemainingBatchSessions = false
+          info("No more batches left to recover from the metadata store.")
         }
+        info(s"Recovered $totalBatchRecovered batches from the metadata store so far.")
       }
     } finally {
       ThreadUtils.shutdown(batchRecoveryExecutor)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 344da0e71e8..07417ea4eeb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -313,13 +313,18 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
     startEngineAliveChecker()
   }
 
-  def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSession] = {
+  // Note: offset and batchSize are applied per recovery state (PENDING and RUNNING),
+  // so a single call may return up to 2 * batchSize sessions.
+  def getBatchSessionsToRecover(
+      kyuubiInstance: String,
+      offset: Int,
+      batchSize: Int): Seq[KyuubiBatchSession] = {
     Seq(OperationState.PENDING, OperationState.RUNNING).flatMap { stateToRecover =>
       metadataManager.map(_.getBatchesRecoveryMetadata(
         stateToRecover.toString,
         kyuubiInstance,
-        0,
-        Int.MaxValue).map { metadata =>
+        offset,
+        batchSize).map { metadata =>
         createBatchSessionFromRecovery(metadata)
       }).getOrElse(Seq.empty)
     }
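
Note on PATCH 5, for review purposes: the recovery loop is a standard
fetch-page / advance-offset pattern. The sketch below is a minimal,
self-contained Scala illustration of that pattern, not Kyuubi code;
`PaginatedRecoverySketch`, `BatchMeta`, `fetchPage`, and `recoverAll` are
hypothetical stand-ins for sessionManager.getBatchSessionsToRecover(
connectionUrl, offset, batchSize) and the sessions it returns. It also shows
why `offset`, `totalBatchRecovered`, and the loop flag must be declared as
`var`: they are reassigned on every round.

  object PaginatedRecoverySketch {
    final case class BatchMeta(id: String)

    // Fetch pages of at most `pageSize` records via fetchPage(offset, pageSize)
    // until the first empty page; return the total number of records processed.
    def recoverAll(fetchPage: (Int, Int) => Seq[BatchMeta], pageSize: Int): Int = {
      var offset = 0 // reassigned every round, hence `var`
      var total = 0
      var morePages = true
      while (morePages) {
        val page = fetchPage(offset, pageSize)
        if (page.nonEmpty) {
          page.foreach(m => println(s"recovering batch ${m.id}"))
          total += page.length
          offset += page.length
        } else {
          morePages = false // the first empty page terminates the loop
        }
      }
      total
    }

    def main(args: Array[String]): Unit = {
      // In-memory stand-in for the metadata store: 23 pending batch records.
      val store = (1 to 23).map(i => BatchMeta(s"batch-$i"))
      val recovered = recoverAll((off, n) => store.slice(off, off + n), pageSize = 10)
      println(s"recovered $recovered batches") // pages of 10, 10, and 3
    }
  }

One caveat worth raising in review: in the actual patch, offset and batchSize
are applied per operation state (PENDING and RUNNING) inside
getBatchSessionsToRecover, so a single round may return up to
2 * recoveryBatchSize sessions; memory per round is still bounded either way.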