diff --git a/Sources/DistributedActors/Cluster/ClusterSettings.swift b/Sources/DistributedActors/Cluster/ClusterSettings.swift index 59b365b3e..2986ef50d 100644 --- a/Sources/DistributedActors/Cluster/ClusterSettings.swift +++ b/Sources/DistributedActors/Cluster/ClusterSettings.swift @@ -63,6 +63,9 @@ public struct ClusterSettings { return UniqueNode(node: self.node, nid: self.nid) } + /// Time after which a connection attempt will fail if no connection could be established + public var connectTimeout: TimeAmount = .milliseconds(500) + /// Backoff to be applied when attempting a new connection and handshake with a remote system. public var handshakeBackoffStrategy: BackoffStrategy = Backoff.exponential(initialInterval: .milliseconds(100)) diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 2b538568d..def8d0877 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -389,7 +389,9 @@ extension ClusterShell { serializationPool: self.serializationPool ) - return context.awaitResult(of: outboundChanElf, timeout: .milliseconds(500)) { result in + // the timeout is being handled by the `connectTimeout` socket option + // in NIO, so it is safe to use an infinite timeout here + return context.awaitResult(of: outboundChanElf, timeout: .effectivelyInfinite) { result in switch result { case .success(let chan): return self.ready(state: state.onHandshakeChannelConnected(initiated: initiated, channel: chan)) @@ -482,8 +484,6 @@ extension ClusterShell { } } - state._handshakes[remoteNode] = .initiated(initiated) - case .wasOfferedHandshake(let state): preconditionFailure("Outbound connection error should never happen on receiving end. State was: [\(state)], error was: \(error)") case .completed(let state): diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift index 59033d5b3..93d30fe5e 100644 --- a/Sources/DistributedActors/Cluster/ClusterShellState.swift +++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift @@ -178,7 +178,6 @@ extension ClusterShellState { switch state { case .initiated(let initiated): - assert(initiated.channel != nil, "Channel should always be present after the initial initialization, state was: \(state)") _ = initiated.channel?.close() case .wasOfferedHandshake: fatalError("abortOutgoingHandshake was called in a context where the handshake was not an outgoing one! Was: \(state)") diff --git a/Sources/DistributedActors/Cluster/TransportPipelines.swift b/Sources/DistributedActors/Cluster/TransportPipelines.swift index b24162606..4c80405f6 100644 --- a/Sources/DistributedActors/Cluster/TransportPipelines.swift +++ b/Sources/DistributedActors/Cluster/TransportPipelines.swift @@ -707,6 +707,7 @@ extension ClusterShell { let bootstrap = ClientBootstrap(group: group) .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .channelOption(ChannelOptions.connectTimeout, value: settings.connectTimeout.toNIO) .channelInitializer { channel in var channelHandlers: [(String?, ChannelHandler)] = []