@@ -292,6 +292,14 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

val KUBERNETES_MAX_PENDING_PODS =
ConfigBuilder("spark.kubernetes.allocation.max.pendingPods")
Member

This introduces a new namespace, max, with only one child, pendingPods. Do you have a plan to add more? Otherwise, we should reduce the depth, e.g. maxPendingPods.

.doc("Maximum number of pending pods allowed during executor alloction for this application.")
Member

alloction -> allocation?

.version("3.2.0")
.intConf
.checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
.createWithDefault(150)
Contributor

This default seems high, can you explain why 150?

Contributor Author

Thanks for the review!

My main intention was to come up with a limit that protects us from overloading the k8s scheduler while still allowing progressive upscaling. I think when a POD spends a long time in the pending state we should make the allocation as early as possible, while being careful not to overload the k8s scheduler, as that would be counterproductive for the allocations. And as we still use the batch size during upscaling, the number of active new POD requests from a single Spark application is limited (this also helps to avoid the overloading).

The second reason was that I hoped this would be a good default for those environments where the batch size is already increased (I have seen examples where the batch size was set to 50).

But I just ran a few tests, and although I saw that 150 pending PODs did not cause any problem during resource allocation, my test ran in an EKS cluster where only one Spark app was submitted (my test app) and even the cluster size was small.

Nevertheless, we can go for a different solution:
Another strategy for choosing this limit would be to use a default that conforms to the default batch size (which is really small: 5). So what about setting the default to 15 here? In that case we could mention this new config in the migration guide.

@holdenk WDYT?
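For reference, a minimal sketch (not part of the PR) of how a user could align the proposed pending-pod cap with an increased batch size; the key spark.kubernetes.allocation.max.pendingPods follows the name used in this PR and may still change, and the values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: request executors in batches of 50, but never let more than
// 150 pods sit in the Pending state for this application at once.
val conf = new SparkConf()
  .set("spark.kubernetes.allocation.batch.size", "50")
  .set("spark.kubernetes.allocation.max.pendingPods", "150")
```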

Member
@dongjoon-hyun Jun 27, 2021

I'd like to propose disabling this feature in Apache Spark 3.2.0 to remove the side effect completely. For example, we can use Int.MaxValue as the default to disable it.
WDYT, @attilapiros and @holdenk?
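A sketch of what this proposal would look like, assuming only the default value changes and the rest of the config definition stays as in the diff above:

```scala
val KUBERNETES_MAX_PENDING_PODS =
  ConfigBuilder("spark.kubernetes.allocation.max.pendingPods")
    .doc("Maximum number of pending pods allowed during executor allocation for this application.")
    .version("3.2.0")
    .intConf
    .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
    .createWithDefault(Int.MaxValue) // effectively disables the new limit by default
```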

Contributor Author

@dongjoon-hyun

> And, it would be great if we keep the existing test cases with the default configuration (disabled), and add new test coverage for this new conf.

So this PR has 3 small features which relate to each other:

  • modify how the batch size limit is taken into account: earlier, the next batch was not started while even one POD was still stuck as a newly created POD; this causes some of the test changes
  • change what the outstanding PODs are: earlier, pending PODs and newly created PODs were both counted as outstanding PODs, which stopped the allocation when it was triggered from the scheduler.
    This causes the rest of the unit test differences.
  • introduce a limit for pending pods (see the sketch below)

Let me separate them into different PRs (at least into two); this will make the review easier.
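A self-contained toy sketch (plain Scala, not Spark code) of how the batch-size limit and the new pending-pod limit combine with the number of missing executors in the diff below; the names mirror the diff, but the function itself is hypothetical:

```scala
// Mirrors math.min(math.min(allMissingExecutors, remainingBatchAllocSize), remainingPendingPods)
// from the ExecutorPodsAllocator changes below.
object AllocationLimitSketch {
  def executorsToAllocate(
      targetNum: Int,        // desired executors for a resource profile
      knownPodCount: Int,    // pods already known for that profile (running + pending + requested)
      newlyCreated: Int,     // pods requested but not yet seen in any snapshot (all profiles)
      pendingPods: Int,      // pending pods across all profiles
      batchSize: Int,        // spark.kubernetes.allocation.batch.size
      maxPendingPods: Int    // spark.kubernetes.allocation.max.pendingPods (this PR)
  ): Int = {
    val allMissingExecutors = targetNum - knownPodCount
    val remainingBatchAllocSize = batchSize - newlyCreated
    val remainingPendingPods = maxPendingPods - pendingPods
    math.max(0, math.min(math.min(allMissingExecutors, remainingBatchAllocSize), remainingPendingPods))
  }

  def main(args: Array[String]): Unit = {
    // 100 executors missing, default batch size 5, plenty of pending headroom -> request 5
    println(executorsToAllocate(100, 0, 0, 0, 5, 150))
    // 100 missing, batch allows 5, but only 2 pending-pod slots left -> request 2
    println(executorsToAllocate(100, 0, 0, 148, 5, 150))
  }
}
```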

Contributor

Sounds good, ping me on your split PRs and I'll be happy to take a look.


val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Time to wait between each round of executor allocation.")
@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(

private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)

private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)

private val podCreationTimeout = math.max(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -91,7 +93,11 @@ private[spark] class ExecutorPodsAllocator(
private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)

// visible for tests
private[k8s] val numOutstandingPods = new AtomicInteger()
private[k8s] val numNewlyCreatedUnknownPods = new AtomicInteger()

// visible for tests
// number of pending PODs: including the known and unknown ones
private[k8s] val numPendingPods = new AtomicInteger()

private var lastSnapshot = ExecutorPodsSnapshot()

@@ -122,7 +128,7 @@ private[spark] class ExecutorPodsAllocator(
totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
}
logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId")
if (numOutstandingPods.get() == 0) {
if (numNewlyCreatedUnknownPods.get() < podAllocationSize) {
snapshotsStore.notifySubscribers()
}
}
@@ -216,10 +222,13 @@ private[spark] class ExecutorPodsAllocator(
execPods(execId) = execPodState
}
}

var totalPendingCount = 0
var sumPendingPods = 0
Member

Can we reuse the old variable totalPendingCount? It looks similar enough to sumPendingPods in the new context.

// The order we request executors for each ResourceProfile is not guaranteed.
totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
val knownPodsPerTargetForRpId = totalExpectedExecutorsPerResourceProfileId
.asScala
.toSeq
.sortBy(_._1)
.map { case (rpId, targetNum) =>
val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)

val currentRunningCount = podsForRpId.values.count {
@@ -233,9 +242,8 @@ private[spark] class ExecutorPodsAllocator(
}.partition { case (k, _) =>
schedulerKnownExecs.contains(k)
}
// This variable is used later to print some debug logs. It's updated when cleaning up
// excess pod requests, since currentPendingExecutorsForRpId is immutable.
var knownPendingCount = currentPendingExecutorsForRpId.size
sumPendingPods += currentPendingExecutorsForRpId.size
sumPendingPods += schedulerKnownPendingExecsForRpId.size

val newlyCreatedExecutorsForRpId =
newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
@@ -248,7 +256,7 @@ private[spark] class ExecutorPodsAllocator(
}

if (podsForRpId.nonEmpty) {
logDebug(s"ResourceProfile Id: $rpId " +
logDebug(s"ResourceProfile Id: $rpId, " +
s"pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutorsForRpId.size} unknown pending, " +
s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
@@ -293,39 +301,51 @@ private[spark] class ExecutorPodsAllocator(
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= newlyCreatedToDelete
knownPendingCount -= knownPendingToDelete.size
sumPendingPods -= knownPendingToDelete.size
}
}
}
(rpId -> (knownPodCount, targetNum))
}

if (newlyCreatedExecutorsForRpId.isEmpty
&& knownPodCount < targetNum) {
requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, k8sKnownPVCNames)
}
totalPendingCount += knownPendingCount

// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (log.isDebugEnabled && snapshots.nonEmpty) {
val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
numPendingPods.set(sumPendingPods)
// after downscaling is triggered for all the resource profiles, the size of
// newlyCreatedExecutors can be used to calculate the remaining batch size for upscaling
knownPodsPerTargetForRpId.foreach { case (rpId, (knownPodCount, targetNum)) =>
val allMissingExecutors = targetNum - knownPodCount
val remainingBatchAllocSize = podAllocationSize - newlyCreatedExecutors.size
val remainingPendingPods = maxPendingPods - sumPendingPods
if (allMissingExecutors <= 0) {
if (allMissingExecutors == 0 && !dynamicAllocationEnabled && snapshots.nonEmpty) {
logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
"equal to the number of requested executors. Not scaling up further.")
} else {
if (outstanding > 0) {
logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
s"Id $rpId before requesting more.")
}
}
} else if (remainingBatchAllocSize <= 0) {
if (snapshots.nonEmpty) {
logDebug("Batch size limit is reached for ResourceProfile " +
s"Id $rpId before requesting more.")
}
} else if (remainingPendingPods <= 0) {
if (snapshots.nonEmpty) {
logDebug("Max number of pending pod limit is reached for ResourceProfile " +
s"Id $rpId waiting for pods to become running.")
}
} else {
val numExecutorsToAllocate =
math.min(math.min(allMissingExecutors, remainingBatchAllocSize), remainingPendingPods)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
s"ResourceProfile Id: $rpId, target: $targetNum, running: $knownPodCount, " +
s"remainingBatchAllocSize: $remainingBatchAllocSize," +
s"remainingPendingPods: $remainingPendingPods.")
requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
}
}
deletedExecutorIds = _deletedExecutorIds

// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed. PODs known by the scheduler backend are not counted here as
// they are considered running PODs and they should not block upscaling.
numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
numNewlyCreatedUnknownPods.set(newlyCreatedExecutors.size)
}

private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
@@ -347,14 +367,10 @@ private[spark] class ExecutorPodsAllocator(
}

private def requestNewExecutors(
expected: Int,
running: Int,
numExecutorsToAllocate: Int,
applicationId: String,
resourceProfileId: Int,
pvcsInUse: Seq[String]): Unit = {
val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
for ( _ <- 0 until numExecutorsToAllocate) {