From ebf57b32fafce7b14f0cf0011b8bbac93f20548b Mon Sep 17 00:00:00 2001 From: Dario Rexin Date: Thu, 29 Aug 2019 17:06:04 -0700 Subject: [PATCH 1/2] Add connectTimeout to outgoing connections and fix handling of connection abortion #13 --- Sources/DistributedActors/Cluster/ClusterSettings.swift | 3 +++ Sources/DistributedActors/Cluster/ClusterShell.swift | 9 +++++---- .../DistributedActors/Cluster/ClusterShellState.swift | 4 ++-- .../DistributedActors/Cluster/TransportPipelines.swift | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Sources/DistributedActors/Cluster/ClusterSettings.swift b/Sources/DistributedActors/Cluster/ClusterSettings.swift index 59b365b3e..2986ef50d 100644 --- a/Sources/DistributedActors/Cluster/ClusterSettings.swift +++ b/Sources/DistributedActors/Cluster/ClusterSettings.swift @@ -63,6 +63,9 @@ public struct ClusterSettings { return UniqueNode(node: self.node, nid: self.nid) } + /// Time after which a connection attempt will fail if no connection could be established + public var connectTimeout: TimeAmount = .milliseconds(500) + /// Backoff to be applied when attempting a new connection and handshake with a remote system. public var handshakeBackoffStrategy: BackoffStrategy = Backoff.exponential(initialInterval: .milliseconds(100)) diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 2b538568d..21359a193 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -389,7 +389,9 @@ extension ClusterShell { serializationPool: self.serializationPool ) - return context.awaitResult(of: outboundChanElf, timeout: .milliseconds(500)) { result in + // the timeout is being handled by the `connectTimeout` socket option + // in NIO, so it is safe to use an infinite timeout here + return context.awaitResult(of: outboundChanElf, timeout: .effectivelyInfinite) { result in switch result { case .success(let chan): return self.ready(state: state.onHandshakeChannelConnected(initiated: initiated, channel: chan)) @@ -471,19 +473,18 @@ extension ClusterShell { switch initiated.onHandshakeError(error) { case .scheduleRetryHandshake(let delay): state.log.info("Schedule handshake retry to: [\(initiated.remoteNode)] delay: [\(delay)]") + _ = state.abortOutgoingHandshake(with: remoteNode, mayNotHaveChannel: true) context.timers.startSingle( key: TimerKey("handshake-timer-\(remoteNode)"), message: .command(.retryHandshake(initiated)), delay: delay ) case .giveUpOnHandshake: - if let hsmState = state.abortOutgoingHandshake(with: remoteNode) { + if let hsmState = state.abortOutgoingHandshake(with: remoteNode, mayNotHaveChannel: true) { self.notifyHandshakeFailure(state: hsmState, node: remoteNode, error: error) } } - state._handshakes[remoteNode] = .initiated(initiated) - case .wasOfferedHandshake(let state): preconditionFailure("Outbound connection error should never happen on receiving end. State was: [\(state)], error was: \(error)") case .completed(let state): diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift index 59033d5b3..7a66cfd12 100644 --- a/Sources/DistributedActors/Cluster/ClusterShellState.swift +++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift @@ -171,14 +171,14 @@ extension ClusterShellState { /// /// - Faults: when called in wrong state of an ongoing handshake /// - Returns: if present, the (now removed) handshake state that was aborted, hil otherwise. - mutating func abortOutgoingHandshake(with node: Node) -> HandshakeStateMachine.State? { + mutating func abortOutgoingHandshake(with node: Node, mayNotHaveChannel: Bool = false) -> HandshakeStateMachine.State? { guard let state = self._handshakes.removeValue(forKey: node) else { return nil } switch state { case .initiated(let initiated): - assert(initiated.channel != nil, "Channel should always be present after the initial initialization, state was: \(state)") + assert(mayNotHaveChannel || initiated.channel != nil, "Channel should always be present after the initial initialization, state was: \(state)") _ = initiated.channel?.close() case .wasOfferedHandshake: fatalError("abortOutgoingHandshake was called in a context where the handshake was not an outgoing one! Was: \(state)") diff --git a/Sources/DistributedActors/Cluster/TransportPipelines.swift b/Sources/DistributedActors/Cluster/TransportPipelines.swift index b24162606..4c80405f6 100644 --- a/Sources/DistributedActors/Cluster/TransportPipelines.swift +++ b/Sources/DistributedActors/Cluster/TransportPipelines.swift @@ -707,6 +707,7 @@ extension ClusterShell { let bootstrap = ClientBootstrap(group: group) .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .channelOption(ChannelOptions.connectTimeout, value: settings.connectTimeout.toNIO) .channelInitializer { channel in var channelHandlers: [(String?, ChannelHandler)] = [] From 63dcb9a126b226fbf0d8ffbe26cbbd7d92582db3 Mon Sep 17 00:00:00 2001 From: Dario Rexin Date: Thu, 29 Aug 2019 18:00:28 -0700 Subject: [PATCH 2/2] Fix reconnection issue --- Sources/DistributedActors/Cluster/ClusterShell.swift | 3 +-- Sources/DistributedActors/Cluster/ClusterShellState.swift | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 21359a193..def8d0877 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -473,14 +473,13 @@ extension ClusterShell { switch initiated.onHandshakeError(error) { case .scheduleRetryHandshake(let delay): state.log.info("Schedule handshake retry to: [\(initiated.remoteNode)] delay: [\(delay)]") - _ = state.abortOutgoingHandshake(with: remoteNode, mayNotHaveChannel: true) context.timers.startSingle( key: TimerKey("handshake-timer-\(remoteNode)"), message: .command(.retryHandshake(initiated)), delay: delay ) case .giveUpOnHandshake: - if let hsmState = state.abortOutgoingHandshake(with: remoteNode, mayNotHaveChannel: true) { + if let hsmState = state.abortOutgoingHandshake(with: remoteNode) { self.notifyHandshakeFailure(state: hsmState, node: remoteNode, error: error) } } diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift index 7a66cfd12..93d30fe5e 100644 --- a/Sources/DistributedActors/Cluster/ClusterShellState.swift +++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift @@ -171,14 +171,13 @@ extension ClusterShellState { /// /// - Faults: when called in wrong state of an ongoing handshake /// - Returns: if present, the (now removed) handshake state that was aborted, hil otherwise. - mutating func abortOutgoingHandshake(with node: Node, mayNotHaveChannel: Bool = false) -> HandshakeStateMachine.State? { + mutating func abortOutgoingHandshake(with node: Node) -> HandshakeStateMachine.State? { guard let state = self._handshakes.removeValue(forKey: node) else { return nil } switch state { case .initiated(let initiated): - assert(mayNotHaveChannel || initiated.channel != nil, "Channel should always be present after the initial initialization, state was: \(state)") _ = initiated.channel?.close() case .wasOfferedHandshake: fatalError("abortOutgoingHandshake was called in a context where the handshake was not an outgoing one! Was: \(state)")