Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,20 @@ private[spark] object Config extends Logging {
.createWithDefault(0)

object ExecutorRollPolicy extends Enumeration {
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION = Value
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, FAILED_TASKS = Value
}

val EXECUTOR_ROLL_POLICY =
ConfigBuilder("spark.kubernetes.executor.rollPolicy")
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME (default), " +
"and TOTAL_DURATION. When executor roll happens, Spark uses this policy to choose " +
"an executor and decomission it. The built-in policies are based on executor summary." +
"TOTAL_DURATION, and FAILED_TASKS. " +
"When executor roll happens, Spark uses this policy to choose " +
"an executor and decommission it. The built-in policies are based on executor summary." +
"ID policy chooses an executor with the smallest executor ID. " +
"ADD_TIME policy chooses an executor with the smallest add-time. " +
"TOTAL_GC_TIME policy chooses an executor with the biggest total task GC time. " +
"TOTAL_DURATION policy chooses an executor with the biggest total task time. ")
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
"FAILED_TASKS policy chooses an executor with the most number of failed tasks.")
.version("3.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
listWithoutDriver.sortBy(_.totalGCTime).reverse
case ExecutorRollPolicy.TOTAL_DURATION =>
listWithoutDriver.sortBy(_.totalDuration).reverse
case ExecutorRollPolicy.FAILED_TASKS =>
listWithoutDriver.sortBy(_.failedTasks).reverse
}
sortedList.headOption.map(_.id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,17 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
false, Set())

// The biggest failedTasks
val execWithBiggestFailedTasks = new ExecutorSummary("5", "host:port", true, 1,
10, 10, 1, 1, 1,
5, 0, 1, 100,
1, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
false, Set())

val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
execWithBiggestTotalGCTime, execWithBiggestTotalDuration)
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks)

test("Empty executor list") {
ExecutorRollPolicy.values.foreach { value =>
Expand Down Expand Up @@ -112,4 +121,8 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
test("Policy: TOTAL_DURATION") {
assertEquals(Some("4"), plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)))
}

test("Policy: FAILED_TASKS") {
assertEquals(Some("5"), plugin.invokePrivate(_choose(list, ExecutorRollPolicy.FAILED_TASKS)))
}
}