Skip to content

Commit e5e126b

Browse files
drexinktoso
authored andcommitted
Add connectTimeout to outgoing connections and fix handling of connec… (#71)
* Add connectTimeout to outgoing connections and fix handling of connection abortion #13 * Fix reconnection issue
1 parent f697132 commit e5e126b

File tree

4 files changed

+7
-4
lines changed

4 files changed

+7
-4
lines changed

Sources/DistributedActors/Cluster/ClusterSettings.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public struct ClusterSettings {
6363
return UniqueNode(node: self.node, nid: self.nid)
6464
}
6565

66+
/// Time after which a connection attempt will fail if no connection could be established
67+
public var connectTimeout: TimeAmount = .milliseconds(500)
68+
6669
/// Backoff to be applied when attempting a new connection and handshake with a remote system.
6770
public var handshakeBackoffStrategy: BackoffStrategy = Backoff.exponential(initialInterval: .milliseconds(100))
6871

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,9 @@ extension ClusterShell {
365365
serializationPool: self.serializationPool
366366
)
367367

368-
return context.awaitResult(of: outboundChanElf, timeout: .milliseconds(500)) { result in
368+
// the timeout is being handled by the `connectTimeout` socket option
369+
// in NIO, so it is safe to use an infinite timeout here
370+
return context.awaitResult(of: outboundChanElf, timeout: .effectivelyInfinite) { result in
369371
switch result {
370372
case .success(let chan):
371373
return self.ready(state: state.onHandshakeChannelConnected(initiated: initiated, channel: chan))
@@ -458,8 +460,6 @@ extension ClusterShell {
458460
}
459461
}
460462

461-
state._handshakes[remoteNode] = .initiated(initiated)
462-
463463
case .wasOfferedHandshake(let state):
464464
preconditionFailure("Outbound connection error should never happen on receiving end. State was: [\(state)], error was: \(error)")
465465
case .completed(let state):

Sources/DistributedActors/Cluster/ClusterShellState.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ extension ClusterShellState {
178178

179179
switch state {
180180
case .initiated(let initiated):
181-
assert(initiated.channel != nil, "Channel should always be present after the initial initialization, state was: \(state)")
182181
_ = initiated.channel?.close()
183182
case .wasOfferedHandshake:
184183
fatalError("abortOutgoingHandshake was called in a context where the handshake was not an outgoing one! Was: \(state)")

Sources/DistributedActors/Cluster/TransportPipelines.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ extension ClusterShell {
707707

708708
let bootstrap = ClientBootstrap(group: group)
709709
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
710+
.channelOption(ChannelOptions.connectTimeout, value: settings.connectTimeout.toNIO)
710711
.channelInitializer { channel in
711712
var channelHandlers: [(String?, ChannelHandler)] = []
712713

0 commit comments

Comments
 (0)