Skip to content

Commit caca15a

Browse files
SaintBacchusAndrew Or
authored andcommitted
[SPARK-5444][Network]Add a retry to deal with the conflict port in netty server.
If the `spark.blockMnager.port` had conflicted with a specific port, Spark will throw an exception and exit. So add a retry to avoid this situation. Author: huangzhaowei <[email protected]> Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits: cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server. (cherry picked from commit 2bda1c1) Signed-off-by: Andrew Or <[email protected]>
1 parent 9fa29a6 commit caca15a

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

network/common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
100100
}
101101
});
102102

103-
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
104-
channelFuture.syncUninterruptibly();
103+
bindRightPort(portToBind);
105104

106105
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
107106
logger.debug("Shuffle server started on port :" + port);
@@ -123,4 +122,37 @@ public void close() {
123122
bootstrap = null;
124123
}
125124

125+
/**
126+
* Attempt to bind to the specified port up to a fixed number of retries.
127+
* If all attempts fail after the max number of retries, exit.
128+
*/
129+
private void bindRightPort(int portToBind) {
130+
int maxPortRetries = conf.portMaxRetries();
131+
132+
for (int i = 0; i <= maxPortRetries; i++) {
133+
int tryPort = -1;
134+
if (0 == portToBind) {
135+
// Do not increment port if tryPort is 0, which is treated as a special port
136+
tryPort = 0;
137+
} else {
138+
// If the new port wraps around, do not try a privilege port
139+
tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
140+
}
141+
try {
142+
channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
143+
channelFuture.syncUninterruptibly();
144+
return;
145+
} catch (Exception e) {
146+
logger.warn("Netty service could not bind on port " + tryPort +
147+
". Attempting the next port.");
148+
if (i >= maxPortRetries) {
149+
logger.error(e.getMessage() + ": Netty server failed after "
150+
+ maxPortRetries + " retries.");
151+
152+
// If it can't find a right port, it should exit directly.
153+
System.exit(-1);
154+
}
155+
}
156+
}
157+
}
126158
}

network/common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,11 @@ public int memoryMapBytes() {
9898
public boolean lazyFileDescriptor() {
9999
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
100100
}
101+
102+
/**
103+
* Maximum number of retries when binding to a port before giving up.
104+
*/
105+
public int portMaxRetries() {
106+
return conf.getInt("spark.port.maxRetries", 16);
107+
}
101108
}

0 commit comments

Comments
 (0)