Merged

Commits
63 commits
bf3c060
[SPARK-14915][CORE] Don't re-queue a task if another attempt has alre…
jasonmoore2k May 5, 2016
a3aa22a
[SPARK-14915] Fix incorrect resolution of merge conflict in commit bf…
srowen May 6, 2016
ab00652
[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor …
cenyuhai May 6, 2016
518af07
[SPARK-15223][DOCS] fix wrongly named config reference
philipphoffmann May 9, 2016
1678bff
[SPARK-15209] Fix display of job descriptions with single quotes in w…
JoshRosen May 9, 2016
d165486
[SPARK-14495][SQL][1.6] fix resolution failure of having clause with …
xwu0226 May 11, 2016
c433c0a
[SPARK-13519][CORE] Driver should tell Executor to stop itself when c…
zsxwing Feb 26, 2016
86bf93e
[SPARK-13522][CORE] Executor should kill itself when it's unable to h…
zsxwing Feb 29, 2016
ced71d3
[SPARK-13522][CORE] Fix the exit log place for heartbeat
zsxwing Feb 29, 2016
e2a43d0
[SPARK-15262] Synchronize block manager / scheduler executor state
May 11, 2016
fd2da7b
[SPARK-15260] Atomically resize memory pools (branch 1.6)
May 12, 2016
7200e6b
[SPARK-14261][SQL] Memory leak in Spark Thrift Server
dosoft May 20, 2016
7ad82b6
[SPARK-15395][CORE] Use getHostString to create RpcAddress (backport …
zsxwing May 20, 2016
9a18115
[SPARK-15165] [SPARK-15205] [SQL] Introduce place holder for comments…
sarutak May 20, 2016
0864213
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra May 25, 2016
5cc1e2c
[SPARK-10722] RDDBlockId not found in driver-heartbeater
simonjscott May 26, 2016
0b8bdf7
[SPARK-8428][SPARK-13850] Fix integer overflows in TimSort
sameeragarwal May 26, 2016
c53c83c
[BUILD][1.6] Fix compilation
sameeragarwal May 27, 2016
ea84b33
[SPARK-15528][SQL] Fix race condition in NumberConverter
maropu May 31, 2016
714f4d7
[SPARK-15601][CORE] CircularBuffer's toString() to print only the con…
tejasapatil Jun 1, 2016
0a13e4c
[SPARK-14204][SQL] register driverClass rather than user-specified class
mchalek Jun 2, 2016
c9013de
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jun 2, 2016
9da35b2
jackson 2.7.3
markhamstra Jun 2, 2016
4259a28
[SPARK-15736][CORE][BRANCH-1.6] Gracefully handle loss of DiskStore f…
JoshRosen Jun 3, 2016
a0cf7d0
[SPARK-15754][YARN] Not letting the credentials containing hdfs deleg…
Jun 3, 2016
6a9f19d
[SPARK-15723] Fixed local-timezone-brittle test where short-timezone …
javabrett Jun 5, 2016
5830828
[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs
jasoncl Jan 15, 2016
bb917fc
[SPARK-12712] Fix failure in ./dev/test-dependencies when run against…
JoshRosen Jun 9, 2016
739d992
[SPARK-15827][BUILD] Publish Spark's forked sbt-pom-reader to Maven C…
JoshRosen Jun 9, 2016
393f4ba
[DOCUMENTATION] fixed groupby aggregation example for pyspark
mortada Jun 10, 2016
1b34fbf
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jun 10, 2016
be3c41b
[SPARK-15892][ML] Incorrectly merged AFTAggregator with zero total count
HyukjinKwon Jun 12, 2016
2f3e327
Revert "[SPARK-15892][ML] Incorrectly merged AFTAggregator with zero …
jkbradley Jun 14, 2016
cffc080
[SPARK-15915][SQL] Logical plans should use subqueries eliminated pla…
ueshin Jun 15, 2016
0a8ada5
[SPARK-15975] Fix improper Popen retcode code handling in dev/run-tests
JoshRosen Jun 16, 2016
a4485c3
Update branch-1.6 for 1.6.2 release.
rxin Jun 16, 2016
f166493
Preparing Spark release v1.6.2
pwendell Jun 16, 2016
b8f380f
Preparing development version 1.6.3-SNAPSHOT
pwendell Jun 16, 2016
4168d9c
Preparing Spark release v1.6.2-rc1
pwendell Jun 16, 2016
4621fe9
Preparing development version 1.6.3-SNAPSHOT
pwendell Jun 16, 2016
e530823
Revert "[SPARK-15395][CORE] Use getHostString to create RpcAddress (b…
zsxwing Jun 17, 2016
fd05389
[SPARK-15892][ML] Backport correctly merging AFTAggregators to branch…
HyukjinKwon Jun 18, 2016
3f1d730
[SPARK-16035][PYSPARK] Fix SparseVector parser assertion for end pare…
andreapasqua Jun 18, 2016
41efd20
[SPARK-15613] [SQL] Fix incorrect days to millis conversion due to Da…
Jun 19, 2016
3d569d9
Revert "[SPARK-15613] [SQL] Fix incorrect days to millis conversion d…
davies Jun 19, 2016
54b1121
Preparing Spark release v1.6.2-rc2
pwendell Jun 19, 2016
2083485
Preparing development version 1.6.3-SNAPSHOT
pwendell Jun 19, 2016
16b7f1d
[SPARK-14391][LAUNCHER] Fix launcher communication test, take 2.
Apr 30, 2016
db86e7f
[SPARK-15613] [SQL] Fix incorrect days to millis conversion due to Da…
Jun 19, 2016
abe36c5
[SPARK-16086] [SQL] fix Python UDF without arguments (for 1.6)
davies Jun 21, 2016
d98fb19
[SPARK-15606][CORE] Use non-blocking removeExecutor call to avoid dea…
robbinspg Jun 2, 2016
4fdac3c
[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafka…
zsxwing May 10, 2016
d7223bb
[SPARK-16077] [PYSPARK] catch the exception from pickle.whichmodule()
Jun 24, 2016
21749cb
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jun 24, 2016
183ced4
[SPARK-13780][SQL] Add missing dependency to build.
Mar 11, 2016
496dde3
reset to _2.10
markhamstra Jun 24, 2016
b7acc1b
[SPARK-16173] [SQL] Can't join describe() of DataFrame in Scala 2.10
dongjoon-hyun Jun 25, 2016
24d59fb
[MLLIB] org.apache.spark.mllib.util.SVMDataGenerator generates ArrayI…
j4munoz Jun 25, 2016
60e095b
[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spillin…
srowen Jun 25, 2016
22a496d
[SPARK-16214][EXAMPLES] fix the denominator of SparkPi
yanghaogn Jun 27, 2016
4a67541
[SPARK-13023][PROJECT INFRA][FOLLOWUP][BRANCH-1.6] Unable to check `r…
HyukjinKwon Jun 28, 2016
beecb79
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Jun 28, 2016
65aa44f
version to 1.6.2
markhamstra Jun 29, 2016
6 changes: 3 additions & 3 deletions assembly/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>1.6.1-csd-10-SNAPSHOT</version>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.6.2-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-assembly_2.11</artifactId>
+  <artifactId>spark-assembly_2.10</artifactId>
   <name>Spark Project Assembly</name>
   <url>http://spark.apache.org/</url>
   <packaging>pom</packaging>
6 changes: 3 additions & 3 deletions bagel/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>1.6.1-csd-10-SNAPSHOT</version>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.6.2-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-bagel_2.11</artifactId>
+  <artifactId>spark-bagel_2.10</artifactId>
   <properties>
     <sbt.project.name>bagel</sbt.project.name>
   </properties>
6 changes: 3 additions & 3 deletions core/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>1.6.1-csd-10-SNAPSHOT</version>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.6.2-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-core_2.11</artifactId>
+  <artifactId>spark-core_2.10</artifactId>
   <properties>
     <sbt.project.name>core</sbt.project.name>
   </properties>
@@ -61,10 +61,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
     Platform.copyMemory(
       src.getBaseObject(),
-      src.getBaseOffset() + srcPos * 8,
+      src.getBaseOffset() + srcPos * 8L,
       dst.getBaseObject(),
-      dst.getBaseOffset() + dstPos * 8,
-      length * 8
+      dst.getBaseOffset() + dstPos * 8L,
+      length * 8L
     );
   }
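Why the bare L suffix matters in the hunk above: srcPos * 8 multiplies two ints in 32-bit arithmetic and only then widens the (possibly already wrapped) result to long, so offsets past 2 GiB go negative. A minimal Scala sketch of the failure mode, with illustrative values not taken from the patch:

    // Hypothetical index large enough that index * 8 bytes exceeds
    // Int.MaxValue (2^31 - 1), the overflow fixed by SPARK-8428/SPARK-13850.
    val srcPos: Int = 300000000                 // 300M 8-byte elements ~ 2.4 GB
    val badOffset: Long = srcPos * 8            // Int * Int wraps first: -1894967296
    val goodOffset: Long = srcPos * 8L          // 8L forces a 64-bit multiply: 2400000000
    assert(badOffset < 0 && goodOffset == 2400000000L)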
@@ -19,7 +19,7 @@
 
 import org.apache.spark.memory.TaskMemoryManager;
 
-final class RecordPointerAndKeyPrefix {
+public final class RecordPointerAndKeyPrefix {
   /**
    * A pointer to a record; see {@link TaskMemoryManager} for a
    * description of how these addresses are encoded.
@@ -29,7 +29,8 @@
  * Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
  * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
  */
-final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray> {
+public final class UnsafeSortDataFormat
+    extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray> {
 
   public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat();
 
@@ -73,10 +74,10 @@ public void copyElement(LongArray src, int srcPos, LongArray dst, int dstPos) {
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
     Platform.copyMemory(
       src.getBaseObject(),
-      src.getBaseOffset() + srcPos * 16,
+      src.getBaseOffset() + srcPos * 16L,
       dst.getBaseObject(),
-      dst.getBaseOffset() + dstPos * 16,
-      length * 16);
+      dst.getBaseOffset() + dstPos * 16L,
+      length * 16L);
   }
 
   @Override
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,6 +22,7 @@ import java.net.{URI, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Paths
 import java.util.Arrays
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConverters._
@@ -190,8 +191,14 @@ private[spark] object TestUtils {
 private class SpillListener extends SparkListener {
   private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
   private val spilledStageIds = new mutable.HashSet[Int]
+  private val stagesDone = new CountDownLatch(1)
 
-  def numSpilledStages: Int = spilledStageIds.size
+  def numSpilledStages: Int = {
+    // Long timeout, just in case somehow the job end isn't notified.
+    // Fails if a timeout occurs
+    assert(stagesDone.await(10, TimeUnit.SECONDS))
+    spilledStageIds.size
+  }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     stageIdToTaskMetrics.getOrElseUpdate(
@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
       spilledStageIds += stageId
     }
   }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    stagesDone.countDown()
+  }
 }
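The fix above is a general pattern: a listener accumulates state on one thread while a test reads it from another, so the read must block until the completion callback fires. A standalone sketch of the same shape, under illustrative names that are not from Spark:

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    // Illustrative one-shot holder mirroring SpillListener's latch:
    // complete() runs on the listener thread, get() on the test thread.
    class OneShotResult[T] {
      private val done = new CountDownLatch(1)
      @volatile private var value: Option[T] = None

      def complete(v: T): Unit = { value = Some(v); done.countDown() }

      def get(timeoutSeconds: Long): T = {
        // Fail loudly on timeout rather than returning partial state.
        require(done.await(timeoutSeconds, TimeUnit.SECONDS), "timed out")
        value.get
      }
    }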
39 changes: 37 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -114,6 +114,19 @@ private[spark] class Executor(
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. It means we will retry to send
+   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+  /**
+   * Count the failure times of heartbeat. It should only be acessed in the heartbeat thread. Each
+   * successful heartbeat will reset it to 0.
+   */
+  private var heartbeatFailures = 0
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -218,6 +231,7 @@
           threwException = false
           res
         } finally {
+          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +241,17 @@
               logError(errMsg)
             }
           }
+
+          if (releasedLocks.nonEmpty) {
+            val errMsg =
+              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+                releasedLocks.mkString("[", ", ", "]")
+            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+              throw new SparkException(errMsg)
+            } else {
+              logError(errMsg)
+            }
+          }
         }
         val taskFinish = System.currentTimeMillis()
 
@@ -434,7 +459,9 @@
           // JobProgressListener will hold an reference of it during
           // onExecutorMetricsUpdate(), then JobProgressListener can not see
           // the changes of metrics any more, so make a deep copy of it
-          val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+          val copiedMetrics = Utils.deserialize[TaskMetrics](
+            Utils.serialize(metrics),
+            Utils.getContextOrSparkClassLoader)
           tasksMetrics += ((taskRunner.taskId, copiedMetrics))
         } else {
           // It will be copied by serialization
@@ -452,8 +479,16 @@
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
+      heartbeatFailures = 0
     } catch {
-      case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+      case NonFatal(e) =>
+        logWarning("Issue communicating with driver in heartbeater", e)
+        heartbeatFailures += 1
+        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+          logError(s"Exit as unable to send heartbeats to driver " +
+            s"more than $HEARTBEAT_MAX_FAILURES times")
+          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+        }
     }
   }
 
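The heartbeater change reduces to a consecutive-failure counter around a periodic task: any success resets the counter, and crossing the threshold exits the process so the executor cannot linger as a zombie. A minimal sketch of that pattern; the interval, threshold, and names are illustrative, not Spark's:

    import java.util.concurrent.{Executors, TimeUnit}
    import scala.util.control.NonFatal

    // Illustrative: run `attempt` every `intervalSec` seconds; exit after
    // `maxFailures` consecutive failures. The counter is touched only by the
    // single scheduler thread, matching the comment on heartbeatFailures above.
    def startWithGiveUp(intervalSec: Long, maxFailures: Int)(attempt: () => Unit): Unit = {
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      var failures = 0
      scheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit =
          try { attempt(); failures = 0 }
          catch {
            case NonFatal(e) =>
              failures += 1
              if (failures >= maxFailures) sys.exit(1)
          }
      }, 0, intervalSec, TimeUnit.SECONDS)
    }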
@@ -39,6 +39,12 @@ object ExecutorExitCode {
   /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
   val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
 
+  /**
+   * Executor is unable to send heartbeats to the driver more than
+   * "spark.executor.heartbeat.maxFailures" times.
+   */
+  val HEARTBEAT_FAILURE = 56
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@
       // TODO: replace external block store with concrete implementation name
       case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
         "ExternalBlockStore failed to create a local temporary directory."
+      case HEARTBEAT_FAILURE =>
+        "Unable to send heartbeats to driver."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {
@@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
   }
 
   /**
-   * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
-   * of bytes removed from the pool's capacity.
+   * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
+   * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
+   *
+   * @return number of bytes to be removed from the pool's capacity.
    */
-  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-    // First, shrink the pool by reclaiming free memory:
+  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
     val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
@@ -134,7 +134,6 @@
       val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
       // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
-      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
     } else {
       spaceFreedByReleasingUnusedMemory
@@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
     storageRegionSize,
     maxMemory - storageRegionSize) {
 
+  assertInvariant()
+
   // We always maintain this invariant:
-  assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+  private def assertInvariant(): Unit = {
+    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+  }
 
   override def maxStorageMemory: Long = synchronized {
     maxMemory - onHeapExecutionMemoryPool.memoryUsed
@@ -77,7 +81,7 @@
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long = synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+    assertInvariant()
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
@@ -99,9 +103,10 @@
           math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
         if (memoryReclaimableFromStorage > 0) {
           // Only reclaim as much space as is necessary and available:
-          val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+          val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-          onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+          storageMemoryPool.decrementPoolSize(spaceToReclaim)
+          onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
         }
       }
     }
@@ -137,7 +142,7 @@
       blockId: BlockId,
       numBytes: Long,
       evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
-    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+    assertInvariant()
     assert(numBytes >= 0)
     if (numBytes > maxStorageMemory) {
       // Fail fast if the block simply won't fit
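The motivation for moving decrementPoolSize out of freeSpaceToShrinkPool and into this caller (SPARK-15260): when shrinking the storage pool and growing the execution pool are two separately-locked steps, a concurrent reader can observe pool sizes that do not sum to maxMemory, tripping the assertion above. Doing both steps under the caller's lock makes the resize atomic. A simplified sketch of the idea; real pools also evict blocks:

    // Simplified: two pool sizes that must always sum to `total`.
    class Pools(total: Long) {
      private var storage: Long = total / 2
      private var execution: Long = total - total / 2

      // Shrink and grow under one lock, so the invariant never breaks,
      // even transiently.
      def moveToExecution(bytes: Long): Unit = synchronized {
        val moved = math.min(bytes, storage)
        storage -= moved
        execution += moved
        assert(storage + execution == total)
      }
    }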
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -43,5 +43,5 @@ package org.apache
 
 package object spark {
   // For package docs only
-  val SPARK_VERSION = "1.6.1"
+  val SPARK_VERSION = "1.6.2"
 }
@@ -720,7 +720,16 @@
     failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
       put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
-    addPendingTask(index)
+
+    if (successful(index)) {
+      logInfo(
+        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+        "but another instance of the task has already succeeded, " +
+        "so not re-queuing the task to be re-executed.")
+    } else {
+      addPendingTask(index)
+    }
+
     if (!isZombie && state != TaskState.KILLED
         && reason.isInstanceOf[TaskFailedReason]
         && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
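Context for the guard above (SPARK-14915): with speculative execution, two attempts of the same task can run concurrently, and if the speculative copy succeeds first, a later failure of the original must not re-queue work that is already done. A toy sketch of the bookkeeping, with illustrative names rather than Spark's classes:

    import scala.collection.mutable

    // Illustrative attempt bookkeeping: a failed attempt is re-queued only if
    // no other attempt of the same task index has already succeeded.
    class AttemptBook(numTasks: Int) {
      private val successful = Array.fill(numTasks)(false)
      private val pending = mutable.Queue[Int]()

      def onSuccess(index: Int): Unit = successful(index) = true

      def onFailure(index: Int): Unit =
        // If another attempt already succeeded, do not re-queue.
        if (!successful(index)) pending.enqueue(index)
    }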
@@ -179,6 +179,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)
 
       case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)
         context.reply(true)
 
@@ -263,7 +267,14 @@
         scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
         listenerBus.post(
           SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
-      case None => logInfo(s"Asked to remove non-existent executor $executorId")
+      case None =>
+        // SPARK-15262: If an executor is still alive even after the scheduler has removed
+        // its metadata, we may receive a heartbeat from that executor and tell its block
+        // manager to reregister itself. If that happens, the block manager master will know
+        // about the executor, but the scheduler will not. Therefore, we should remove the
+        // executor from the block manager when we hit this case.
+        scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
+        logInfo(s"Asked to remove non-existent executor $executorId")
     }
   }

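Both hunks above attack the same class of bug: the driver, the block manager master, and the executor each keep their own copy of executor liveness, and the copies can drift. The defensive pattern is to make removal idempotent and to forward it to every holder of the state, even when the local copy is already gone. A toy sketch of that idea, with illustrative names (cf. SPARK-13519 and SPARK-15262):

    import scala.collection.mutable

    // Illustrative: removal is safe to call twice and is propagated to the
    // other state holder even when the executor is unknown locally.
    class DriverState {
      private val executors = mutable.Map[String, String]()   // id -> host
      private val blockManagers = mutable.Set[String]()       // ids the BM master tracks

      def removeExecutor(id: String): Unit = synchronized {
        executors.remove(id) match {
          case Some(host) => println(s"removed executor $id on $host")
          case None       => println(s"asked to remove non-existent executor $id")
        }
        blockManagers.remove(id)  // idempotent: no-op if already gone
      }
    }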