diff --git a/Sources/DistributedActors/Cluster/SWIM/README.md b/Sources/DistributedActors/Cluster/SWIM/README.md deleted file mode 100644 index 3e43bae96..000000000 --- a/Sources/DistributedActors/Cluster/SWIM/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Swift Distributed Actors SWIM - -Actor based SWIM protocol implementation. diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift index eeda79e79..40a82a2b8 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift @@ -14,16 +14,16 @@ /// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol). /// -/// SWIM serves as a low-level membership and distributed failure detector mechanism. -/// Cluster members may be discovered though SWIM gossip, yet will be asked to participate in the high-level -/// cluster membership as driven by the `ClusterShell`. +/// SWIM serves as a low-level distributed failure detector mechanism. +/// It also maintains its own membership in order to monitor and select nodes to ping with periodic health checks, +/// however this membership is not directly the same as the high-level membership exposed by the `Cluster`. /// -/// ### Modifications -/// See the documentation of this swim implementation in the reference documentation. -/// -/// ### Related Papers -/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf) paper +/// SWIM is first and foremost used to determine if nodes are reachable or not (in SWIM terms if they are `.dead`), +/// however the final decision to mark a node `.dead` is made by the cluster by issuing a `Cluster.MemberStatus.down` +/// (usually in reaction to SWIM informing it about a node being `SWIM.Member.Status.unreachable`). /// +/// Cluster members may be discovered through SWIM gossip, yet will be asked to participate in the high-level +/// cluster membership as driven by the `ClusterShell`. /// ### See Also /// - SeeAlso: `SWIM.Instance` for a detailed discussion on the implementation. /// - SeeAlso: `SWIM.Shell` for the interpretation and actor driving the interactions. @@ -46,7 +46,6 @@ public enum SWIM { case ping(lastKnownStatus: Status, replyTo: ActorRef, payload: Payload) /// "Ping Request" requests a SWIM probe. - // TODO: target -- node rather than the ref? case pingReq(target: ActorRef, lastKnownStatus: Status, replyTo: ActorRef, payload: Payload) /// Extension: Lifeguard, Local Health Aware Probe @@ -69,7 +68,6 @@ public enum SWIM { let payload: Payload } - // TODO: make sure that those are in a "testing" and not just "remote" namespace? internal struct MembershipState { let membershipState: [ActorRef: Status] } @@ -133,8 +131,9 @@ public enum SWIM { // MARK: SWIM Member Status extension SWIM { - /// The SWIM membership status reflects. + /// The SWIM membership status reflects how a node is perceived by the distributed failure detector. /// + /// ### Modification: Unreachable status /// The `.unreachable` state is set when a classic SWIM implementation would have declared a node `.down`, /// yet since we allow for the higher level membership to decide when and how to eject members from a cluster, /// only the `.unreachable` state is set and an `Cluster.ReachabilityChange` cluster event is emitted.
In response to this @@ -248,13 +247,12 @@ extension SWIM.Status { // MARK: SWIM Member internal struct SWIMMember: Hashable { - // TODO: would want to swap it around, nodes are members, not actors var node: UniqueNode { return self.ref.address.node ?? self.ref._system!.settings.cluster.uniqueBindNode } /// Each (SWIM) cluster member is running a `probe` actor which we interact with when gossiping the SWIM messages. - let ref: ActorRef // TODO: better name for `ref` is it a `probeRef` (sounds right?) or `swimmerRef` (meh)? + let ref: ActorRef var status: SWIM.Status @@ -268,15 +266,19 @@ internal struct SWIMMember: Hashable { } var isAlive: Bool { - return self.status.isAlive + self.status.isAlive } var isSuspect: Bool { - return self.status.isSuspect + self.status.isSuspect + } + + var isUnreachable: Bool { + self.status.isUnreachable } var isDead: Bool { - return self.status.isDead + self.status.isDead } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance+Logging.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance+Logging.swift index 1446975ad..3604501a8 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance+Logging.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance+Logging.swift @@ -21,6 +21,7 @@ extension SWIM.Instance { /// While the SWIM.Instance is not meant to be logging by itself, it does offer metadata for loggers to use. var metadata: Logger.Metadata { [ + "swim/membersToPing": "\(self.membersToPing)", "swim/protocolPeriod": "\(self.protocolPeriod)", "swim/incarnation": "\(self.incarnation)", "swim/memberCount": "\(self.memberCount)", @@ -64,11 +65,11 @@ extension SWIMShell { case .receive(nil): return "RECV" case .receive(let .some(pinged)): - return "RECV(pinged:\(pinged.path))" + return "RECV(pinged:\(pinged.address))" case .reply(let to): - return "REPL(to:\(to.path))" + return "REPL(to:\(to.address))" case .ask(let who): - return "ASK(\(who.path))" + return "ASK(\(who.address))" } } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift index ace37dea3..c6f7374d1 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift @@ -16,25 +16,31 @@ import Logging /// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol). /// -/// Namespace containing message types used to implement the SWIM protocol. +/// Implementation of the SWIM protocol in abstract terms; see [SWIMShell] for the actor acting upon directives issued by this instance. /// /// > As you swim lazily through the milieu, /// > The secrets of the world will infect you. /// -/// - SeeAlso: Scalable Weakly-consistent Infection-style Process Group Membership Protocol -/// - SeeAlso: Lifeguard: Local Health Awareness for More Accurate Failure Detection +/// ### Modifications +/// - Random, yet stable, order of member selection for pinging: members are probed in a shuffled round-robin order, unlike the completely random selection in the original paper. +/// +/// See the reference documentation of this SWIM implementation for further details on these modifications.
+/// +/// ### Related Papers +/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf) +/// - SeeAlso: [Lifeguard: Local Health Awareness for More Accurate Failure Detection](https://arxiv.org/abs/1707.00788) final class SWIMInstance { let settings: SWIM.Settings /// Main members storage, map to values to obtain current members. private var members: [ActorRef: SWIMMember] - // private var members: [UniqueNode: SWIMMember] // FIXME: this really should talk about nodes, not the members in the keys + /// List of members maintained in random yet stable order; see `addMember` for details. + internal var membersToPing: [SWIMMember] /// Constantly mutated by `nextMemberToPing` in an effort to keep the order in which we ping nodes evenly distributed. - private var membersToPing: [SWIMMember] private var _membersToPingIndex: Int = 0 private var membersToPingIndex: Int { - return self._membersToPingIndex + self._membersToPingIndex } /// The incarnation number is used to get a sense of ordering of events, so if an `.alive` or `.suspect` @@ -43,7 +49,7 @@ final class SWIMInstance { /// be incremented by the respective node itself and will happen if that node receives a `.suspect` for /// itself, to which it will respond with an `.alive` with the incremented incarnation. var incarnation: SWIM.Incarnation { - return self._incarnation + self._incarnation } private var _incarnation: SWIM.Incarnation = 0 @@ -55,11 +61,10 @@ final class SWIMInstance { // be declared `.dead` after not receiving an `.alive` for approx. 3 seconds. private var _protocolPeriod: Int = 0 - // We need to store the path to the owning SWIMShell to avoid adding it to the `membersToPing` list - // private var myRemoteAddress: ActorAddress? = nil + // We store the owning SWIMShell ref in order to avoid adding it to the `membersToPing` list private var myShellMyself: ActorRef? private var myShellAddress: ActorAddress? { - return self.myShellMyself?.address + self.myShellMyself?.address } private var myNode: UniqueNode? @@ -90,7 +95,6 @@ final class SWIMInstance { return .newerMemberAlreadyPresent(existingMember) } - // FIXME: store with node as key, not ref let member = SWIMMember(ref: ref, status: status, protocolPeriod: self.protocolPeriod) self.members[ref] = member @@ -114,11 +118,11 @@ final class SWIMInstance { self.addToGossip(member: member) - return .added + return .added(member) } enum AddMemberDirective { - case added + case added(SWIM.Member) case newerMemberAlreadyPresent(SWIM.Member) } @@ -129,7 +133,7 @@ final class SWIMInstance { /// /// - Note: /// SWIM 4.3: [...] The failure detection protocol at member works by maintaining a list (intuitively, an array) of the known - /// elements of the current membership list, and select- ing ping targets not randomly from this list, + /// elements of the current membership list, and selecting ping targets not randomly from this list, /// but in a round-robin fashion. Instead, a newly joining member is inserted in the membership list at /// a position that is chosen uniformly at random. On completing a traversal of the entire list, /// rearranges the membership list to a random reordering.
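For readers less familiar with the SWIM §4.3 scheme quoted in the doc comment above, the following is a minimal, self-contained sketch of the idea that `membersToPing` / `nextMemberToPing` implement: insert new members at a random position, probe in round-robin order, and reshuffle after each full traversal. The `PingTargetList` type below is illustrative only and is not part of this patch.

```swift
/// Illustrative sketch (not the patch's actual code) of SWIM §4.3 ping target selection.
struct PingTargetList<Member> {
    private var members: [Member] = []
    private var index = 0

    mutating func add(_ member: Member) {
        // Insert at a uniformly random position so join order does not bias probing order.
        members.insert(member, at: Int.random(in: 0 ... members.count))
    }

    mutating func nextTarget() -> Member? {
        guard !members.isEmpty else { return nil }
        let target = members[index]
        index += 1
        if index == members.count {
            // Completed a full traversal: re-randomize the order, as the paper prescribes.
            members.shuffle()
            index = 0
        }
        return target
    }
}
```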
@@ -137,8 +141,8 @@ final class SWIMInstance { if self.membersToPing.isEmpty { return nil } - defer { self.advanceMembersToPingIndex() } + defer { self.advanceMembersToPingIndex() } return self.membersToPing[self.membersToPingIndex].ref } @@ -195,12 +199,25 @@ final class SWIMInstance { self.removeFromMembersToPing(member) } - return .applied(previousStatus: previousStatusOption) + return .applied(previousStatus: previousStatusOption, currentStatus: status) } enum MarkedDirective: Equatable { case ignoredDueToOlderStatus(currentStatus: SWIM.Status) - case applied(previousStatus: SWIM.Status?) + case applied(previousStatus: SWIM.Status?, currentStatus: SWIM.Status) + + /// True if the directive was `applied` and the from/to statuses differ, meaning that a change notification should be issued. + var isEffectiveStatusChange: Bool { + switch self { + case .ignoredDueToOlderStatus: + return false + case .applied(nil, _): + // from no status, to any status is definitely an effective change + return true + case .applied(.some(let previousStatus), let currentStatus): + return previousStatus != currentStatus + } + } } func incrementProtocolPeriod() { @@ -441,11 +458,17 @@ extension SWIM.Instance { } else { if self.isMember(member.ref) { switch self.mark(member.ref, as: member.status) { - case .applied: - if member.status.isDead { + case .applied(_, let currentStatus): + switch currentStatus { + case .unreachable: + return .applied(level: .notice, message: "Member \(member) marked [.unreachable] from incoming gossip") + case .alive: + // TODO: could be another spot that we have to issue a reachable though? + return .ignored + case .suspect: + return .markedSuspect(member: member) + case .dead: return .confirmedDead(member: member) - } else { - return .applied } case .ignoredDueToOlderStatus(let currentStatus): return .ignored( @@ -454,9 +477,13 @@ extension SWIM.Instance { ) } } else if let remoteMemberNode = member.ref.address.node { - // TODO: store that we're now handshaking with it already? - return .connect(node: remoteMemberNode, onceConnected: { _ in - self.addMember(member.ref, status: member.status) + return .connect(node: remoteMemberNode, onceConnected: { + switch $0 { + case .success(let uniqueNode): + self.addMember(member.ref, status: member.status) + case .failure(let error): + self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!) + } }) } else { return .ignored( @@ -482,7 +509,8 @@ extension SWIM.Instance { /// and do not have a connection to it either (e.g. we joined only seed nodes, and more nodes joined them later /// we could get information through the seed nodes about the new members; but we still have never talked to them, /// thus we need to ensure we have a connection to them, before we consider adding them to the membership). - case connect(node: UniqueNode, onceConnected: (UniqueNode) -> Void) + case connect(node: UniqueNode, onceConnected: (Result) -> Void) + case markedSuspect(member: SWIM.Member) /// Meaning the node is now marked `DEAD`. /// SWIM will continue to gossip about the dead node for a while. /// We should also notify the high-level membership that the node shall be considered `DOWN`.
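The `isEffectiveStatusChange` helper added in the hunk above lets callers avoid emitting duplicate reachability notifications when the same gossip is applied repeatedly. A self-contained sketch of that rule, using a simplified stand-in status type rather than the real `SWIM.Status` (which also carries incarnation numbers):

```swift
// Simplified stand-in for SWIM.Status; illustrative only.
enum Status: Equatable { case alive, suspect, unreachable, dead }

/// A mark is only "effective" (worth reporting upstream) when the newly applied status
/// differs from the previous one, or when there was no previous status recorded at all.
func isEffectiveStatusChange(previous: Status?, current: Status) -> Bool {
    guard let previous = previous else {
        return true // first information about a member is always news
    }
    return previous != current
}

// Re-applying the same `.suspect` gossip must not re-emit an event...
assert(!isEffectiveStatusChange(previous: .suspect, current: .suspect))
// ...while escalating from `.suspect` to `.unreachable` should.
assert(isEffectiveStatusChange(previous: .suspect, current: .unreachable))
```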
diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift index 619fdf551..85c99f999 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift @@ -22,16 +22,16 @@ public struct SWIMSettings { .init() } - /// Optional "SWIM instance name" to be included in log statements, - /// useful when multiple instances of SWIM are run on the same process (e.g. for debugging). - public var name: String? - // var timeSource: TimeSource // TODO would be nice? public var gossip: SWIMGossipSettings = .default public var failureDetector: SWIMFailureDetectorSettings = .default + /// Optional "SWIM instance name" to be included in log statements, + /// useful when multiple instances of SWIM are run on the same node (e.g. for debugging). + internal var name: String? + /// When enabled traces _all_ incoming SWIM protocol communication (remote messages). /// These logs will contain SWIM.Instance metadata, as offered by `SWIM.Instance.metadata`. /// All logs will be prefixed using `[tracelog:SWIM]`, for easier grepping and inspecting only logs related to the SWIM instance. @@ -48,31 +48,26 @@ extension SWIM { } // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Gossip Settings +// MARK: SWIM Gossip Settings public struct SWIMGossipSettings { public static var `default`: SWIMGossipSettings { - return .init() + .init() } - /// Interval at which gossip messages should be issued. - /// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout? - public var probeInterval: TimeAmount = .seconds(1) - - // FIXME: investigate size of messages and find good default - // - // max number of messages included in any gossip payload + // TODO: investigate size of messages and find good default + /// Max number of messages included in any gossip payload public var maxNumberOfMessages: Int = 20 public var maxGossipCountPerMessage: Int = 6 } // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: FailureDetector Settings +// MARK: SWIM FailureDetector Settings public struct SWIMFailureDetectorSettings { public static var `default`: SWIMFailureDetectorSettings { - return .init() + .init() } /// Number of indirect probes that will be issued once a direct ping probe has failed to reply in time with an ack. @@ -96,6 +91,15 @@ public struct SWIMFailureDetectorSettings { public var suspicionTimeoutPeriodsMax: Int = 10 // public var suspicionTimeoutPeriodsMin: Int = 10 // FIXME: this is once we have LHA, Local Health Aware Suspicion + /// Interval at which gossip messages should be issued. + /// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout? public var probeInterval: TimeAmount = .seconds(1) + + /// Time amount after which a sent ping without ack response is considered timed-out. + /// This drives how a node becomes a suspect, by missing such ping/ack rounds. + /// + /// Note that after an initial ping/ack timeout, secondary indirect probes are issued, + /// and only after exceeding `suspicionTimeoutPeriodsMax` shall the node be declared as `.unreachable`, + /// which results in an `Cluster.MemberReachabilityChange` `Cluster.Event` which downing strategies may act upon. 
public var pingTimeout: TimeAmount = .milliseconds(300) } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift index dc9875aa6..bbf104b2e 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift @@ -38,13 +38,12 @@ internal struct SWIMShell { // MARK: Behaviors /// Initial behavior, kicks off timers and becomes `ready`. - // FIXME: utilize FailureObserver var behavior: Behavior { - return .setup { context in + .setup { context in // TODO: install an .cluster.down(my node) with context.defer in case we crash? Or crash system when this crashes: issue #926 - let probeInterval = self.swim.settings.gossip.probeInterval + let probeInterval = self.swim.settings.failureDetector.probeInterval context.timers.startPeriodic(key: SWIM.Shell.periodicPingKey, message: .local(.pingRandomMember), interval: probeInterval) self.swim.addMyself(context.myself, node: context.system.cluster.node) @@ -54,7 +53,7 @@ internal struct SWIMShell { } var ready: Behavior { - return .receive { context, wrappedMessage in + .receive { context, wrappedMessage in switch wrappedMessage { case .remote(let message): self.receiveRemoteMessage(context: context, message: message) @@ -151,8 +150,8 @@ internal struct SWIMShell { } // timeout is already handled by the ask, so we can set it to infinite here to not have two timeouts - context.onResultAsync(of: response, timeout: .effectivelyInfinite) { - self.handlePingResponse(context: context, result: $0, pingedMember: target, pingReqOrigin: pingReqOrigin) + context.onResultAsync(of: response, timeout: .effectivelyInfinite) { res in + self.handlePingResponse(context: context, result: res, pingedMember: target, pingReqOrigin: pingReqOrigin) return .same } } @@ -232,7 +231,7 @@ internal struct SWIMShell { } case .success(let ack): context.log.trace("Received ack from [\(ack.pinged)] with incarnation [\(ack.incarnation)] and payload [\(ack.payload)]") - self.swim.mark(ack.pinged, as: .alive(incarnation: ack.incarnation)) // TODO: log ? + self.swim.mark(ack.pinged, as: .alive(incarnation: ack.incarnation)) pingReqOrigin?.tell(ack) self.processGossipPayload(context: context, payload: ack.payload) } @@ -295,26 +294,26 @@ internal struct SWIMShell { // TODO: GC tombstones after a day switch self.swim.mark(member.ref, as: .dead) { - case .applied(let .some(previousState)): + case .applied(let .some(previousState), let currentState): if previousState.isSuspect || previousState.isUnreachable { - context.log.warning(""" - Marked [\(member)] as DEAD. \ - Was marked \(previousState) in protocol period [\(member.protocolPeriod)], current period [\(self.swim.protocolPeriod)]. - """) + context.log.warning("Marked [\(member)] as [.dead]. Was marked \(previousState) in protocol period [\(member.protocolPeriod)]", metadata: [ + "swim/protocolPeriod": "\(self.swim.protocolPeriod)", + "swim/member": "\(member)", // TODO: make sure it is the latest status of it in here + ]) } else { - context.log.warning(""" - Marked [\(member)] as DEAD. \ - Node was previously alive, and now forced DEAD. Current period [\(self.swim.protocolPeriod)]. - """) + context.log.warning("Marked [\(member)] as [.dead]. 
Node was previously [.alive], and now forced [.dead].", metadata: [ + "swim/protocolPeriod": "\(self.swim.protocolPeriod)", + "swim/member": "\(member)", // TODO: make sure it is the latest status of it in here + ]) } - case .applied(nil): + case .applied(nil, _): // TODO: marking is more about "marking a node as dead" should we rather log addresses and not actor paths? - context.log.warning("Marked [\(member)] as dead. Node was not previously known to SWIM.") - // TODO: add tracelog about marking a node dead here? + context.log.warning("Marked [\(member)] as [.dead]. Node was not previously known to SWIM.") + // TODO: should we not issue a escalateUnreachable here? depends how we learnt about that node... case .ignoredDueToOlderStatus: // TODO: make sure a fatal error in SWIM.Shell causes a system shutdown? - fatalError("Marking [\(member)] as DEAD failed! This should never happen, dead is the terminal status. SWIM instance: \(self.swim)") + fatalError("Marking [\(member)] as [.dead] failed! This should never happen, dead is the terminal status. SWIM instance: \(self.swim)") } } else { context.log.warning("Attempted to .confirmDead(\(node)), yet no such member known to \(self)!") // TODO: would want to see if this happens when we fail these tests @@ -329,29 +328,32 @@ internal struct SWIMShell { let timeoutPeriods = (self.swim.protocolPeriod - self.swim.settings.failureDetector.suspicionTimeoutPeriodsMax) context.log.trace("Checking suspicion timeouts...", metadata: [ "swim/suspects": "\(self.swim.suspects)", + "swim/all": "\(self.swim._allMembersDict)", "swim/protocolPeriod": "\(self.swim.protocolPeriod)", "swim/timeoutPeriods": "\(timeoutPeriods)", ]) - for member in self.swim.suspects { - context.log.trace("Checking \(member)...") - if member.protocolPeriod <= timeoutPeriods { - () // ok, continue checking - } else { + for suspect in self.swim.suspects { + context.log.trace("Checking \(suspect)...") + + // proceed with suspicion escalation to .unreachable if the timeout period has been exceeded + guard suspect.protocolPeriod <= timeoutPeriods else { continue // skip } +// guard let node = suspect.ref.address.node else { +// continue // is not a remote node +// } - if let node = member.ref.address.node { - if let incarnation = member.status.incarnation { - context.log.trace("Marking \(member.node) as .unreachable!") - self.swim.mark(member.ref, as: .unreachable(incarnation: incarnation)) - } - // if unreachable or dead, we don't need to notify the clusterRef - if member.status.isUnreachable || member.status.isDead { - continue - } - context.log.info("Notifying cluster, node \(member.node) is unreachable!") - self.clusterRef.tell(.command(.failureDetectorReachabilityChanged(node, .unreachable))) + if let incarnation = suspect.status.incarnation { + context.log.trace("Marking \(suspect.node) as .unreachable!") + self.swim.mark(suspect.ref, as: .unreachable(incarnation: incarnation)) + } + + // if the member was already unreachable or dead before, we don't need to notify the clusterRef + // (as we already did so once before, and do not want to issue multiple events about this) + if suspect.status.isUnreachable || suspect.status.isDead { + continue } + self.escalateMemberUnreachable(context: context, member: suspect) } } @@ -361,16 +363,23 @@ internal struct SWIMShell { case .membership(let members): for member in members { switch self.swim.onGossipPayload(about: member) { + case .applied where member.status.isUnreachable || member.status.isDead: + // FIXME: rather, we should return in 
applied if it was a change or not, only if it was we should emit... + + // TODO: ensure we don't double emit this + self.escalateMemberUnreachable(context: context, member: member) + case .applied: - () // ok, nothing to do + () case .connect(let node, let continueAddingMember): // ensuring a connection is asynchronous, but executes callback in actor context self.ensureAssociated(context, remoteNode: node) { uniqueAddressResult in switch uniqueAddressResult { case .success(let uniqueAddress): - continueAddingMember(uniqueAddress) + continueAddingMember(.success(uniqueAddress)) case .failure(let error): + continueAddingMember(.failure(error)) context.log.warning("Unable ensure association with \(node), could it have been tombstoned? Error: \(error)") } } @@ -380,9 +389,14 @@ internal struct SWIMShell { context.log.log(level: level, message) } + case .markedSuspect(let member): + context.log.info("Marked member \(member) [.suspect], if it is not found [.alive] again within \(self.settings.failureDetector.suspicionTimeoutPeriodsMax) protocol periods, it will be marked [.unreachable].") + case .confirmedDead(let member): - context.log.info("Detected [.dead] node. Information received: \(member).") - context.system.cluster.down(node: member.node.node) + context.log.warning("Detected [.dead] node: \(member.node).", metadata: [ + "swim/member": "\(member)", + ]) + context.system.cluster.down(node: member.node.node) // TODO: should w really, or rather mark unreachable and let the downing do this? } } @@ -391,6 +405,18 @@ internal struct SWIMShell { } } + private func escalateMemberUnreachable(context: ActorContext, member: SWIM.Member) { + context.log.warning( + """ + Node \(member.node) determined [.unreachable]!\ + The node is not yet marked [.down], a downing strategy or other Cluster.Event subscriber may act upon this information. + """, metadata: [ + "swim/member": "\(member)", + ] + ) + self.clusterRef.tell(.command(.failureDetectorReachabilityChanged(member.node, .unreachable))) + } + /// Use to ensure an association to given remote node exists; as one may not always be sure a connection has been already established, /// when a remote ref is discovered through other means (such as SWIM's gossiping). func ensureAssociated(_ context: ActorContext, remoteNode: UniqueNode?, continueWithAssociation: @escaping (Result) -> Void) { @@ -454,6 +480,14 @@ internal struct SWIMShell { let resolveContext = ResolveContext(address: remoteSwimAddress, system: context.system) let remoteSwimRef = context.system._resolve(context: resolveContext) + // We need to include the member immediately, rather than when we have ensured the association. + // This is because if we're not able to establish the association, we still want to re-try soon (in the next ping round), + // and perhaps then the other node would accept the association (perhaps some transient network issues occurred OR the node was + // already dead when we first try to ping it). In those situations, we need to continue the protocol until we're certain it is + // suspect and unreachable, as without signalling unreachable the high-level membership would not have a chance to notice and + // call the node [Cluster.MemberStatus.down]. + self.swim.addMember(remoteSwimRef, status: .alive(incarnation: 0)) + // TODO: we are sending the ping here to initiate cluster membership. 
Once available this should do a state sync instead self.sendPing(context: context, to: remoteSwimRef, lastKnownStatus: .alive(incarnation: 0), pingReqOrigin: nil) } diff --git a/Sources/DistributedActorsTestKit/ActorTestKit.swift b/Sources/DistributedActorsTestKit/ActorTestKit.swift index c7045dce8..97ff7e190 100644 --- a/Sources/DistributedActorsTestKit/ActorTestKit.swift +++ b/Sources/DistributedActorsTestKit/ActorTestKit.swift @@ -26,7 +26,7 @@ import XCTest /// The `ActorTestKit` offers a number of helpers such as test probes and helper functions to /// make testing actor based "from the outside" code manageable and pleasant. public final class ActorTestKit { - internal let system: ActorSystem + public let system: ActorSystem private let spawnProbesLock = Lock() /// Access should be protected by `spawnProbesLock`, in order to guarantee unique names. @@ -114,6 +114,15 @@ extension ActorTestKit { return try system.spawn(.init(unchecked: .unique(name)), props: testProbeProps, probeBehavior) }, settings: self.settings) } + + /// Spawns an `ActorTestProbe` and immediately subscribes it to the passed in event stream. + /// + /// - Hint: Use `fishForMessages` and `fishFor` to filter expectations for specific events. + public func spawnTestProbe(subscribedTo eventStream: EventStream, file: String = #file, line: UInt = #line, column: UInt = #column) -> ActorTestProbe { + let p = self.spawnTestProbe(.prefixed(with: "\(eventStream.ref.path.name)-subscriberProbe"), expecting: Event.self) + eventStream.subscribe(p.ref) + return p + } } /// A test probe pretends to be the `Actorable` and allows expecting messages be sent to it. diff --git a/Sources/DistributedActorsTestKit/TestProbes.swift b/Sources/DistributedActorsTestKit/TestProbes.swift index 60e93ec82..619edb908 100644 --- a/Sources/DistributedActorsTestKit/TestProbes.swift +++ b/Sources/DistributedActorsTestKit/TestProbes.swift @@ -64,7 +64,8 @@ public class ActorTestProbe { /// Blocking linked queue, specialized for keeping only termination signals (so that we can assert terminations, independently of other signals) private let terminationsQueue = LinkedBlockingQueue() - private var lastMessageObserved: Message? + /// Last message received (by using an `expect...` call), by this probe. + public var lastMessage: Message? /// Prepares and spawns a new test probe. Users should use `testKit.spawnTestProbe(...)` instead. internal init( @@ -188,7 +189,7 @@ extension ActorTestProbe { guard let message = self.messagesQueue.poll(deadline.timeLeft) else { continue } - self.lastMessageObserved = message + self.lastMessage = message return message } @@ -283,7 +284,7 @@ extension ActorTestProbe { /// /// - Warning: Blocks the current thread until the `expectationTimeout` is exceeded or a message is received by the actor. public func maybeExpectMessage() throws -> Message? 
{ - return try self.maybeExpectMessage(within: self.expectationTimeout) + try self.maybeExpectMessage(within: self.expectationTimeout) } /// Expects a message to "maybe" arrive at the `ActorTestProbe` and returns it for further assertions, @@ -309,7 +310,7 @@ extension ActorTestProbe { guard let message = self.messagesQueue.poll(deadline.timeLeft) else { continue } - self.lastMessageObserved = message + self.lastMessage = message return message } @@ -338,7 +339,7 @@ extension ActorTestProbe where Message: Equatable { let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function) do { let receivedMessage = try self.receiveMessage(within: timeout) - self.lastMessageObserved = receivedMessage + self.lastMessage = receivedMessage guard receivedMessage == message else { throw callSite.error(callSite.detailedMessage(got: receivedMessage, expected: message)) } @@ -356,7 +357,7 @@ extension ActorTestProbe where Message: Equatable { let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function) let receivedMessage = try self.receiveMessage(within: timeout) - self.lastMessageObserved = receivedMessage + self.lastMessage = receivedMessage guard receivedMessage is T else { throw callSite.error(callSite.detailedMessage(got: receivedMessage, expected: type)) } @@ -392,7 +393,7 @@ extension ActorTestProbe { do { let message = try self.receiveMessage(within: deadline.timeLeft) receivedMessages.append(message) - self.lastMessageObserved = message + self.lastMessage = message if receivedMessages.count == count { return receivedMessages @@ -427,7 +428,7 @@ extension ActorTestProbe where Message: Equatable { while !messages.isEmpty { let receivedMessage = try self.receiveMessage(within: deadline.timeLeft) - self.lastMessageObserved = receivedMessage + self.lastMessage = receivedMessage guard let index = messages.firstIndex(where: { $0 == receivedMessage }) else { throw callSite.error("Received unexpected message [\(receivedMessage)]") } @@ -539,7 +540,7 @@ extension ActorTestProbe { var fullMessage: String = message ?? "ActorTestProbe failure." - switch self.lastMessageObserved { + switch self.lastMessage { case .some(let m): fullMessage += " Last message observed was: [\(m)]." 
case .none: diff --git a/Tests/DistributedActorsTestKitTests/ActorTestKitTests+XCTest.swift b/Tests/DistributedActorsTestKitTests/ActorTestKitTests+XCTest.swift index 6d7880068..16b9618d3 100644 --- a/Tests/DistributedActorsTestKitTests/ActorTestKitTests+XCTest.swift +++ b/Tests/DistributedActorsTestKitTests/ActorTestKitTests+XCTest.swift @@ -27,7 +27,6 @@ extension ActorTestKitTests { ("test_error_withMessage", test_error_withMessage), ("test_fail_shouldNotImmediatelyFailWithinEventuallyBlock", test_fail_shouldNotImmediatelyFailWithinEventuallyBlock), ("test_nestedEventually_shouldProperlyHandleFailures", test_nestedEventually_shouldProperlyHandleFailures), - ("test_ensureRegistered_countAndRefs", test_ensureRegistered_countAndRefs), ("test_fishForMessages", test_fishForMessages), ("test_fishForTransformed", test_fishForTransformed), ("test_fishFor_canThrow", test_fishFor_canThrow), diff --git a/Tests/DistributedActorsTestKitTests/ActorTestKitTests.swift b/Tests/DistributedActorsTestKitTests/ActorTestKitTests.swift index 56b4be89c..d66191a75 100644 --- a/Tests/DistributedActorsTestKitTests/ActorTestKitTests.swift +++ b/Tests/DistributedActorsTestKitTests/ActorTestKitTests.swift @@ -75,17 +75,6 @@ final class ActorTestKitTests: XCTestCase { } } - func test_ensureRegistered_countAndRefs() throws { - let greeterProbe1 = self.testKit.spawnTestProbe(expecting: String.self) - let greeterProbe2 = self.testKit.spawnTestProbe(expecting: String.self) - - let key = Receptionist.RegistrationKey(String.self, id: "greeter") - self.system.receptionist.tell(Receptionist.Register(greeterProbe1.ref, key: key)) - self.system.receptionist.tell(Receptionist.Register(greeterProbe2.ref, key: key)) - - try self.testKit.ensureRegistered(key: key, expectedCount: 2, expectedRefs: [greeterProbe2.ref, greeterProbe1.ref]) - } - func test_fishForMessages() throws { let p = self.testKit.spawnTestProbe(expecting: String.self) diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceClusteredTests.swift index ecea78fb3..0be6a3a57 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceClusteredTests.swift @@ -58,7 +58,7 @@ final class SWIMInstanceClusteredTests: ClusteredNodesTestBase { swim.memberCount.shouldEqual(1) // the new member should not yet be added until we can confirm we are able to connect // we act as if we connected successfully - onceConnected(remote.settings.cluster.uniqueBindNode) + onceConnected(.success(remote.settings.cluster.uniqueBindNode)) swim.memberCount.shouldEqual(2) // successfully joined swim.member(for: remoteShell)!.status.shouldEqual(remoteMember.status) diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift index 509ece519..4204d8e77 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift @@ -519,7 +519,7 @@ final class SWIMInstanceTests: ActorSystemTestBase { self.validateSuspects(swim, expected: []) - swim.mark(p1, as: .suspect(incarnation: 0)).shouldEqual(.applied(previousStatus: aliveAtZero)) + swim.mark(p1, as: .suspect(incarnation: 0)).shouldEqual(.applied(previousStatus: aliveAtZero, currentStatus: .suspect(incarnation: 0))) self.validateSuspects(swim, expected: [p1]) _ = swim.mark(p3, as: .suspect(incarnation: 0)) diff 
--git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests+XCTest.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift similarity index 51% rename from Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests+XCTest.swift rename to Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift index ef1f87fb1..1bcfea14c 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests+XCTest.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift @@ -20,19 +20,21 @@ import XCTest /// Do NOT edit this file directly as it will be regenerated automatically when needed. /// -extension SWIMShellTests { - static var allTests: [(String, (SWIMShellTests) -> () throws -> Void)] { +extension SWIMShellClusteredTests { + static var allTests: [(String, (SWIMShellClusteredTests) -> () throws -> Void)] { return [ ("test_swim_shouldRespondWithAckToPing", test_swim_shouldRespondWithAckToPing), ("test_swim_shouldPingRandomMember", test_swim_shouldPingRandomMember), ("test_swim_shouldPingSpecificMemberWhenRequested", test_swim_shouldPingSpecificMemberWhenRequested), - ("test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndNoOtherNodesCanBeRequested", test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndNoOtherNodesCanBeRequested), - ("test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndRequestedNodesFailToPing", test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndRequestedNodesFailToPing), - ("test_swim_shouldNotMarkMembersAsSuspectWhenPingFailsButRequestedNodesSucceedToPing", test_swim_shouldNotMarkMembersAsSuspectWhenPingFailsButRequestedNodesSucceedToPing), - ("test_swim_shouldMarkSuspectedMembersAsAliveWhenPingingSucceedsWithinSuspicionTimeout", test_swim_shouldMarkSuspectedMembersAsAliveWhenPingingSucceedsWithinSuspicionTimeout), - ("test_swim_shouldNotifyClusterAboutUnreachableNodeAfterConfiguredSuspicionTimeoutAndMarkDeadWhenConfirmed", test_swim_shouldNotifyClusterAboutUnreachableNodeAfterConfiguredSuspicionTimeoutAndMarkDeadWhenConfirmed), + ("test_swim_shouldMarkSuspects_whenPingFailsAndNoOtherNodesCanBeRequested", test_swim_shouldMarkSuspects_whenPingFailsAndNoOtherNodesCanBeRequested), + ("test_swim_shouldMarkSuspects_whenPingFailsAndRequestedNodesFailToPing", test_swim_shouldMarkSuspects_whenPingFailsAndRequestedNodesFailToPing), + ("test_swim_shouldNotMarkSuspects_whenPingFailsButRequestedNodesSucceedToPing", test_swim_shouldNotMarkSuspects_whenPingFailsButRequestedNodesSucceedToPing), + ("test_swim_shouldMarkSuspectedMembersAsAlive_whenPingingSucceedsWithinSuspicionTimeout", test_swim_shouldMarkSuspectedMembersAsAlive_whenPingingSucceedsWithinSuspicionTimeout), + ("test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed", test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed), + ("test_swim_shouldNotifyClusterAboutUnreachableNode_whenUnreachableDiscoveredByOtherNode", test_swim_shouldNotifyClusterAboutUnreachableNode_whenUnreachableDiscoveredByOtherNode), + ("test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain", test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain), ("test_swim_shouldSendGossipInAck", test_swim_shouldSendGossipInAck), - ("test_swim_shouldSendGossipInPing_", test_swim_shouldSendGossipInPing_), + ("test_swim_shouldSendGossipInPing", test_swim_shouldSendGossipInPing), ("test_swim_shouldSendGossipInPingReq", test_swim_shouldSendGossipInPingReq), 
("test_swim_shouldSendGossipOnlyTheConfiguredNumberOfTimes", test_swim_shouldSendGossipOnlyTheConfiguredNumberOfTimes), ("test_swim_shouldConvergeStateThroughGossip", test_swim_shouldConvergeStateThroughGossip), diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift similarity index 79% rename from Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests.swift rename to Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift index 30afa8dd1..7ecab3ba2 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift @@ -17,7 +17,7 @@ import DistributedActorsTestKit import Foundation import XCTest -final class SWIMShellTests: ClusteredNodesTestBase { +final class SWIMShellClusteredTests: ClusteredNodesTestBase { var firstClusterProbe: ActorTestProbe! var secondClusterProbe: ActorTestProbe! @@ -33,6 +33,12 @@ final class SWIMShellTests: ClusteredNodesTestBase { return second } + override func configureLogCapture(settings: inout LogCapture.Settings) { + settings.filterActorPath = "/system/cluster/swim" + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + func test_swim_shouldRespondWithAckToPing() throws { let first = self.setUpFirst() let p = self.testKit(first).spawnTestProbe(expecting: SWIM.Ack.self) @@ -100,7 +106,10 @@ final class SWIMShellTests: ClusteredNodesTestBase { response.incarnation.shouldEqual(0) } - func test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndNoOtherNodesCanBeRequested() throws { + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Marking suspect nodes + + func test_swim_shouldMarkSuspects_whenPingFailsAndNoOtherNodesCanBeRequested() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -119,7 +128,7 @@ final class SWIMShellTests: ClusteredNodesTestBase { try self.awaitStatus(.suspect(incarnation: 0), for: remoteProbeRef, on: ref, within: .seconds(1)) } - func test_swim_shouldMarkMembersAsSuspectWhenPingFailsAndRequestedNodesFailToPing() throws { + func test_swim_shouldMarkSuspects_whenPingFailsAndRequestedNodesFailToPing() throws { let first = self.setUpFirst() let probe = self.testKit(first).spawnTestProbe(expecting: ForwardedSWIMMessage.self) @@ -149,7 +158,7 @@ final class SWIMShellTests: ClusteredNodesTestBase { try self.awaitStatus(.suspect(incarnation: 0), for: suspiciousRef, on: ref, within: .seconds(1)) } - func test_swim_shouldNotMarkMembersAsSuspectWhenPingFailsButRequestedNodesSucceedToPing() throws { + func test_swim_shouldNotMarkSuspects_whenPingFailsButRequestedNodesSucceedToPing() throws { let first = self.setUpFirst() let probe = self.testKit(first).spawnTestProbe(expecting: ForwardedSWIMMessage.self) @@ -180,7 +189,7 @@ final class SWIMShellTests: ClusteredNodesTestBase { try self.holdStatus(.alive(incarnation: 0), for: suspiciousRef, on: ref, within: .seconds(1)) } - func test_swim_shouldMarkSuspectedMembersAsAliveWhenPingingSucceedsWithinSuspicionTimeout() throws { + func test_swim_shouldMarkSuspectedMembersAsAlive_whenPingingSucceedsWithinSuspicionTimeout() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -204,7 +213,7 @@ final class SWIMShellTests: ClusteredNodesTestBase { try self.awaitStatus(.alive(incarnation: 1), for: remoteProbeRef, on: ref, 
within: .seconds(1)) } - func test_swim_shouldNotifyClusterAboutUnreachableNodeAfterConfiguredSuspicionTimeoutAndMarkDeadWhenConfirmed() throws { + func test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -241,6 +250,99 @@ final class SWIMShellTests: ClusteredNodesTestBase { try self.awaitStatus(.dead, for: remoteMemberRef, on: ref, within: .seconds(1)) } + func test_swim_shouldNotifyClusterAboutUnreachableNode_whenUnreachableDiscoveredByOtherNode() throws { + let first = self.setUpFirst { settings in + // purposefully too large timeouts, we want the first node to be informed by the third node + // about the second node being unreachable/dead, and ensure that the first node also signals an + // unreachability event to the cluster upon such discovery. + settings.cluster.swim.failureDetector.suspicionTimeoutPeriodsMax = 100 + settings.cluster.swim.failureDetector.pingTimeout = .seconds(3) + } + let second = self.setUpSecond() + let secondNode = second.cluster.node + let third = self.setUpNode("third") { settings in + settings.cluster.swim.failureDetector.suspicionTimeoutPeriodsMax = 2 + settings.cluster.swim.failureDetector.pingTimeout = .milliseconds(300) + } + + first.cluster.join(node: second.cluster.node.node) + third.cluster.join(node: second.cluster.node.node) + try assertAssociated(first, withExactly: [second.cluster.node, third.cluster.node]) + try assertAssociated(second, withExactly: [first.cluster.node, third.cluster.node]) + try assertAssociated(third, withExactly: [first.cluster.node, second.cluster.node]) + + let firstTestKit = self.testKit(first) + let p1 = firstTestKit.spawnTestProbe(expecting: Cluster.Event.self) + first.cluster.events.subscribe(p1.ref) + + let thirdTestKit = self.testKit(third) + let p3 = thirdTestKit.spawnTestProbe(expecting: Cluster.Event.self) + third.cluster.events.subscribe(p3.ref) + + try self.expectReachabilityInSnapshot(firstTestKit, node: secondNode, expect: .reachable) + try self.expectReachabilityInSnapshot(thirdTestKit, node: secondNode, expect: .reachable) + + // kill the second node + second.shutdown() + + try self.expectReachabilityEvent(thirdTestKit, p3, node: secondNode, expect: .unreachable) + try self.expectReachabilityEvent(firstTestKit, p1, node: secondNode, expect: .unreachable) + + // we also expect the snapshot to include the right reachability information now + try self.expectReachabilityInSnapshot(firstTestKit, node: secondNode, expect: .unreachable) + try self.expectReachabilityInSnapshot(thirdTestKit, node: secondNode, expect: .unreachable) + } + + func test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain() throws { + // TODO: Implement this once the transport is pluggable and we can make it drop random messages + pnote("TODO: Implement this once the transport is pluggable and we can make it drop random messages") + } + + /// Passed in `eventStreamProbe` is expected to have been subscribed to the event stream as early as possible, + /// as we want to expect the specific reachability event to be sent + private func expectReachabilityEvent( + _ testKit: ActorTestKit, _ eventStreamProbe: ActorTestProbe, + node uniqueNode: UniqueNode, expect expected: Cluster.MemberReachability + ) throws { + let messages = try eventStreamProbe.fishFor(Cluster.ReachabilityChange.self, within: .seconds(10)) { event in + switch event { + case .reachabilityChange(let change): + return 
.catchComplete(change) + default: + return .ignore + } + } + messages.count.shouldEqual(1) + guard let change: Cluster.ReachabilityChange = messages.first else { + throw testKit.fail("Expected a reachability change, but did not get one on \(testKit.system.cluster.node)") + } + change.member.node.shouldEqual(uniqueNode) + change.member.reachability.shouldEqual(expected) + } + + private func expectReachabilityInSnapshot(_ testKit: ActorTestKit, node: UniqueNode, expect expected: Cluster.MemberReachability) throws { + try testKit.eventually(within: .seconds(3)) { + let p11 = testKit.spawnTestProbe(subscribedTo: testKit.system.cluster.events) + guard case .some(Cluster.Event.snapshot(let snapshot)) = try p11.maybeExpectMessage() else { + throw testKit.error("Expected snapshot, was: \(String(reflecting: p11.lastMessage))") + } + + if let secondMember = snapshot.uniqueMember(node) { + if secondMember.reachability == expected { + return + } else { + throw testKit.error("Expected \(node) on \(testKit.system.cluster.node) to be [\(expected)] but was: \(secondMember)") + } + } else { + pinfo("Unable to assert reachability of \(node) on \(testKit.system.cluster.node) since membership did not contain it. Was: \(snapshot)") + () // it may have technically been removed already, so this is "fine" + } + } + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Gossiping + func test_swim_shouldSendGossipInAck() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -272,7 +374,7 @@ final class SWIMShellTests: ClusteredNodesTestBase { } } - func test_swim_shouldSendGossipInPing_() throws { + func test_swim_shouldSendGossipInPing() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -451,7 +553,8 @@ final class SWIMShellTests: ClusteredNodesTestBase { func expectPing( on probe: ActorTestProbe, reply: Bool, incarnation: SWIM.Incarnation = 0, file: StaticString = #file, line: UInt = #line, column: UInt = #column, - assertPayload: (SWIM.Payload) throws -> Void = { _ in } + assertPayload: (SWIM.Payload) throws -> Void = { _ in + } ) throws { switch try probe.expectMessage(file: file, line: line, column: column) { case .remote(.ping(_, let replyTo, let payload)): @@ -468,7 +571,8 @@ final class SWIMShellTests: ClusteredNodesTestBase { for: ActorRef, on probe: ActorTestProbe, reply: Bool, incarnation: SWIM.Incarnation = 0, file: StaticString = #file, line: UInt = #line, column: UInt = #column, - assertPayload: (SWIM.Payload) throws -> Void = { _ in } + assertPayload: (SWIM.Payload) throws -> Void = { _ in + } ) throws { switch try probe.expectMessage(file: file, line: line, column: column) { case .remote(.pingReq(let toPing, _, let replyTo, let payload)): @@ -518,7 +622,8 @@ final class SWIMShellTests: ClusteredNodesTestBase { } } - func makeSWIM(for address: ActorAddress, members: [ActorRef], configuredWith configure: (inout SWIM.Settings) -> Void = { _ in }) -> SWIM.Instance { + func makeSWIM(for address: ActorAddress, members: [ActorRef], configuredWith configure: (inout SWIM.Settings) -> Void = { _ in + }) -> SWIM.Instance { var memberStatus: [ActorRef: SWIM.Status] = [:] for member in members { memberStatus[member] = .alive(incarnation: 0) @@ -526,7 +631,8 @@ final class SWIMShellTests: ClusteredNodesTestBase { return self.makeSWIM(for: address, members: memberStatus, configuredWith: configure) } - func makeSWIM(for address: ActorAddress, members: [ActorRef: SWIM.Status], configuredWith 
configure: (inout SWIM.Settings) -> Void = { _ in }) -> SWIM.Instance { + func makeSWIM(for address: ActorAddress, members: [ActorRef: SWIM.Status], configuredWith configure: (inout SWIM.Settings) -> Void = { _ in + }) -> SWIM.Instance { var settings = SWIM.Settings() configure(&settings) let instance = SWIM.Instance(settings) @@ -536,7 +642,8 @@ final class SWIMShellTests: ClusteredNodesTestBase { return instance } - func swimBehavior(members: [ActorRef], clusterRef: ClusterShell.Ref, configuredWith configure: @escaping (inout SWIM.Settings) -> Void = { _ in }) -> Behavior { + func swimBehavior(members: [ActorRef], clusterRef: ClusterShell.Ref, configuredWith configure: @escaping (inout SWIM.Settings) -> Void = { _ in + }) -> Behavior { return .setup { context in let swim = self.makeSWIM(for: context.address, members: members, configuredWith: configure) swim.addMyself(context.myself) diff --git a/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift b/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift index 50a273355..1e1dc78e9 100644 --- a/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift +++ b/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift @@ -21,10 +21,10 @@ final class NodeDeathWatcherTests: ClusteredNodesTestBase { func test_nodeDeath_shouldFailAllRefsOnSpecificAddress() throws { try shouldNotThrow { let first = self.setUpNode("first") { settings in - settings.cluster.swim.gossip.probeInterval = .milliseconds(100) + settings.cluster.swim.failureDetector.probeInterval = .milliseconds(100) } let second = self.setUpNode("second") { settings in - settings.cluster.swim.gossip.probeInterval = .milliseconds(100) + settings.cluster.swim.failureDetector.probeInterval = .milliseconds(100) } try self.joinNodes(node: first, with: second) diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 17144ca43..784cc2ce6 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -109,7 +109,7 @@ XCTMain([ testCase(SWIMInstanceClusteredTests.allTests), testCase(SWIMInstanceTests.allTests), testCase(SWIMSerializationTests.allTests), - testCase(SWIMShellTests.allTests), + testCase(SWIMShellClusteredTests.allTests), testCase(SerializationPoolTests.allTests), testCase(SerializationTests.allTests), testCase(ShootTheOtherNodeClusteredTests.allTests),