Commit 310981d

Bertrand Bossy authored and Andrew Or committed
[SPARK-12583][MESOS] Mesos shuffle service: Don't delete shuffle files before application has stopped
## Problem description

The Mesos shuffle service has been completely unusable since Spark 1.6.0. The problem appears to date from the move from akka to netty in the networking layer. Until then, a connection from the driver to each shuffle service served as the signal by which the shuffle service determined whether the driver was still running. Since 1.6.0, this connection is closed after spark.shuffle.io.connectionTimeout (or spark.network.timeout if the former is not set) because it is idle. The shuffle service interprets the closed connection as a signal that the driver has stopped, even though the driver is still alive. As a result, shuffle files are deleted before the application has stopped.

### Context and analysis

- spark shuffle fails with mesos after 2mins: https://issues.apache.org/jira/browse/SPARK-12583
- External shuffle service broken w/ Mesos: https://issues.apache.org/jira/browse/SPARK-13159

This is a follow-up to #11207.

## What changes were proposed in this pull request?

This PR adds a heartbeat signal from the driver (in MesosExternalShuffleClient) to all registered external Mesos shuffle service instances. In MesosExternalShuffleBlockHandler, a thread periodically checks whether a driver has timed out and, if so, cleans up that application's shuffle files.

## How was this patch tested?

This patch has been tested on a small Mesos test cluster using the spark-shell. Log output from the Mesos shuffle service:

```
16/02/19 15:13:45 INFO mesos.MesosExternalShuffleBlockHandler: Received registration request from app 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 (remote address /xxx.xxx.xxx.xxx:52391, heartbeat timeout 120000 ms).
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-c84c0697-a3f9-4f61-9c64-4d3ee227c047], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-bf46497a-de80-47b9-88f9-563123b59e03], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:16:02 INFO mesos.MesosExternalShuffleBlockHandler: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 timed out. Removing shuffle files.
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 removed, cleanupLocalDirs = true
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3}'s 1 local dirs
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7}'s 1 local dirs
```

Note: there are 2 executors running on this slave.

Author: Bertrand Bossy <[email protected]>

Closes #11272 from bbossy/SPARK-12583-mesos-shuffle-service-heartbeat.
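
The heartbeat scheme described in this message can be modeled in a few lines. What follows is a minimal sketch under stated assumptions, not the code of this patch: `HeartbeatRegistry`, its method names, and the explicit `nowMs` parameter are hypothetical, chosen so that the behavior can be exercised without real timers or network connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the shuffle-service side: liveness is tracked per
// application by its last heartbeat, not by whether a connection is open.
class HeartbeatRegistry {
  private static final class AppState {
    final long timeoutMs;
    volatile long lastHeartbeatMs;

    AppState(long timeoutMs, long lastHeartbeatMs) {
      this.timeoutMs = timeoutMs;
      this.lastHeartbeatMs = lastHeartbeatMs;
    }
  }

  private final ConcurrentHashMap<String, AppState> apps = new ConcurrentHashMap<>();

  // Registration carries the timeout and counts as the first heartbeat.
  void register(String appId, long timeoutMs, long nowMs) {
    apps.put(appId, new AppState(timeoutMs, nowMs));
  }

  // Returns false if the heartbeat comes from an app that is not registered.
  boolean heartbeat(String appId, long nowMs) {
    AppState state = apps.get(appId);
    if (state == null) {
      return false;
    }
    state.lastHeartbeatMs = nowMs;
    return true;
  }

  // Removes every app whose last heartbeat is older than its timeout and
  // returns the removed ids; a caller would delete their shuffle files.
  List<String> sweep(long nowMs) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, AppState> e : apps.entrySet()) {
      AppState state = e.getValue();
      if (nowMs - state.lastHeartbeatMs > state.timeoutMs
          && apps.remove(e.getKey(), state)) {
        expired.add(e.getKey());
      }
    }
    return expired;
  }
}

class HeartbeatRegistryDemo {
  public static void main(String[] args) {
    HeartbeatRegistry registry = new HeartbeatRegistry();
    registry.register("app-1", 120_000, 0);
    registry.heartbeat("app-1", 100_000);
    System.out.println(registry.sweep(130_000)); // 30 s since last heartbeat: prints []
    System.out.println(registry.sweep(230_000)); // 130 s since last heartbeat: prints [app-1]
  }
}
```

The point of the model is that an idle connection being closed has no effect on `sweep`; only missing heartbeats do.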
1 parent 07cb323 commit 310981d

7 files changed, +195 -53 lines changed


common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java

Lines changed: 63 additions & 13 deletions

@@ -19,7 +19,12 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +46,13 @@
 public class MesosExternalShuffleClient extends ExternalShuffleClient {
   private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
 
+  private final ScheduledExecutorService heartbeaterThread =
+      Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("mesos-external-shuffle-client-heartbeater")
+          .build());
+
   /**
    * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
    * Please refer to docs on {@link ExternalShuffleClient} for more information.
@@ -53,21 +65,59 @@ public MesosExternalShuffleClient(
     super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
   }
 
-  public void registerDriverWithShuffleService(String host, int port) throws IOException {
+  public void registerDriverWithShuffleService(
+      String host,
+      int port,
+      long heartbeatTimeoutMs,
+      long heartbeatIntervalMs) throws IOException {
+
     checkInit();
-    ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
+    ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
     TransportClient client = clientFactory.createClient(host, port);
-    client.sendRpc(registerDriver, new RpcResponseCallback() {
-      @Override
-      public void onSuccess(ByteBuffer response) {
-        logger.info("Successfully registered app " + appId + " with external shuffle service.");
-      }
-
-      @Override
-      public void onFailure(Throwable e) {
-        logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+    client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
+  }
+
+  private class RegisterDriverCallback implements RpcResponseCallback {
+    private final TransportClient client;
+    private final long heartbeatIntervalMs;
+
+    private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
+      this.client = client;
+      this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    @Override
+    public void onSuccess(ByteBuffer response) {
+      heartbeaterThread.scheduleAtFixedRate(
+          new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
+      logger.info("Successfully registered app " + appId + " with external shuffle service.");
+    }
+
+    @Override
+    public void onFailure(Throwable e) {
+      logger.warn("Unable to register app " + appId + " with external shuffle service. " +
           "Please manually remove shuffle data after driver exit. Error: " + e);
-      }
-    });
+    }
+  }
+
+  @Override
+  public void close() {
+    heartbeaterThread.shutdownNow();
+    super.close();
+  }
+
+  private class Heartbeater implements Runnable {
+
+    private final TransportClient client;
+
+    private Heartbeater(TransportClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout
+      client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
+    }
   }
 }
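
The client-side change above boils down to a single daemon thread that runs a task at a fixed rate and is shut down when the client closes. The sketch below shows that pattern with JDK classes only. `HeartbeatScheduler` and its methods are hypothetical names, the thread factory is a plain lambda rather than Guava's `ThreadFactoryBuilder`, and the `try`/`catch` around the task is an addition of this sketch, not something the patch does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one daemon thread that sends heartbeats at a fixed rate.
class HeartbeatScheduler implements AutoCloseable {
  // Daemon thread, so a forgotten close() cannot keep the JVM alive.
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeater-sketch");
        t.setDaemon(true);
        return t;
      });

  // Runs sendHeartbeat immediately and then every intervalMs milliseconds.
  void start(Runnable sendHeartbeat, long intervalMs) {
    executor.scheduleAtFixedRate(() -> {
      try {
        sendHeartbeat.run();
      } catch (RuntimeException e) {
        // scheduleAtFixedRate suppresses all later runs once one run throws,
        // so this sketch logs the failure and keeps the schedule alive.
        System.err.println("heartbeat failed: " + e);
      }
    }, 0, intervalMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}

class HeartbeatSchedulerDemo {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch threeBeats = new CountDownLatch(3);
    try (HeartbeatScheduler scheduler = new HeartbeatScheduler()) {
      scheduler.start(threeBeats::countDown, 10);
      System.out.println("saw 3 heartbeats: " + threeBeats.await(5, TimeUnit.SECONDS));
    }
  }
}
```

A fixed rate (as opposed to a fixed delay) keeps the interval between heartbeats close to the configured value even when a single send is slow, which matters when the receiver compares the gap against a timeout.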

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java

Lines changed: 4 additions & 1 deletion

@@ -24,6 +24,7 @@
 
 import org.apache.spark.network.protocol.Encodable;
 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 
 /**
  * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -40,7 +41,8 @@ public abstract class BlockTransferMessage implements Encodable {
 
   /** Preceding every serialized message is its type, which allows us to deserialize it. */
   public static enum Type {
-    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
+    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
+    HEARTBEAT(5);
 
     private final byte id;
 
@@ -64,6 +66,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
         case 2: return RegisterExecutor.decode(buf);
         case 3: return StreamHandle.decode(buf);
         case 4: return RegisterDriver.decode(buf);
+        case 5: return ShuffleServiceHeartbeat.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java

Lines changed: 10 additions & 4 deletions

@@ -31,29 +31,34 @@
  */
 public class RegisterDriver extends BlockTransferMessage {
   private final String appId;
+  private final long heartbeatTimeoutMs;
 
-  public RegisterDriver(String appId) {
+  public RegisterDriver(String appId, long heartbeatTimeoutMs) {
     this.appId = appId;
+    this.heartbeatTimeoutMs = heartbeatTimeoutMs;
   }
 
   public String getAppId() { return appId; }
 
+  public long getHeartbeatTimeoutMs() { return heartbeatTimeoutMs; }
+
   @Override
   protected Type type() { return Type.REGISTER_DRIVER; }
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId);
+    return Encoders.Strings.encodedLength(appId) + Long.SIZE / Byte.SIZE;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeLong(heartbeatTimeoutMs);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId);
+    return Objects.hashCode(appId, heartbeatTimeoutMs);
   }
 
   @Override
@@ -66,6 +71,7 @@ public boolean equals(Object o) {
 
   public static RegisterDriver decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
-    return new RegisterDriver(appId);
+    long heartbeatTimeout = buf.readLong();
+    return new RegisterDriver(appId, heartbeatTimeout);
   }
 }
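
The RegisterDriver change above appends an 8-byte long to the encoded message and reads it back in the same position. The sketch below reproduces that round trip with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. `RegisterDriverWire` is a hypothetical stand-in, and the string layout (a 4-byte length followed by UTF-8 bytes) is an assumption of the sketch rather than something shown in this diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the extended RegisterDriver payload.
final class RegisterDriverWire {
  final String appId;
  final long heartbeatTimeoutMs;

  RegisterDriverWire(String appId, long heartbeatTimeoutMs) {
    this.appId = appId;
    this.heartbeatTimeoutMs = heartbeatTimeoutMs;
  }

  // Assumed layout: [int length][UTF-8 bytes of appId][long timeout].
  int encodedLength() {
    return Integer.BYTES + appId.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
  }

  ByteBuffer encode() {
    byte[] id = appId.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(encodedLength());
    buf.putInt(id.length).put(id).putLong(heartbeatTimeoutMs);
    buf.flip(); // switch the buffer from writing to reading
    return buf;
  }

  // Fields must be read in exactly the order they were written.
  static RegisterDriverWire decode(ByteBuffer buf) {
    byte[] id = new byte[buf.getInt()];
    buf.get(id);
    long timeoutMs = buf.getLong();
    return new RegisterDriverWire(new String(id, StandardCharsets.UTF_8), timeoutMs);
  }
}

class RegisterDriverWireDemo {
  public static void main(String[] args) {
    RegisterDriverWire sent = new RegisterDriverWire("app-0018", 120_000L);
    RegisterDriverWire received = RegisterDriverWire.decode(sent.encode());
    System.out.println(received.appId + " " + received.heartbeatTimeoutMs);
  }
}
```

In this sketch, decoding a payload that was written without the trailing long fails with a `BufferUnderflowException`, which illustrates why the sender and the receiver have to agree on the message layout.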
Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * A heartbeat sent from the driver to the MesosExternalShuffleService.
+ */
+public class ShuffleServiceHeartbeat extends BlockTransferMessage {
+  private final String appId;
+
+  public ShuffleServiceHeartbeat(String appId) {
+    this.appId = appId;
+  }
+
+  public String getAppId() { return appId; }
+
+  @Override
+  protected Type type() { return Type.HEARTBEAT; }
+
+  @Override
+  public int encodedLength() { return Encoders.Strings.encodedLength(appId); }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+  }
+
+  public static ShuffleServiceHeartbeat decode(ByteBuf buf) {
+    return new ShuffleServiceHeartbeat(Encoders.Strings.decode(buf));
+  }
+}

core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala

Lines changed: 54 additions & 33 deletions

@@ -17,69 +17,89 @@
 
 package org.apache.spark.deploy.mesos
 
-import java.net.SocketAddress
 import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
 import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
 
 /**
  * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
  * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
  */
-private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+private[mesos] class MesosExternalShuffleBlockHandler(
+    transportConf: TransportConf,
+    cleanerIntervalS: Long)
   extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
 
-  // Stores a map of driver socket addresses to app ids
-  private val connectedApps = new mutable.HashMap[SocketAddress, String]
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+  // Stores a map of app id to app state (timeout value and last heartbeat)
+  private val connectedApps = new ConcurrentHashMap[String, AppState]()
 
   protected override def handleMessage(
       message: BlockTransferMessage,
       client: TransportClient,
       callback: RpcResponseCallback): Unit = {
     message match {
-      case RegisterDriverParam(appId) =>
+      case RegisterDriverParam(appId, appState) =>
         val address = client.getSocketAddress
-        logDebug(s"Received registration request from app $appId (remote address $address).")
-        if (connectedApps.contains(address)) {
-          val existingAppId = connectedApps(address)
-          if (!existingAppId.equals(appId)) {
-            logError(s"A new app '$appId' has connected to existing address $address, " +
-              s"removing previously registered app '$existingAppId'.")
-            applicationRemoved(existingAppId, true)
-          }
+        val timeout = appState.heartbeatTimeout
+        logInfo(s"Received registration request from app $appId (remote address $address, " +
+          s"heartbeat timeout $timeout ms).")
+        if (connectedApps.containsKey(appId)) {
+          logWarning(s"Received a registration request from app $appId, but it was already " +
+            s"registered")
         }
-        connectedApps(address) = appId
+        connectedApps.put(appId, appState)
         callback.onSuccess(ByteBuffer.allocate(0))
+      case Heartbeat(appId) =>
+        val address = client.getSocketAddress
+        Option(connectedApps.get(appId)) match {
+          case Some(existingAppState) =>
+            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+              s"address $address).")
+            existingAppState.lastHeartbeat = System.nanoTime()
+          case None =>
+            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+              s"address $address, appId '$appId').")
+        }
       case _ => super.handleMessage(message, client, callback)
     }
   }
 
-  /**
-   * On connection termination, clean up shuffle files written by the associated application.
-   */
-  override def channelInactive(client: TransportClient): Unit = {
-    val address = client.getSocketAddress
-    if (connectedApps.contains(address)) {
-      val appId = connectedApps(address)
-      logInfo(s"Application $appId disconnected (address was $address).")
-      applicationRemoved(appId, true /* cleanupLocalDirs */)
-      connectedApps.remove(address)
-    } else {
-      logWarning(s"Unknown $address disconnected.")
-    }
-  }
-
   /** An extractor object for matching [[RegisterDriver]] message. */
   private object RegisterDriverParam {
-    def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+    def unapply(r: RegisterDriver): Option[(String, AppState)] =
+      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+  }
+
+  private object Heartbeat {
+    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+  }
+
+  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+  private class CleanerThread extends Runnable {
+    override def run(): Unit = {
+      val now = System.nanoTime()
+      connectedApps.asScala.foreach { case (appId, appState) =>
+        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+          logInfo(s"Application $appId timed out. Removing shuffle files.")
+          connectedApps.remove(appId)
+          applicationRemoved(appId, true)
+        }
+      }
+    }
   }
 }
 
@@ -93,7 +113,8 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage
 
   protected override def newShuffleBlockHandler(
       conf: TransportConf): ExternalShuffleBlockHandler = {
-    new MesosExternalShuffleBlockHandler(conf)
+    val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s")
+    new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
   }
 }
 
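
The cleaner in this file compares a difference of two `System.nanoTime()` readings against a timeout given in milliseconds, multiplying the timeout by `1000 * 1000` to convert it to nanoseconds. The sketch below isolates that comparison. `TimeoutCheck` is a hypothetical helper, and using `TimeUnit.MILLISECONDS.toNanos` for the conversion is a choice of this sketch, not what the patch does.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper isolating the timeout comparison used by a cleaner thread.
final class TimeoutCheck {
  private TimeoutCheck() {}

  // True if more than timeoutMs milliseconds lie between the two nanoTime readings.
  static boolean timedOut(long nowNanos, long lastHeartbeatNanos, long timeoutMs) {
    // nanoTime values are only meaningful as differences, so subtract first;
    // toNanos saturates at Long.MAX_VALUE instead of overflowing.
    return nowNanos - lastHeartbeatNanos > TimeUnit.MILLISECONDS.toNanos(timeoutMs);
  }
}

class TimeoutCheckDemo {
  public static void main(String[] args) {
    long last = System.nanoTime();
    long later = last + TimeUnit.SECONDS.toNanos(137);
    // A 120000 ms timeout has elapsed 137 s after the last heartbeat.
    System.out.println(TimeoutCheck.timedOut(later, last, 120_000L));
  }
}
```

Comparing the difference rather than the absolute readings keeps the check correct even if the nanosecond counter wraps around between the two readings.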

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 9 additions & 1 deletion

@@ -448,7 +448,12 @@ private[spark] class CoarseMesosSchedulerBackend(
             s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
 
         mesosExternalShuffleClient.get
-          .registerDriverWithShuffleService(slave.hostname, externalShufflePort)
+          .registerDriverWithShuffleService(
+            slave.hostname,
+            externalShufflePort,
+            sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
+              s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
         slave.shuffleRegistered = true
       }
 
@@ -506,6 +511,9 @@ private[spark] class CoarseMesosSchedulerBackend(
         + "on the mesos nodes.")
     }
 
+    // Close the mesos external shuffle client if used
+    mesosExternalShuffleClient.foreach(_.close())
+
     if (mesosDriver != null) {
       mesosDriver.stop()
     }

core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -192,7 +192,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
     val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
     backend.statusUpdate(driver, status2)
-    verify(externalShuffleClient, times(1)).registerDriverWithShuffleService(anyString, anyInt)
+    verify(externalShuffleClient, times(1))
+      .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
   }
 
   test("mesos kills an executor when told") {
