Skip to content

Commit de7f557

Browse files
allenmaGitHub Enterprise
authored andcommitted
[CARMEL-6087] User is continually blocked by circuit breaker (#1009)
1 parent dabbefe commit de7f557

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import scala.collection.JavaConverters._
2929
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}
3030
import scala.util.Random
3131

32+
import com.google.common.cache.{Cache, CacheBuilder}
33+
3234
import org.apache.spark._
3335
import org.apache.spark.TaskState.TaskState
3436
import org.apache.spark.executor.ExecutorMetrics
@@ -199,6 +201,9 @@ private[spark] class TaskSchedulerImpl(
199201
getTimeAsSeconds("spark.workload.circuit.breaker.user.duration", "2h")
200202
private val taskSetHealthChecker =
201203
ThreadUtils.newDaemonSingleThreadScheduledExecutor("taskset-health-checker")
204+
// store all block user queries and blockTime map
205+
private val blockUserQueries: Cache[String, java.lang.Long] = CacheBuilder.newBuilder()
206+
.maximumSize(10000).build[String, java.lang.Long]()
202207

203208
private val FURTHER_SCHEDULE_LOCK_WAIT_TIME = conf.
204209
getTimeAsMs("spark.task.furtherScheduledLockWaitTime", "5s")
@@ -1143,14 +1148,16 @@ private[spark] class TaskSchedulerImpl(
11431148
s"exceeded the max value!"
11441149
dagScheduler.cancelStage(taskSet.stageId, Some(reason))
11451150

1146-
if (taskSet.userInfo.isDefined) {
1151+
if (taskSet.userInfo.isDefined && !blockUserQueries.asMap.containsKey(groupId)) {
11471152
val message = Some(s"Heavy query detected from session[${sessionId}], " +
11481153
s"query[$groupId], check more details: " +
11491154
s"http://viewpoint.hermes-prod.svc.25.tess.io/?session=${sessionId}, " +
11501155
s"root cause: ")
11511156
dagScheduler.sc.workloadCircuitBreaker.
11521157
foreach(_.blockUser(sessionType, taskSet.userInfo().get.user,
11531158
WORKLOAD_CIRCUIT_BREAKER_USER_DURATION_S, Some(message + reason)))
1159+
// make sure that same query won't block user multiple times
1160+
blockUserQueries.put(groupId, System.currentTimeMillis())
11541161
}
11551162
}
11561163
}

0 commit comments

Comments
 (0)