From 8bed7c9fa91ad9373b33e2b39a305fbda181f112 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Mon, 27 Jan 2020 17:09:29 +0900 Subject: [PATCH 1/3] =swim allow disabling the SWIM failure detector, e.g. if some other impl is provided --- .../Cluster/ClusterShell.swift | 13 +- .../DistributedActors/Cluster/SWIM/SWIM.swift | 4 +- .../Cluster/SWIM/SWIMInstance.swift | 256 +++++++++++------- .../Cluster/SWIM/SWIMSettings.swift | 6 + .../Cluster/SWIM/SWIMShell.swift | 146 ++++++---- .../SWIM/SWIMInstanceTests+XCTest.swift | 1 + .../Cluster/SWIM/SWIMInstanceTests.swift | 81 +++++- .../SWIM/SWIMShellClusteredTests.swift | 8 +- 8 files changed, 336 insertions(+), 179 deletions(-) diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index 7979116c1..b9e5dc03b 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -336,8 +336,17 @@ extension ClusterShell { let uniqueBindAddress = clusterSettings.uniqueBindNode // SWIM failure detector and gossiping - let swimBehavior = SWIMShell(settings: clusterSettings.swim, clusterRef: context.myself).behavior - self._swimRef = try context._downcastUnsafe._spawn(SWIMShell.naming, props: ._wellKnown, swimBehavior) + if !clusterSettings.swim.disabled { + let swimBehavior = SWIMShell(settings: clusterSettings.swim, clusterRef: context.myself).behavior + self._swimRef = try context._downcastUnsafe._spawn(SWIMShell.naming, props: ._wellKnown, swimBehavior) + } else { + context.log.warning(""" + SWIM Failure Detector has been [disabled]! \ + Reachability events will NOT be emitted, meaning that most downing strategies will not be able to perform \ + their duties. Please ensure that an external mechanism for detecting failed cluster nodes is used. + """) + self._swimRef = nil + } // automatic leader election, so it may move members: .joining -> .up (and other `LeaderAction`s) if let leaderElection = context.system.settings.cluster.autoLeaderElection.make(context.system.cluster.settings) { diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift index 40a82a2b8..e1c5cda00 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift @@ -39,7 +39,7 @@ public enum SWIM { internal enum Message { case remote(RemoteMessage) case local(LocalMessage) - case testing(TestingMessage) // TODO: hopefully no need for this soon once cluster events land + case _testing(TestingMessage) } internal enum RemoteMessage { @@ -48,7 +48,7 @@ public enum SWIM { /// "Ping Request" requests a SWIM probe. case pingReq(target: ActorRef, lastKnownStatus: Status, replyTo: ActorRef, payload: Payload) - /// Extension: Lifeguard, Local Health Aware Probe + // TODO: Implement Extension: Lifeguard, Local Health Aware Probe /// LHAProbe adds a `nack` message to the fault detector protocol, /// which is sent in the case of failed indirect probes. 
This gives the member that /// initiates the indirect probe a way to check if it is receiving timely responses diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift index c6f7374d1..1f92af16e 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift @@ -205,19 +205,6 @@ final class SWIMInstance { enum MarkedDirective: Equatable { case ignoredDueToOlderStatus(currentStatus: SWIM.Status) case applied(previousStatus: SWIM.Status?, currentStatus: SWIM.Status) - - /// True if the directive was `applied` and the from/to statuses differ, meaning that a change notification has issued. - var isEffectiveStatusChange: Bool { - switch self { - case .ignoredDueToOlderStatus: - return false - case .applied(nil, _): - // from no status, to any status is definitely an effective change - return true - case .applied(.some(let previousStatus), let currentStatus): - return previousStatus != currentStatus - } - } } func incrementProtocolPeriod() { @@ -254,7 +241,7 @@ final class SWIMInstance { } func member(for ref: ActorRef) -> SWIM.Member? { - return self.members[ref] + self.members[ref] } func member(for node: UniqueNode) -> SWIM.Member? { @@ -267,12 +254,12 @@ final class SWIMInstance { /// Counts non-dead members. var memberCount: Int { - return self.members.filter { !$0.value.isDead }.count + self.members.filter { !$0.value.isDead }.count } // for testing; used to implement the data for the testing message in the shell: .getMembershipState var _allMembersDict: [ActorRef: SWIM.Status] { - return self.members.mapValues { $0.status } + self.members.mapValues { $0.status } } /// Lists all suspect members, including myself if suspect. @@ -414,92 +401,114 @@ extension SWIM.Instance { func onGossipPayload(about member: SWIM.Member) -> OnGossipPayloadDirective { if self.isMyself(member) { - switch member.status { - case .alive: - // as long as other nodes see us as alive, we're happy - return .applied - case .suspect(let suspectedInIncarnation): - // someone suspected us, so we need to increment our incarnation number to spread our alive status with - // the incremented incarnation - if suspectedInIncarnation == self.incarnation { - self._incarnation += 1 - } else if suspectedInIncarnation > self.incarnation { - return .applied( - level: .warning, - message: """ - Received gossip about self with incarnation number [\(suspectedInIncarnation)] > current incarnation [\(self._incarnation)], \ - which should never happen and while harmless is highly suspicious, please raise an issue with logs. This MAY be an issue in the library. - """ - ) - } - return .applied - - case .unreachable(let unreachableInIncarnation): - // someone suspected us, so we need to increment our - // incarnation number to spread our alive status with - // the incremented incarnation - if unreachableInIncarnation == self.incarnation { - self._incarnation += 1 - } else if unreachableInIncarnation > self.incarnation { - return .applied( - level: .warning, - message: """ - Received gossip about self with incarnation number [\(unreachableInIncarnation)] > current incarnation [\(self._incarnation)], \ - which should never happen and while harmless is highly suspicious, please raise an issue with logs. This MAY be an issue in the library. 
- """ - ) - } + return onMyselfGossipPayload(myself: member) + } else { + return onOtherMemberGossipPayload(member: member) + } + } - return .applied + private func onMyselfGossipPayload(myself incoming: SWIM.Member) -> SWIM.Instance.OnGossipPayloadDirective { + assert(self.myShellMyself == incoming.ref, "Attempted to process gossip as-if about myself, but was not the same ref, was: \(incoming). Myself: \(self.myShellMyself)") - case .dead: - return .confirmedDead(member: member) - } - } else { - if self.isMember(member.ref) { - switch self.mark(member.ref, as: member.status) { - case .applied(_, let currentStatus): - switch currentStatus { - case .unreachable: - return .applied(level: .notice, message: "Member \(member) marked [.unreachable] from incoming gossip") - case .alive: - // TODO: could be another spot that we have to issue a reachable though? - return .ignored - case .suspect: - return .markedSuspect(member: member) - case .dead: - return .confirmedDead(member: member) - } - case .ignoredDueToOlderStatus(let currentStatus): - return .ignored( - level: .trace, - message: "Ignoring gossip about member \(reflecting: member.node), incoming: [\(member.status)] does not supersede current: [\(currentStatus)]" - ) - } - } else if let remoteMemberNode = member.ref.address.node { - return .connect(node: remoteMemberNode, onceConnected: { - switch $0 { - case .success(let uniqueNode): - self.addMember(member.ref, status: member.status) - case .failure(let error): - self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!) - } - }) + // Note, we don't yield changes for myself node observations, thus the self node will never be reported as unreachable, + // after all, we can always reach ourselves. We may reconsider this if we wanted to allow SWIM to inform us about + // the fact that many other nodes think we're unreachable, and thus we could perform self-downing based upon this information // TODO: explore self-downing driven from SWIM + + switch incoming.status { + case .alive: + // as long as other nodes see us as alive, we're happy + return .applied(change: nil) + case .suspect(let suspectedInIncarnation): + // someone suspected us, so we need to increment our incarnation number to spread our alive status with + // the incremented incarnation + if suspectedInIncarnation == self.incarnation { + self._incarnation += 1 + return .applied(change: nil) + } else if suspectedInIncarnation > self.incarnation { + return .applied( + change: nil, + level: .warning, + message: """ + Received gossip about self with incarnation number [\(suspectedInIncarnation)] > current incarnation [\(self._incarnation)], \ + which should never happen and while harmless is highly suspicious, please raise an issue with logs. This MAY be an issue in the library. + """ + ) } else { - return .ignored( + // incoming incarnation was < than current one, i.e. 
the incoming information is "old" thus we discard it + return .ignored + } + + case .unreachable(let unreachableInIncarnation): + // someone suspected us, so we need to increment our incarnation number to spread our alive status with + // the incremented incarnation + // TODO: this could be the right spot to reply with a .nack, to prove that we're still alive + if unreachableInIncarnation == self.incarnation { + self._incarnation += 1 + } else if unreachableInIncarnation > self.incarnation { + return .applied( + change: nil, level: .warning, message: """ - Received gossip about node which is neither myself or a remote node (i.e. address is not present)\ - which is highly unexpected and may indicate a configuration or networking issue. Ignoring gossip about this member. \ - Member: \(member), SWIM.Instance state: \(String(reflecting: self)) + Received gossip about self with incarnation number [\(unreachableInIncarnation)] > current incarnation [\(self._incarnation)], \ + which should never happen and while harmless is highly suspicious, please raise an issue with logs. This MAY be an issue in the library. """ ) } + + return .applied(change: nil) + + case .dead: + guard let myselfRef = self.myShellMyself, + var myselfMember = self.member(for: myselfRef) else { + return .applied(change: nil) + } + + myselfMember.status = .dead + switch self.mark(myselfRef, as: .dead) { + case .applied(.some(let previousStatus), _): + return .applied(change: .init(fromStatus: previousStatus, member: myselfMember)) + default: + return .ignored(level: .warning, message: "Self already marked .dead") + } + } + } + + private func onOtherMemberGossipPayload(member: SWIM.Member) -> SWIM.Instance.OnGossipPayloadDirective { + assert(self.myShellMyself != member.ref, "Attempted to process gossip as-if not-myself, but WAS same ref, was: \(member). Myself: \(self.myShellMyself)") + + if self.isMember(member.ref) { + switch self.mark(member.ref, as: member.status) { + case .applied(let previousStatus, _): + return .applied(change: .init(fromStatus: previousStatus, member: member)) + case .ignoredDueToOlderStatus(let currentStatus): + return .ignored( + level: .trace, + message: "Ignoring gossip about member \(reflecting: member.node), incoming: [\(member.status)] does not supersede current: [\(currentStatus)]" + ) + } + } else if let remoteMemberNode = member.ref.address.node { + return .connect(node: remoteMemberNode, onceConnected: { + switch $0 { + case .success(let uniqueNode): + self.addMember(member.ref, status: member.status) + case .failure: + self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!) + } + }) + } else { + return .ignored( + level: .warning, + message: """ + Received gossip about node which is neither myself or a remote node (i.e. address is not present)\ + which is highly unexpected and may indicate a configuration or networking issue. Ignoring gossip about this member. \ + Member: \(member), SWIM.Instance state: \(String(reflecting: self)) + """ + ) } } enum OnGossipPayloadDirective { - case applied(level: Logger.Level?, message: Logger.Message?) + case applied(change: MemberStatusChange?, level: Logger.Level?, message: Logger.Message?) /// Ignoring a gossip update is perfectly fine: it may be "too old" or other reasons case ignored(level: Logger.Level?, message: Logger.Message?) /// Warning! 
Even though we have an `UniqueNode` here, we need to ensure that we are actually connected to the node, @@ -510,21 +519,70 @@ extension SWIM.Instance { /// we could get information through the seed nodes about the new members; but we still have never talked to them, /// thus we need to ensure we have a connection to them, before we consider adding them to the membership). case connect(node: UniqueNode, onceConnected: (Result) -> Void) - case markedSuspect(member: SWIM.Member) - /// Meaning the node is now marked `DEAD`. - /// SWIM will continue to gossip about the dead node for a while. - /// We should also notify the high-level membership that the node shall be considered `DOWN`. - case confirmedDead(member: SWIM.Member) +// case markedSuspect(member: SWIM.Member) +// /// Meaning the node is now marked `DEAD`. +// case confirmedDead(member: SWIM.Member) + } + + struct MemberStatusChange { + let member: SWIM.Member + var toStatus: SWIM.Status { + // Note if the member is marked .dead, SWIM shall continue to gossip about it for a while + // such that other nodes gain this information directly, and do not have to wait until they detect + // it as such independently. + self.member.status + } + + /// Previous status of the member, needed in order to decide if the change is "effective" or if applying the + /// member did not move it in such way that we need to inform the cluster about unreachability. + let fromStatus: SWIM.Status? + + init(fromStatus: SWIM.Status?, member: SWIM.Member) { + if let from = fromStatus, from == .dead { + precondition(member.status == .dead, "Change MUST NOT move status 'backwards' from [.dead] state to anything else, but did so, was: \(member)") + } + + self.fromStatus = fromStatus + self.member = member + } + + /// True if the directive was `applied` and the from/to statuses differ, meaning that a change notification has issued. + var isReachabilityChange: Bool { + guard let fromStatus = self.fromStatus else { + // i.e. nil -> anything, is always an effective reachability affecting change + return true + } + + // explicitly list all changes which are affecting reachability, all others do not (i.e. flipping between + // alive and suspect does NOT affect high-level reachability). + switch (fromStatus, self.toStatus) { + case (.alive, .unreachable), + (.alive, .dead): + return true + case (.suspect, .unreachable), + (.suspect, .dead): + return true + case (.unreachable, .alive), + (.unreachable, .suspect): + return true + case (.dead, .alive), + (.dead, .suspect), + (.dead, .unreachable): + fatalError("Change MUST NOT move status 'backwards' from .dead state to anything else, but did so, was: \(self)") + default: + return false + } + } } } extension SWIMInstance.OnGossipPayloadDirective { - static var applied: SWIMInstance.OnGossipPayloadDirective { - return .applied(level: nil, message: nil) + static func applied(change: SWIM.Instance.MemberStatusChange?) 
-> SWIM.Instance.OnGossipPayloadDirective { + .applied(change: change, level: nil, message: nil) } - static var ignored: SWIMInstance.OnGossipPayloadDirective { - return .ignored(level: nil, message: nil) + static var ignored: SWIM.Instance.OnGossipPayloadDirective { + .ignored(level: nil, message: nil) } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift index 85c99f999..ff78e6bb0 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift @@ -24,6 +24,12 @@ public struct SWIMSettings { // var timeSource: TimeSource // TODO would be nice? + /// Allows for completely disabling the SWIM distributed failure detector. + /// - Warning: disabling this means that no reachability events will be created automatically, + /// which also means that most `DowningStrategy` implementations will not be able to act and `.down` members! + /// Use with great caution, ONLY if you knowingly provide a different method of detecting cluster member node failures. + public var disabled: Bool = false + public var gossip: SWIMGossipSettings = .default public var failureDetector: SWIMFailureDetectorSettings = .default diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift index bbf104b2e..2297e9b0d 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift @@ -63,7 +63,7 @@ internal struct SWIMShell { self.receiveLocalMessage(context: context, message: message) return .same - case .testing(let message): + case ._testing(let message): switch message { case .getMembershipState(let replyTo): context.log.trace("getMembershipState from \(replyTo), state: \(self.swim._allMembersDict)") @@ -333,27 +333,28 @@ internal struct SWIMShell { "swim/timeoutPeriods": "\(timeoutPeriods)", ]) for suspect in self.swim.suspects { - context.log.trace("Checking \(suspect)...") + context.log.trace("Checking suspicion timeout for: \(suspect)...") // proceed with suspicion escalation to .unreachable if the timeout period has been exceeded guard suspect.protocolPeriod <= timeoutPeriods else { continue // skip } -// guard let node = suspect.ref.address.node else { -// continue // is not a remote node -// } - if let incarnation = suspect.status.incarnation { - context.log.trace("Marking \(suspect.node) as .unreachable!") - self.swim.mark(suspect.ref, as: .unreachable(incarnation: incarnation)) + guard let incarnation = suspect.status.incarnation else { + // suspect had no incarnation number? 
that means it is .dead already and should be recycled soon + return } - // if the member was already unreachable or dead before, we don't need to notify the clusterRef - // (as we already did so once before, and do not want to issue multiple events about this) - if suspect.status.isUnreachable || suspect.status.isDead { - continue + var unreachableSuspect = suspect + unreachableSuspect.status = .unreachable(incarnation: incarnation) + + switch self.swim.mark(unreachableSuspect.ref, as: unreachableSuspect.status) { + case .applied(let previousStatus, _): + let statusChange = SWIM.Instance.MemberStatusChange(fromStatus: previousStatus, member: unreachableSuspect) + self.tryAnnounceMemberReachability(context, change: statusChange) + case .ignoredDueToOlderStatus: + return } - self.escalateMemberUnreachable(context: context, member: suspect) } } @@ -361,60 +362,87 @@ internal struct SWIMShell { func processGossipPayload(context: ActorContext, payload: SWIM.Payload) { switch payload { case .membership(let members): - for member in members { - switch self.swim.onGossipPayload(about: member) { - case .applied where member.status.isUnreachable || member.status.isDead: - // FIXME: rather, we should return in applied if it was a change or not, only if it was we should emit... - - // TODO: ensure we don't double emit this - self.escalateMemberUnreachable(context: context, member: member) - - case .applied: - () - - case .connect(let node, let continueAddingMember): - // ensuring a connection is asynchronous, but executes callback in actor context - self.ensureAssociated(context, remoteNode: node) { uniqueAddressResult in - switch uniqueAddressResult { - case .success(let uniqueAddress): - continueAddingMember(.success(uniqueAddress)) - case .failure(let error): - continueAddingMember(.failure(error)) - context.log.warning("Unable ensure association with \(node), could it have been tombstoned? Error: \(error)") - } - } + self.processGossipedMembership(members: members, context: context) - case .ignored(let level, let message): - if let level = level, let message = message { - context.log.log(level: level, message) - } + case .none: + return // ok + } + } - case .markedSuspect(let member): - context.log.info("Marked member \(member) [.suspect], if it is not found [.alive] again within \(self.settings.failureDetector.suspicionTimeoutPeriodsMax) protocol periods, it will be marked [.unreachable].") + func processGossipedMembership(members: SWIM.Members, context: ActorContext) { + for member in members { + switch self.swim.onGossipPayload(about: member) { + case .connect(let node, let continueAddingMember): + // ensuring a connection is asynchronous, but executes callback in actor context + self.ensureAssociated(context, remoteNode: node) { uniqueAddressResult in + switch uniqueAddressResult { + case .success(let uniqueAddress): + continueAddingMember(.success(uniqueAddress)) + case .failure(let error): + continueAddingMember(.failure(error)) + context.log.warning("Unable ensure association with \(node), could it have been tombstoned? Error: \(error)") + } + } - case .confirmedDead(let member): - context.log.warning("Detected [.dead] node: \(member.node).", metadata: [ - "swim/member": "\(member)", - ]) - context.system.cluster.down(node: member.node.node) // TODO: should w really, or rather mark unreachable and let the downing do this? 
+ case .ignored(let level, let message): + if let level = level, let message = message { + context.log.log(level: level, message) } - } - case .none: - return // ok +// case .markedSuspect(let member): +// context.log.info("Marked member \(member) [.suspect], if it is not found [.alive] again within \(self.settings.failureDetector.suspicionTimeoutPeriodsMax) protocol periods, it will be marked [.unreachable].") +// +// case .confirmedDead(let member): +// context.log.warning("Detected [.dead] node: \(member.node).", metadata: [ +// "swim/member": "\(member)", +// ]) +// context.system.cluster.down(node: member.node.node) // TODO: should w really, or rather mark unreachable and let the downing do this? + + case .applied(let change, _, _): + self.tryAnnounceMemberReachability(context, change: change) + } } } - private func escalateMemberUnreachable(context: ActorContext, member: SWIM.Member) { - context.log.warning( - """ - Node \(member.node) determined [.unreachable]!\ - The node is not yet marked [.down], a downing strategy or other Cluster.Event subscriber may act upon this information. - """, metadata: [ - "swim/member": "\(member)", - ] - ) - self.clusterRef.tell(.command(.failureDetectorReachabilityChanged(member.node, .unreachable))) + /// Announce to the `ClusterShell` a change in reachability of a member. + private func tryAnnounceMemberReachability(_ context: ActorContext, change: SWIM.Instance.MemberStatusChange?) { + guard let change = change else { + // this means it likely was a change to the same status or it was about us, so we do not need to announce anything + return + } + + guard change.isReachabilityChange else { + // the change is from a reachable to another reachable (or an unreachable to another unreachable-like (e.g. dead) state), + // and thus we must not act on it, as the shell was already notified before about the change into the current status. + return + } + + // Log the transition + switch change.toStatus { + case .unreachable: + context.log.info( + """ + Node \(change.member.node) determined [.unreachable]!" \ + The node is not yet marked [.down], a downing strategy or other Cluster.Event subscriber may act upon this information. 
+ """, metadata: [ + "swim/member": "\(change.member)", + ] + ) + default: + context.log.info("Node \(change.member.node) determined [.\(change.toStatus)] (was [\(change.fromStatus, orElse: "nil")].", metadata: [ + "swim/member": "\(change.member)", + ]) + } + + let reachability: Cluster.MemberReachability + switch change.toStatus { + case .alive, .suspect: + reachability = .reachable + case .unreachable, .dead: + reachability = .unreachable + } + + self.clusterRef.tell(.command(.failureDetectorReachabilityChanged(change.member.node, reachability))) } /// Use to ensure an association to given remote node exists; as one may not always be sure a connection has been already established, diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift index b12ff585d..eecb711fe 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift @@ -36,6 +36,7 @@ extension SWIMInstanceTests { ("test_onPing_shouldOfferAckMessageWithMyselfReference", test_onPing_shouldOfferAckMessageWithMyselfReference), ("test_onPing_withAlive_shouldReplyWithAlive_withIncrementedIncarnation", test_onPing_withAlive_shouldReplyWithAlive_withIncrementedIncarnation), ("test_onPing_withSuspicion_shouldReplyWithAlive_withIncrementedIncarnation", test_onPing_withSuspicion_shouldReplyWithAlive_withIncrementedIncarnation), + ("test_MarkedDirective_isEffectiveChange", test_MarkedDirective_isEffectiveChange), ("test_onGossipPayload_myself_withAlive", test_onGossipPayload_myself_withAlive), ("test_onGossipPayload_myself_withSuspectAndSameIncarnation_shouldIncrementIncarnation", test_onGossipPayload_myself_withSuspectAndSameIncarnation_shouldIncrementIncarnation), ("test_onGossipPayload_myself_withSuspectAndLowerIncarnation_shouldNotIncrementIncarnation", test_onGossipPayload_myself_withSuspectAndLowerIncarnation_shouldNotIncrementIncarnation), diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift index 4204d8e77..103811922 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift @@ -250,6 +250,57 @@ final class SWIMInstanceTests: ActorSystemTestBase { } } + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Detecting when a change is "effective" + + func test_MarkedDirective_isEffectiveChange() { + let p = self.testKit.spawnTestProbe(expecting: SWIM.Message.self) + + SWIM.Instance.MemberStatusChange(fromStatus: nil, member: SWIM.Member(ref: p.ref, status: .alive(incarnation: 1), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: nil, member: SWIM.Member(ref: p.ref, status: .suspect(incarnation: 1), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: nil, member: SWIM.Member(ref: p.ref, status: .unreachable(incarnation: 1), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: nil, member: SWIM.Member(ref: p.ref, status: .dead, protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + + SWIM.Instance.MemberStatusChange(fromStatus: .alive(incarnation: 
1), member: SWIM.Member(ref: p.ref, status: .alive(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .alive(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .suspect(incarnation: 1), protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .alive(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .unreachable(incarnation: 1), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .alive(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .dead, protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + + SWIM.Instance.MemberStatusChange(fromStatus: .suspect(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .alive(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .suspect(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .suspect(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .suspect(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .unreachable(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .suspect(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .dead, protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + + SWIM.Instance.MemberStatusChange(fromStatus: .unreachable(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .alive(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .unreachable(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .suspect(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeTrue(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .unreachable(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .unreachable(incarnation: 2), protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + SWIM.Instance.MemberStatusChange(fromStatus: .unreachable(incarnation: 1), member: SWIM.Member(ref: p.ref, status: .dead, protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + + // those are illegal, but even IF they happened at least we'd never bubble them up to high level + // moving from .dead to any other state is illegal and will assert + // illegal, precondition crash: SWIM.Instance.MemberStatusChange(fromStatus: .dead, member: SWIM.Member(ref: p.ref, status: .alive(incarnation: 2), protocolPeriod: 1)) + // illegal, precondition crash: SWIM.Instance.MemberStatusChange(fromStatus: .dead, member: SWIM.Member(ref: p.ref, status: .suspect(incarnation: 2), protocolPeriod: 1)) + // illegal, precondition crash: SWIM.Instance.MemberStatusChange(fromStatus: .dead, member: SWIM.Member(ref: p.ref, status: .unreachable(incarnation: 2), protocolPeriod: 1)) + SWIM.Instance.MemberStatusChange(fromStatus: .dead, member: SWIM.Member(ref: p.ref, status: .dead, protocolPeriod: 1)) + .isReachabilityChange.shouldBeFalse(line: #line - 1) + } + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: handling gossip about the receiving node @@ -266,7 +317,7 @@ final class SWIMInstanceTests: 
ActorSystemTestBase { swim.incarnation.shouldEqual(currentIncarnation) switch res { - case .applied(_, let warning) where warning == nil: + case .applied(_, _, let warning) where warning == nil: () default: throw self.testKit.fail("Expected `.applied(warning: nil)`, got \(res)") @@ -287,7 +338,7 @@ final class SWIMInstanceTests: ActorSystemTestBase { swim.incarnation.shouldEqual(currentIncarnation + 1) switch res { - case .applied(_, let warning) where warning == nil: + case .applied(_, _, let warning) where warning == nil: () default: throw self.testKit.fail("Expected `.applied(warning: nil)`, got \(res)") @@ -300,6 +351,7 @@ final class SWIMInstanceTests: ActorSystemTestBase { let myself = try system.spawn("SWIM", SWIM.Shell(swim, clusterRef: self.clusterTestProbe.ref).ready) swim.addMyself(myself) + var myselfMember = swim.member(for: myself)! // necessary to increment incarnation @@ -314,10 +366,10 @@ final class SWIMInstanceTests: ActorSystemTestBase { swim.incarnation.shouldEqual(currentIncarnation) switch res { - case .applied(nil, nil): + case .ignored(nil, nil): () default: - throw self.testKit.fail("Expected [applied(level: nil, message: nil)], got [\(res)]") + throw self.testKit.fail("Expected [ignored(level: nil, message: nil)], got [\(res)]") } } @@ -335,7 +387,7 @@ final class SWIMInstanceTests: ActorSystemTestBase { swim.incarnation.shouldEqual(currentIncarnation) switch res { - case .applied(_, let warning) where warning != nil: + case .applied(nil, _, let warning) where warning != nil: () default: throw self.testKit.fail("Expected `.none(message)`, got \(res)") @@ -352,11 +404,14 @@ final class SWIMInstanceTests: ActorSystemTestBase { myselfMember.status = .dead let res = swim.onGossipPayload(about: myselfMember) + let myMember = swim.member(for: myself)! + myMember.status.shouldEqual(.dead) + switch res { - case .confirmedDead(let member): - member.shouldEqual(myselfMember) + case .applied(.some(let change), _, _) where change.toStatus.isDead: + change.member.shouldEqual(myselfMember) default: - throw self.testKit.fail("Expected `.confirmedDead`, got \(res)") + throw self.testKit.fail("Expected `.applied(.some(change to dead)`, got: \(res)") } } @@ -364,9 +419,9 @@ final class SWIMInstanceTests: ActorSystemTestBase { let swim = SWIM.Instance(.default) let myself = try system.spawn("SWIM", SWIM.Shell(swim, clusterRef: self.clusterTestProbe.ref).ready) - swim.addMyself(myself) - let other = try system.spawn("SWIM-B", SWIM.Shell(swim, clusterRef: self.clusterTestProbe.ref).ready) + + swim.addMyself(myself) swim.addMember(other, status: .alive(incarnation: 0)) var otherMember = swim.member(for: other)! 
@@ -374,10 +429,10 @@ final class SWIMInstanceTests: ActorSystemTestBase { let res = swim.onGossipPayload(about: otherMember) switch res { - case .confirmedDead(let member): - member.shouldEqual(otherMember) + case .applied(.some(let change), _, _) where change.toStatus.isDead: + change.member.shouldEqual(otherMember) default: - throw self.testKit.fail("Expected `.confirmedDead`, got \(res)") + throw self.testKit.fail("Expected `.applied(.some(change to dead))`, got \(res)") } } diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift index 7ecab3ba2..dd02c53a9 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift @@ -496,10 +496,10 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { secondSwim.tell(.remote(.pingReq(target: localRefRemote, lastKnownStatus: .alive(incarnation: 0), replyTo: pingProbe.ref, payload: .none))) try self.testKit(first).eventually(within: .seconds(10)) { - firstSwim.tell(.testing(.getMembershipState(replyTo: membershipProbe.ref))) + firstSwim.tell(._testing(.getMembershipState(replyTo: membershipProbe.ref))) let statusA = try membershipProbe.expectMessage(within: .seconds(1)) - secondSwim.tell(.testing(.getMembershipState(replyTo: membershipProbe.ref))) + secondSwim.tell(._testing(.getMembershipState(replyTo: membershipProbe.ref))) let statusB = try membershipProbe.expectMessage(within: .seconds(1)) guard statusA.membershipState.count == 2, statusB.membershipState.count == 2 else { @@ -595,7 +595,7 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { let stateProbe = testKit.spawnTestProbe(expecting: SWIM.MembershipState.self) try testKit.eventually(within: timeout, file: file, line: line, column: column) { - swimShell.tell(.testing(.getMembershipState(replyTo: stateProbe.ref))) + swimShell.tell(._testing(.getMembershipState(replyTo: stateProbe.ref))) let membership = try stateProbe.expectMessage() let otherStatus = membership.membershipState[member] @@ -614,7 +614,7 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { let stateProbe = testKit.spawnTestProbe(expecting: SWIM.MembershipState.self) try testKit.assertHolds(for: timeout, file: file, line: line, column: column) { - swimShell.tell(.testing(.getMembershipState(replyTo: stateProbe.ref))) + swimShell.tell(._testing(.getMembershipState(replyTo: stateProbe.ref))) let otherStatus = try stateProbe.expectMessage().membershipState[member] guard otherStatus == status else { throw testKit.error("Expected status [\(status)] for [\(member)], but found \(otherStatus.debugDescription)") From ff4292c5c6f20d2694cbd6ac61b14dfe7e737395 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Mon, 27 Jan 2020 17:53:19 +0900 Subject: [PATCH 2/3] =build #23 build with warnings-as-errors, fix the package conf --- Package.swift | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/Package.swift b/Package.swift index f352b402d..ed4e2bd00 100644 --- a/Package.swift +++ b/Package.swift @@ -8,9 +8,15 @@ import class Foundation.ProcessInfo // and ONE of our dependencies currently produces one warning, we have to use this workaround to enable it in _our_ // targets when the flag is set. We should remove the dependencies and then enable the flag globally though just by passing it. 
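// (Usage sketch, an editorial illustration: the check below only tests whether the variable is present,
//  so any value enables it, e.g. `SACT_WARNINGS_AS_ERRORS=yes swift build`.)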
// TODO: Follow up to https://github.com/apple/swift-distributed-actors/issues/23 by removing Files and Stencil, then we can remove this workaround -let globalSwiftSettings: [SwiftSetting] = ProcessInfo.processInfo.environment["SACT_WARNINGS_AS_ERRORS"] == nil ? [ - SwiftSetting.unsafeFlags(["-warnings-as-errors"]) -] : [] +let globalSwiftSettings: [SwiftSetting] +if ProcessInfo.processInfo.environment["SACT_WARNINGS_AS_ERRORS"] != nil { + print("SACT_WARNINGS_AS_ERRORS enabled, passing `-warnings-as-errors`") + globalSwiftSettings = [ + SwiftSetting.unsafeFlags(["-warnings-as-errors"]) + ] +} else { + globalSwiftSettings = [] +} var targets: [PackageDescription.Target] = [ // ==== ------------------------------------------------------------------------------------------------------------ @@ -307,7 +313,11 @@ var package = Package( dependencies: dependencies, targets: targets.map { target in - target.swiftSettings?.append(contentsOf: globalSwiftSettings) + var swiftSettings = target.swiftSettings ?? [] + swiftSettings.append(contentsOf: globalSwiftSettings) + if !swiftSettings.isEmpty { + target.swiftSettings = swiftSettings + } return target }, From e455202716242cda0ea568d2876aad88780eb574 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 28 Jan 2020 01:09:09 +0900 Subject: [PATCH 3/3] +swim #401 Member may become alive from unreachable, and must emit such event then --- .gitignore | 1 + IntegrationTests/run-tests.sh | 4 +- .../main.swift | 57 ++ IntegrationTests/tests_04_cluster/shared.sh | 55 ++ ..._unreachable_unsuspend_causes_reachable.sh | 64 ++ Package.swift | 7 + Samples/Sources/SampleCluster/main.swift | 8 +- .../Cluster/ClusterReceptionist.swift | 4 +- .../Cluster/ClusterShell.swift | 20 +- .../DistributedActors/Cluster/SWIM/SWIM.swift | 7 +- .../Cluster/SWIM/SWIMInstance.swift | 11 +- .../Cluster/SWIM/SWIMShell.swift | 63 +- .../ActorRef+RemotePersonality.swift | 4 +- .../DistributedActorsTestKit/LogCapture.swift | 4 +- .../ActorLeakingTests.swift | 2 +- .../ClusterMembershipGossipTests.swift | 2 +- .../SWIM/SWIMInstanceTests+XCTest.swift | 3 +- .../Cluster/SWIM/SWIMInstanceTests.swift | 26 +- .../SWIM/SWIMShellClusteredTests+XCTest.swift | 1 - .../SWIM/SWIMShellClusteredTests.swift | 62 +- wat.json | 668 ------------------ 21 files changed, 327 insertions(+), 746 deletions(-) create mode 100644 IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability/main.swift create mode 100644 IntegrationTests/tests_04_cluster/shared.sh create mode 100755 IntegrationTests/tests_04_cluster/test_01_suspend_causes_unreachable_unsuspend_causes_reachable.sh delete mode 100644 wat.json diff --git a/.gitignore b/.gitignore index a66703d32..7b4a029e4 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ SwiftDistributedActors.xcworkspace .xcode .idea +IntegrationTests/tests_04_cluster/.build # rendered docs output dirs /reference/ diff --git a/IntegrationTests/run-tests.sh b/IntegrationTests/run-tests.sh index fb871aa4f..a9e59aa7d 100755 --- a/IntegrationTests/run-tests.sh +++ b/IntegrationTests/run-tests.sh @@ -88,7 +88,9 @@ while getopts "f:vid" opt; do done function run_test() { - if $verbose; then + if $no_io_redirect; then + "$@" + elif $verbose; then "$@" 2>&1 | tee -a "$out" # we need to return the return value of the first command return ${PIPESTATUS[0]} diff --git a/IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability/main.swift b/IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability/main.swift new file 
mode 100644 index 000000000..bbd3bc84c --- /dev/null +++ b/IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability/main.swift @@ -0,0 +1,57 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedActors + +print("Getting args") + +var args = CommandLine.arguments +args.removeFirst() + +print("got args: \(args)") + +guard args.count >= 1 else { + fatalError("no port given") +} + +let system = ActorSystem("System") { settings in + settings.defaultLogLevel = .info + + settings.cluster.enabled = true + settings.cluster.bindPort = Int(args[0])! + + settings.cluster.swim.failureDetector.suspicionTimeoutPeriodsMax = 3 + settings.cluster.swim.failureDetector.pingTimeout = .milliseconds(100) + settings.cluster.swim.failureDetector.probeInterval = .milliseconds(300) + + settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2) + settings.cluster.downingStrategy = .none +} + +let ref = try system.spawn("streamWatcher", of: Cluster.Event.self, .receive { context, event in + context.log.info("Event: \(event)") + return .same +}) +system.cluster.events.subscribe(ref) + +if args.count >= 3 { + print("getting host") + let host = args[1] + print("parsing port") + let port = Int(args[2])! + print("Joining") + system.cluster.join(node: Node(systemName: "System", host: host, port: port)) +} + +Thread.sleep(.seconds(120)) diff --git a/IntegrationTests/tests_04_cluster/shared.sh b/IntegrationTests/tests_04_cluster/shared.sh new file mode 100644 index 000000000..077c39a1f --- /dev/null +++ b/IntegrationTests/tests_04_cluster/shared.sh @@ -0,0 +1,55 @@ +#!/bin/bash +##===----------------------------------------------------------------------===## +## +## This source file is part of the Swift Distributed Actors open source project +## +## Copyright (c) 2020 Apple Inc. 
and the Swift Distributed Actors project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## + +RED='\033[0;31m' +RST='\033[0m' + +function echoerr() { + echo "${RED}$@${RST}" 1>&2; +} + +function _killall() { + set +e + local killall_app_name="$1" + echo "> KILLALL: $killall_app_name" + ps aux | grep ${killall_app_name} | awk '{ print $2 }' | xargs kill -9 + set -e +} + +function wait_log_exists() { + _log_file="$1" + _expected_line="$2" + if [[ "$#" -eq 3 ]]; then + _max_spins="$3" + max_spins=$(expr ${_max_spins} + 0) + else + max_spins=20 + fi + spin=1 # spin counter + while [[ $(cat ${_log_file} | grep "${_expected_line}" | wc -l) -ne 1 ]]; do + echo "---------------------------------------------------------------------------------------------------------" + cat ${_log_file} + echo "=========================================================================================================" + + sleep 1 + spin=$((spin+1)) + if [[ ${spin} -eq ${max_spins} ]]; then + echoerr "Never saw enough '${_expected_line}' in logs." + cat ${_log_file} + exit -1 + fi + done + +} diff --git a/IntegrationTests/tests_04_cluster/test_01_suspend_causes_unreachable_unsuspend_causes_reachable.sh b/IntegrationTests/tests_04_cluster/test_01_suspend_causes_unreachable_unsuspend_causes_reachable.sh new file mode 100755 index 000000000..7815102c4 --- /dev/null +++ b/IntegrationTests/tests_04_cluster/test_01_suspend_causes_unreachable_unsuspend_causes_reachable.sh @@ -0,0 +1,64 @@ +#!/bin/bash +##===----------------------------------------------------------------------===## +## +## This source file is part of the Swift Distributed Actors open source project +## +## Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## + +set -e +#set -x # verbose + +declare -r my_path="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +declare -r root_path="$my_path/.." + +declare -r app_name='it_Clustered_swim_suspension_reachability' + +source ${my_path}/shared.sh + +declare -r first_logs=/tmp/sact_first.out +declare -r second_logs=/tmp/sact_second.out +rm -f ${first_logs} +rm -f ${second_logs} + +stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 7337 > ${first_logs} 2>&1 & +declare -r first_pid=$(echo $!) +wait_log_exists ${first_logs} 'Binding to: ' 200 # since it might be compiling again... + +stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 8228 localhost 7337 > ${second_logs} 2>&1 & +declare -r second_pid=$(echo $!) +wait_log_exists ${second_logs} 'Binding to: ' 200 # since it might be compiling again... + +echo "Waiting nodes to become .up..." +wait_log_exists ${first_logs} 'membershipChange(sact://System@localhost:8228 :: \[joining\] -> \[ up\])' 40 +echo 'Second member seen .up, good...' 
+ +# suspend the second process, causing unreachability +kill -SIGSTOP ${second_pid} +jobs + +wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: unreachable' 40 +echo 'Second member seen .unreachable, good...' + +# resume it in the background +kill -SIGCONT ${second_pid} + +# it should become reachable again +declare -r expected_second_member_unreachable= +wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: reachable' 40 +echo 'Second member seen .unreachable, good...' + + +# === cleanup ---------------------------------------------------------------------------------------------------------- + +kill -9 ${first_pid} +kill -9 ${second_pid} + +_killall ${app_name} diff --git a/Package.swift b/Package.swift index ed4e2bd00..9dc2d9d0e 100644 --- a/Package.swift +++ b/Package.swift @@ -168,6 +168,13 @@ var targets: [PackageDescription.Target] = [ ], path: "IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_backoffRespawn" ), + .target( + name: "it_Clustered_swim_suspension_reachability", + dependencies: [ + "DistributedActors", + ], + path: "IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability" + ), // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Performance / Benchmarks diff --git a/Samples/Sources/SampleCluster/main.swift b/Samples/Sources/SampleCluster/main.swift index 8e2493b6d..0d9911130 100644 --- a/Samples/Sources/SampleCluster/main.swift +++ b/Samples/Sources/SampleCluster/main.swift @@ -31,10 +31,16 @@ guard args.count >= 1 else { let system = ActorSystem("System") { settings in settings.cluster.enabled = true settings.cluster.bindPort = Int(args[0])! 
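    // Configuration sketch (assumption, not exercised by this sample): with the new `SWIMSettings.disabled`
    // flag introduced in this series, SWIM could be switched off here when an external failure detector is
    // provided instead, e.g.:
    //
    //     settings.cluster.swim.disabled = true
    //
    // Reachability events would then have to be produced by that external mechanism, otherwise downing
    // strategies receive no input.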
- settings.cluster.downingStrategy = .timeout(.default) + settings.cluster.downingStrategy = .none settings.defaultLogLevel = .debug } +let ref = try system.spawn("hello", of: Cluster.Event.self, .receive { context, event in + context.log.info("event = \(event)") + return .same +}) +system.cluster.events.subscribe(ref) + if args.count >= 3 { print("getting host") let host = args[1] diff --git a/Sources/DistributedActors/Cluster/ClusterReceptionist.swift b/Sources/DistributedActors/Cluster/ClusterReceptionist.swift index 1b57f09af..60448db9c 100644 --- a/Sources/DistributedActors/Cluster/ClusterReceptionist.swift +++ b/Sources/DistributedActors/Cluster/ClusterReceptionist.swift @@ -113,7 +113,7 @@ internal enum ClusterReceptionist { } private static func onFullStateRequest(context: ActorContext, request: ClusterReceptionist.FullStateRequest, storage: Receptionist.Storage) { - context.log.debug("Received full state request from [\(request.replyTo)]") // TODO: tracelog style + context.log.trace("Received full state request from [\(request.replyTo)]") // TODO: tracelog style var registrations: [AnyRegistrationKey: [ActorAddress]] = [:] registrations.reserveCapacity(storage._registrations.count) for (key, values) in storage._registrations { @@ -141,7 +141,7 @@ internal enum ClusterReceptionist { } private static func onFullState(context: ActorContext, fullState: ClusterReceptionist.FullState, storage: Receptionist.Storage) throws { - context.log.debug("Received full state \(fullState)") // TODO: tracelog style + context.log.trace("Received full state \(fullState)") // TODO: tracelog style for (key, paths) in fullState.registrations { var anyAdded = false for path in paths { diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index b9e5dc03b..615f5e76a 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -46,12 +46,6 @@ internal class ClusterShell { private var _associationTombstones: Set private var _swimRef: SWIM.Ref? - private var swimRef: SWIM.Ref { - guard let ref = _swimRef else { - return fatalErrorBacktrace("Illegal early access to ClusterShell._swimRef detected! This ref is initialized during bind(), and must not be accessed earlier than that.") - } - return ref - } private var clusterEvents: EventStream! @@ -130,7 +124,7 @@ internal class ClusterShell { // notify the failure detector, that we shall assume this node as dead from here on. // it's gossip will also propagate the information through the cluster traceLog_Remote(system.cluster.node, "Finish terminate association [\(remoteNode)]: Notifying SWIM, .confirmDead") - self.swimRef.tell(.local(.confirmDead(remoteNode))) + self._swimRef?.tell(.local(.confirmDead(remoteNode))) // Ensure to remove (completely) the member from the Membership, it is not even .leaving anymore. if state.membership.mark(remoteNode, as: .down) == nil { @@ -341,10 +335,10 @@ extension ClusterShell { self._swimRef = try context._downcastUnsafe._spawn(SWIMShell.naming, props: ._wellKnown, swimBehavior) } else { context.log.warning(""" - SWIM Failure Detector has been [disabled]! \ - Reachability events will NOT be emitted, meaning that most downing strategies will not be able to perform \ - their duties. Please ensure that an external mechanism for detecting failed cluster nodes is used. - """) + SWIM Failure Detector has been [disabled]! 
\ + Reachability events will NOT be emitted, meaning that most downing strategies will not be able to perform \ + their duties. Please ensure that an external mechanism for detecting failed cluster nodes is used. + """) self._swimRef = nil } @@ -939,7 +933,7 @@ extension ClusterShell { switch res { case .success(.success(let uniqueNode)): context.log.debug("Associated \(uniqueNode), informing SWIM to monitor this node.") - self.swimRef.tell(.local(.monitor(uniqueNode))) + self._swimRef?.tell(.local(.monitor(uniqueNode))) return .same // .same, since state was modified since inside the handshakeWith (!) case .success(.failure(let error)): context.log.debug("Handshake with \(reflecting: node) failed: \(error)") @@ -990,7 +984,7 @@ extension ClusterShell { "cluster/membership": "\(state.membership)", // TODO: introduce state.metadata pattern? ]) - self.swimRef.tell(.local(.confirmDead(memberToDown.node))) + self._swimRef?.tell(.local(.confirmDead(memberToDown.node))) do { let onDownAction = context.system.settings.cluster.onDownAction.make() diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift index e1c5cda00..963c2a1ee 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIM.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIM.swift @@ -66,6 +66,11 @@ public enum SWIM { let pinged: ActorRef let incarnation: Incarnation let payload: Payload + + /// Represents the pinged member in alive status, since it clearly has replied to our ping, so it must be alive. + func pingedAliveMember(protocolPeriod: Int) -> SWIM.Member { + .init(ref: self.pinged, status: .alive(incarnation: self.incarnation), protocolPeriod: protocolPeriod) + } } internal struct MembershipState { @@ -231,7 +236,7 @@ extension SWIM.Status { switch self { case .dead: return true - case .alive, .unreachable, .suspect: + case .alive, .suspect, .unreachable: return false } } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift index 1f92af16e..2f034d37a 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift @@ -56,9 +56,8 @@ final class SWIMInstance { // The protocol period represents the number of times we have pinged a random member // of the cluster. At the end of every ping cycle, the number will be incremented. - // Suspicion timeouts are based on the protocol period, e.g. if the ping interval - // is 300ms and the suspicion timeout is set to 10 periods, a suspected node will - // be declared `.dead` after not receiving an `.alive` for approx. 3 seconds. + // Suspicion timeouts are based on the protocol period, i.e. if a probe did not + // reply within any of the `suspicionTimeoutPeriodsMax` rounds, it would be marked as `.suspect`. private var _protocolPeriod: Int = 0 // We store the owning SWIMShell ref in order avoid adding it to the `membersToPing` list @@ -408,7 +407,7 @@ extension SWIM.Instance { } private func onMyselfGossipPayload(myself incoming: SWIM.Member) -> SWIM.Instance.OnGossipPayloadDirective { - assert(self.myShellMyself == incoming.ref, "Attempted to process gossip as-if about myself, but was not the same ref, was: \(incoming). Myself: \(self.myShellMyself)") + assert(self.myShellMyself == incoming.ref, "Attempted to process gossip as-if about myself, but was not the same ref, was: \(incoming). 
Myself: \(self.myShellMyself, orElse: "nil")") // Note, we don't yield changes for myself node observations, thus the self node will never be reported as unreachable, // after all, we can always reach ourselves. We may reconsider this if we wanted to allow SWIM to inform us about @@ -474,7 +473,7 @@ extension SWIM.Instance { } private func onOtherMemberGossipPayload(member: SWIM.Member) -> SWIM.Instance.OnGossipPayloadDirective { - assert(self.myShellMyself != member.ref, "Attempted to process gossip as-if not-myself, but WAS same ref, was: \(member). Myself: \(self.myShellMyself)") + assert(self.myShellMyself != member.ref, "Attempted to process gossip as-if not-myself, but WAS same ref, was: \(member). Myself: \(self.myShellMyself, orElse: "nil")") if self.isMember(member.ref) { switch self.mark(member.ref, as: member.status) { @@ -489,7 +488,7 @@ extension SWIM.Instance { } else if let remoteMemberNode = member.ref.address.node { return .connect(node: remoteMemberNode, onceConnected: { switch $0 { - case .success(let uniqueNode): + case .success: self.addMember(member.ref, status: member.status) case .failure: self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!) diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift index 2297e9b0d..dd7bcb61e 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift @@ -170,11 +170,16 @@ internal struct SWIMShell { guard !membersToPingRequest.isEmpty else { // no nodes available to ping, so we have to assume the node suspect right away if let lastIncarnation = lastKnownStatus.incarnation { - context.log.info("No members to ping-req through, marking [\(toPing)] immediately as [.suspect]. Members: [\(self.swim._allMembersDict)]") - self.swim.mark(toPing, as: .suspect(incarnation: lastIncarnation)) - return + switch self.swim.mark(toPing, as: .suspect(incarnation: lastIncarnation)) { + case .applied(_, let currentStatus): + context.log.info("No members to ping-req through, marked [\(toPing)] immediately as [\(currentStatus)].") + return + case .ignoredDueToOlderStatus(let currentStatus): + context.log.info("No members to ping-req through to [\(toPing)], was already [\(currentStatus)].") + return + } } else { - context.log.debug("Not marking .suspect, as [\(toPing)] is already dead.") // "You are already dead!" + context.log.trace("Not marking .suspect, as [\(toPing)] is already dead.") // "You are already dead!" 
return } } @@ -230,10 +235,9 @@ internal struct SWIMShell { self.sendPingRequests(context: context, toPing: pingedMember) } case .success(let ack): - context.log.trace("Received ack from [\(ack.pinged)] with incarnation [\(ack.incarnation)] and payload [\(ack.payload)]") - self.swim.mark(ack.pinged, as: .alive(incarnation: ack.incarnation)) + context.log.debug("Received ack from [\(ack.pinged)] with incarnation [\(ack.incarnation)] and payload [\(ack.payload)]", metadata: self.swim.metadata) + self.markMember(context, latest: ack.pingedAliveMember(protocolPeriod: self.swim.protocolPeriod)) pingReqOrigin?.tell(ack) - self.processGossipPayload(context: context, payload: ack.payload) } } @@ -255,7 +259,7 @@ internal struct SWIMShell { // MARK: Handling local messages func handlePingRandomMember(_ context: ActorContext) { - context.log.trace("Received periodic trigger to ping random member, among: \(self.swim._allMembersDict.count)", metadata: self.swim.metadata) + context.log.trace("Periodic ping random member, among: \(self.swim._allMembersDict.count)", metadata: self.swim.metadata) // needs to be done first, so we can gossip out the most up to date state self.checkSuspicionTimeouts(context: context) @@ -294,7 +298,7 @@ internal struct SWIMShell { // TODO: GC tombstones after a day switch self.swim.mark(member.ref, as: .dead) { - case .applied(let .some(previousState), let currentState): + case .applied(let .some(previousState), _): if previousState.isSuspect || previousState.isUnreachable { context.log.warning("Marked [\(member)] as [.dead]. Was marked \(previousState) in protocol period [\(member.protocolPeriod)]", metadata: [ "swim/protocolPeriod": "\(self.swim.protocolPeriod)", @@ -325,19 +329,20 @@ internal struct SWIMShell { func checkSuspicionTimeouts(context: ActorContext) { // TODO: push more of logic into SWIM instance, the calculating // FIXME: use decaying timeout as proposed in lifeguard paper - let timeoutPeriods = (self.swim.protocolPeriod - self.swim.settings.failureDetector.suspicionTimeoutPeriodsMax) + let timeoutSuspectsBeforePeriod = (self.swim.protocolPeriod - self.swim.settings.failureDetector.suspicionTimeoutPeriodsMax) context.log.trace("Checking suspicion timeouts...", metadata: [ "swim/suspects": "\(self.swim.suspects)", "swim/all": "\(self.swim._allMembersDict)", "swim/protocolPeriod": "\(self.swim.protocolPeriod)", - "swim/timeoutPeriods": "\(timeoutPeriods)", + "swim/timeoutSuspectsBeforePeriod": "\(timeoutSuspectsBeforePeriod)", ]) + for suspect in self.swim.suspects { context.log.trace("Checking suspicion timeout for: \(suspect)...") // proceed with suspicion escalation to .unreachable if the timeout period has been exceeded - guard suspect.protocolPeriod <= timeoutPeriods else { - continue // skip + guard suspect.protocolPeriod <= timeoutSuspectsBeforePeriod else { + continue // skip, this suspect is not timed-out yet } guard let incarnation = suspect.status.incarnation else { @@ -347,14 +352,21 @@ internal struct SWIMShell { var unreachableSuspect = suspect unreachableSuspect.status = .unreachable(incarnation: incarnation) + _ = self.markMember(context, latest: unreachableSuspect) + } + } - switch self.swim.mark(unreachableSuspect.ref, as: unreachableSuspect.status) { - case .applied(let previousStatus, _): - let statusChange = SWIM.Instance.MemberStatusChange(fromStatus: previousStatus, member: unreachableSuspect) - self.tryAnnounceMemberReachability(context, change: statusChange) - case .ignoredDueToOlderStatus: - return - } + private func markMember(_ 
context: ActorContext, latest: SWIM.Member) { + switch self.swim.mark(latest.ref, as: latest.status) { + case .applied(let previousStatus, _): + context.log.trace("Marked \(latest.node) as \(latest.status), announcing reachability change", metadata: [ + "swim/member": "\(latest)", + "swim/previousStatus": "\(previousStatus, orElse: "nil")", + ]) + let statusChange = SWIM.Instance.MemberStatusChange(fromStatus: previousStatus, member: latest) + self.tryAnnounceMemberReachability(context, change: statusChange) + case .ignoredDueToOlderStatus: + () // context.log.trace("No change \(latest), currentStatus remains [\(currentStatus)]. No reachability change to announce") } } @@ -389,15 +401,6 @@ internal struct SWIMShell { context.log.log(level: level, message) } -// case .markedSuspect(let member): -// context.log.info("Marked member \(member) [.suspect], if it is not found [.alive] again within \(self.settings.failureDetector.suspicionTimeoutPeriodsMax) protocol periods, it will be marked [.unreachable].") -// -// case .confirmedDead(let member): -// context.log.warning("Detected [.dead] node: \(member.node).", metadata: [ -// "swim/member": "\(member)", -// ]) -// context.system.cluster.down(node: member.node.node) // TODO: should w really, or rather mark unreachable and let the downing do this? - case .applied(let change, _, _): self.tryAnnounceMemberReachability(context, change: change) } @@ -422,7 +425,7 @@ internal struct SWIMShell { case .unreachable: context.log.info( """ - Node \(change.member.node) determined [.unreachable]!" \ + Node \(change.member.node) determined [.unreachable]! \ The node is not yet marked [.down], a downing strategy or other Cluster.Event subscriber may act upon this information. """, metadata: [ "swim/member": "\(change.member)", diff --git a/Sources/DistributedActors/Cluster/Transport/ActorRef+RemotePersonality.swift b/Sources/DistributedActors/Cluster/Transport/ActorRef+RemotePersonality.swift index 15b401c6b..c61573802 100644 --- a/Sources/DistributedActors/Cluster/Transport/ActorRef+RemotePersonality.swift +++ b/Sources/DistributedActors/Cluster/Transport/ActorRef+RemotePersonality.swift @@ -99,7 +99,9 @@ public final class RemotePersonality { return self.remoteControl } // else, fall through to the return nil below case .associated(let remoteControl): - self.system.log.warning("FIXME: Workaround, ActorRef's RemotePersonality had to spin \(spinNr) times to obtain remoteControl to send message to \(self.address)") + if spinNr > 1 { + self.system.log.warning("FIXME: Workaround, ActorRef's RemotePersonality had to spin \(spinNr) times to obtain remoteControl to send message to \(self.address)") + } // self._cachedAssociationRemoteControl = remoteControl // TODO: atomically cache a remote control? return remoteControl case .tombstone: diff --git a/Sources/DistributedActorsTestKit/LogCapture.swift b/Sources/DistributedActorsTestKit/LogCapture.swift index 49e3114b5..1928da67f 100644 --- a/Sources/DistributedActorsTestKit/LogCapture.swift +++ b/Sources/DistributedActorsTestKit/LogCapture.swift @@ -78,7 +78,7 @@ extension LogCapture { public var minimumLogLevel: Logger.Level = .trace /// Filter and capture logs only from actors with the following path prefix - public var filterActorPath: String = "/" + public var filterActorPaths: Set = ["/"] /// Do not capture log messages which include the following strings. 
public var excludeActorPaths: Set = [] @@ -169,7 +169,7 @@ struct LogCaptureLogHandler: LogHandler { } public func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) { - guard self.label.starts(with: self.capture.settings.filterActorPath) else { + guard self.capture.settings.filterActorPaths.contains(where: { path in self.label.starts(with: path) }) else { return // ignore this actor's logs, it was filtered out } guard !self.capture.settings.excludeActorPaths.contains(self.label) else { diff --git a/Tests/DistributedActorsTests/ActorLeakingTests.swift b/Tests/DistributedActorsTests/ActorLeakingTests.swift index 953af1539..3e302bc18 100644 --- a/Tests/DistributedActorsTests/ActorLeakingTests.swift +++ b/Tests/DistributedActorsTests/ActorLeakingTests.swift @@ -159,7 +159,7 @@ class ActorLeakingTests: ActorSystemTestBase { } else { for _ in 1 ... childCount { let b: Behavior = .receiveMessage { _ in .same } - try context.spawn(.anonymous, b) + _ = try context.spawn(.anonymous, b) } return .same } diff --git a/Tests/DistributedActorsTests/Cluster/ClusterMembershipGossipTests.swift b/Tests/DistributedActorsTests/Cluster/ClusterMembershipGossipTests.swift index 5a717e208..acf06e4a7 100644 --- a/Tests/DistributedActorsTests/Cluster/ClusterMembershipGossipTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ClusterMembershipGossipTests.swift @@ -20,7 +20,7 @@ import XCTest final class ClusterMembershipGossipTests: ClusteredNodesTestBase { override func configureLogCapture(settings: inout LogCapture.Settings) { - settings.filterActorPath = "/system/cluster" + settings.filterActorPaths = ["/system/cluster"] settings.excludeActorPaths = ["/system/cluster/swim"] // we assume it works fine settings.excludeGrep = ["with generation"] // exclude timers noise } diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift index eecb711fe..ddf94455c 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests+XCTest.swift @@ -28,7 +28,8 @@ extension SWIMInstanceTests { ("test_notMyself_shouldDetectRandomNotMyselfActor", test_notMyself_shouldDetectRandomNotMyselfActor), ("test_mark_shouldNotApplyEqualStatus", test_mark_shouldNotApplyEqualStatus), ("test_mark_shouldApplyNewerStatus", test_mark_shouldApplyNewerStatus), - ("test_mark_shouldNotApplyOlderStatus", test_mark_shouldNotApplyOlderStatus), + ("test_mark_shouldNotApplyOlderStatus_suspect", test_mark_shouldNotApplyOlderStatus_suspect), + ("test_mark_shouldNotApplyOlderStatus_unreachable", test_mark_shouldNotApplyOlderStatus_unreachable), ("test_mark_shouldApplyDead", test_mark_shouldApplyDead), ("test_mark_shouldNotApplyAnyStatusIfAlreadyDead", test_mark_shouldNotApplyAnyStatusIfAlreadyDead), ("test_onPingRequestResponse_allowsSuspectNodeToRefuteSuspicion", test_onPingRequestResponse_allowsSuspectNodeToRefuteSuspicion), diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift index 103811922..10c4b4639 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMInstanceTests.swift @@ -99,17 +99,31 @@ final class SWIMInstanceTests: ActorSystemTestBase { swim.member(for: probe.ref)!.protocolPeriod.shouldEqual(6) } - func 
test_mark_shouldNotApplyOlderStatus() throws { - let probe = self.testKit.spawnTestProbe(expecting: SWIM.Message.self) + func test_mark_shouldNotApplyOlderStatus_suspect() throws { let swim = SWIM.Instance(.default) - swim.addMember(probe.ref, status: .suspect(incarnation: 1)) + // ==== Suspect member ----------------------------------------------------------------------------------------- + let suspectMember = self.testKit.spawnTestProbe(expecting: SWIM.Message.self) + swim.addMember(suspectMember.ref, status: .suspect(incarnation: 1)) swim.incrementProtocolPeriod() - try self.validateMark(swim: swim, member: probe.ref, status: .suspect(incarnation: 0), shouldSucceed: false) - try self.validateMark(swim: swim, member: probe.ref, status: .alive(incarnation: 1), shouldSucceed: false) + try self.validateMark(swim: swim, member: suspectMember.ref, status: .suspect(incarnation: 0), shouldSucceed: false) + try self.validateMark(swim: swim, member: suspectMember.ref, status: .alive(incarnation: 1), shouldSucceed: false) - swim.member(for: probe.ref)!.protocolPeriod.shouldEqual(0) + swim.member(for: suspectMember.ref)!.protocolPeriod.shouldEqual(0) + } + + func test_mark_shouldNotApplyOlderStatus_unreachable() throws { + let swim = SWIM.Instance(.default) + + let unreachableMember = self.testKit.spawnTestProbe(expecting: SWIM.Message.self) + swim.addMember(unreachableMember.ref, status: .unreachable(incarnation: 1)) + swim.incrementProtocolPeriod() + + try self.validateMark(swim: swim, member: unreachableMember.ref, status: .suspect(incarnation: 0), shouldSucceed: false) + try self.validateMark(swim: swim, member: unreachableMember.ref, status: .alive(incarnation: 1), shouldSucceed: false) + + swim.member(for: unreachableMember.ref)!.protocolPeriod.shouldEqual(0) } func test_mark_shouldApplyDead() throws { diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift index 1bcfea14c..99e06c750 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift +++ b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests+XCTest.swift @@ -32,7 +32,6 @@ extension SWIMShellClusteredTests { ("test_swim_shouldMarkSuspectedMembersAsAlive_whenPingingSucceedsWithinSuspicionTimeout", test_swim_shouldMarkSuspectedMembersAsAlive_whenPingingSucceedsWithinSuspicionTimeout), ("test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed", test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed), ("test_swim_shouldNotifyClusterAboutUnreachableNode_whenUnreachableDiscoveredByOtherNode", test_swim_shouldNotifyClusterAboutUnreachableNode_whenUnreachableDiscoveredByOtherNode), - ("test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain", test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain), ("test_swim_shouldSendGossipInAck", test_swim_shouldSendGossipInAck), ("test_swim_shouldSendGossipInPing", test_swim_shouldSendGossipInPing), ("test_swim_shouldSendGossipInPingReq", test_swim_shouldSendGossipInPingReq), diff --git a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift index dd02c53a9..9c50bc98b 100644 --- a/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift +++ 
b/Tests/DistributedActorsTests/Cluster/SWIM/SWIMShellClusteredTests.swift @@ -34,10 +34,12 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { } override func configureLogCapture(settings: inout LogCapture.Settings) { - settings.filterActorPath = "/system/cluster/swim" + settings.filterActorPaths = ["/user/SWIM"] // the mocked one +// settings.filterActorPaths = ["/system/cluster/swim"] // in case we test against the real one } - // ==== ---------------------------------------------------------------------------------------------------------------- + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Pinging nodes func test_swim_shouldRespondWithAckToPing() throws { let first = self.setUpFirst() @@ -213,6 +215,51 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { try self.awaitStatus(.alive(incarnation: 1), for: remoteProbeRef, on: ref, within: .seconds(1)) } + // FIXME: Can't seem to implement a hardened test like this... + func ignored_test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain() throws { + try shouldNotThrow { + let first = self.setUpFirst { settings in + settings.cluster.swim.disabled = true // since we drive one manually + } + let second = self.setUpSecond { settings in + settings.cluster.swim.disabled = true // since we drive one manually + } + + first.cluster.join(node: second.cluster.node.node) + try assertAssociated(first, withExactly: second.cluster.node) + + let p = self.testKit(second).spawnTestProbe(expecting: SWIM.Message.self) + let remoteMemberRef = first._resolveKnownRemote(p.ref, onRemoteSystem: second) + + let pingTimeout: TimeAmount = .milliseconds(100) + let ref = try first.spawn("SWIM", self.swimBehavior(members: [remoteMemberRef], clusterRef: self.firstClusterProbe.ref, configuredWith: { settings in + settings.failureDetector.suspicionTimeoutPeriodsMax = 3 + settings.failureDetector.pingTimeout = pingTimeout + })) + + // spin not-replying for more than timeoutPeriodsMax, such that the member will be marked as unreachable + for _ in 0 ..< SWIMSettings.default.failureDetector.suspicionTimeoutPeriodsMax + 100 { + ref.tell(.local(.pingRandomMember)) + try self.expectPing(on: p, reply: false) + } + + // should become unreachable + guard case .command(.failureDetectorReachabilityChanged(_, .unreachable)) = try firstClusterProbe.expectMessage() else { + throw self.testKit(first).fail("expected to receive `.command(.failureDetectorReachabilityChanged)`, but got `\(firstClusterProbe.lastMessage, orElse: "nil")`") + } + + // if it'd directly reply while unreachable (which is an "extended period suspect" really), it can come back alive + ref.tell(.local(.pingRandomMember)) + try self.expectPing(on: p, reply: true, incarnation: 2) + + // since we replied again with alive, should become reachable + try self.awaitStatus(.alive(incarnation: 2), for: remoteMemberRef, on: ref, within: .seconds(1)) + guard case .command(.failureDetectorReachabilityChanged(_, .reachable)) = try firstClusterProbe.expectMessage() else { + throw self.testKit(first).fail("expected to receive `.command(.failureDetectorReachabilityChanged)`, but got `\(firstClusterProbe.lastMessage, orElse: "nil")`") + } + } + } + func test_swim_shouldNotifyClusterAboutUnreachableNode_afterConfiguredSuspicionTimeout_andMarkDeadWhenConfirmed() throws { let first = self.setUpFirst() let second = self.setUpSecond() @@ -226,7 +273,6 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { 
let ref = try first.spawn("SWIM", self.swimBehavior(members: [remoteMemberRef], clusterRef: self.firstClusterProbe.ref)) ref.tell(.local(.pingRandomMember)) - try self.expectPing(on: p, reply: false) try self.awaitStatus(.suspect(incarnation: 0), for: remoteMemberRef, on: ref, within: .seconds(1)) @@ -240,9 +286,8 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { // and have the SWIM actor mark the remote node as dead ref.tell(.local(.pingRandomMember)) - let message = try firstClusterProbe.expectMessage() - guard case .command(.failureDetectorReachabilityChanged(let address, .unreachable)) = message else { - throw self.testKit(first).fail("expected to receive `.command(.markUnreachable)`, but got `\(message)`") + guard case .command(.failureDetectorReachabilityChanged(let address, .unreachable)) = try firstClusterProbe.expectMessage() else { + throw self.testKit(first).fail("expected to receive `.command(.failureDetectorReachabilityChanged)`, but got `\(firstClusterProbe.lastMessage, orElse: "nil")`") } try self.holdStatus(.unreachable(incarnation: 0), for: remoteMemberRef, on: ref, within: .milliseconds(200)) @@ -293,11 +338,6 @@ final class SWIMShellClusteredTests: ClusteredNodesTestBase { try self.expectReachabilityInSnapshot(thirdTestKit, node: secondNode, expect: .unreachable) } - func test_swim_shouldNotifyClusterAboutUnreachableNode_andThenReachableAgain() throws { - // TODO: Implement this once the transport is pluggable and we can make it drop random messages - pnote("TODO: Implement this once the transport is pluggable and we can make it drop random messages") - } - /// Passed in `eventStreamProbe` is expected to have been subscribed to the event stream as early as possible, /// as we want to expect the specific reachability event to be sent private func expectReachabilityEvent( diff --git a/wat.json b/wat.json deleted file mode 100644 index 56025cb5e..000000000 --- a/wat.json +++ /dev/null @@ -1,668 +0,0 @@ -{ - "cLanguageStandard": null, - "cxxLanguageStandard": "c++11", - "dependencies": [ - { - "requirement": { - "range": [ - { - "lowerBound": "2.8.0", - "upperBound": "3.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-nio.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "1.2.0", - "upperBound": "2.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-nio-extras.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "2.2.0", - "upperBound": "3.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-nio-ssl.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "1.7.0", - "upperBound": "2.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-protobuf.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "1.1.1", - "upperBound": "2.0.0" - } - ] - }, - "url": "https:\/\/github.com\/ianpartridge\/swift-backtrace.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "1.0.0", - "upperBound": "2.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-log.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "1.0.0", - "upperBound": "2.0.0" - } - ] - }, - "url": "https:\/\/github.com\/apple\/swift-metrics.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "0.13.0", - "upperBound": "1.0.0" - } - ] - }, - "url": "https:\/\/github.com\/stencilproject\/Stencil.git" - }, - { - "requirement": { - "range": [ - { - "lowerBound": "4.0.0", - "upperBound": "5.0.0" - } - ] - }, - "url": "https:\/\/github.com\/JohnSundell\/Files" - }, - { - 
"requirement": { - "exact": [ - "0.50100.0" - ] - }, - "url": "https:\/\/github.com\/apple\/swift-syntax.git" - } - ], - "manifestVersion": "v5_1", - "name": "swift-distributed-actors", - "pkgConfig": null, - "platforms": [ - ], - "products": [ - { - "name": "DistributedActors", - "targets": [ - "DistributedActors" - ], - "type": { - "library": [ - "automatic" - ] - } - }, - { - "name": "DistributedActorsTestKit", - "targets": [ - "DistributedActorsTestKit" - ], - "type": { - "library": [ - "automatic" - ] - } - }, - { - "name": "GenActors", - "targets": [ - "GenActors" - ], - "type": { - "executable": null - } - }, - { - "name": "DistributedActorsBenchmarks", - "targets": [ - "DistributedActorsBenchmarks" - ], - "type": { - "executable": null - } - } - ], - "providers": null, - "swiftLanguageVersions": null, - "targets": [ - { - "dependencies": [ - { - "byName": [ - "NIO" - ] - }, - { - "byName": [ - "NIOSSL" - ] - }, - { - "byName": [ - "NIOExtras" - ] - }, - { - "byName": [ - "NIOFoundationCompat" - ] - }, - { - "byName": [ - "SwiftProtobuf" - ] - }, - { - "byName": [ - "Logging" - ] - }, - { - "byName": [ - "Metrics" - ] - }, - { - "byName": [ - "Backtrace" - ] - }, - { - "byName": [ - "DistributedActorsConcurrencyHelpers" - ] - }, - { - "byName": [ - "CDistributedActorsMailbox" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActors", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "SwiftSyntax" - ] - }, - { - "byName": [ - "Stencil" - ] - }, - { - "byName": [ - "Files" - ] - } - ], - "exclude": [ - ], - "name": "GenActors", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "HelloXPCService", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "HelloXPC", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "Files" - ] - } - ], - "exclude": [ - ], - "name": "CXPCActorable", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "CXPCActorable" - ] - }, - { - "byName": [ - "Files" - ] - } - ], - "exclude": [ - ], - "name": "XPCActorable", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - ], - "exclude": [ - ], - "name": "XPCServiceProvider", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - ], - "exclude": [ - ], - "name": "XPCLibCaller", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "XPCActorable" - ] - }, - { - "byName": [ - "Files" - ] - } - ], - "exclude": [ - ], - "name": "XPCActorServiceProvider", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "XPCActorServiceProvider" - ] - }, - { - "byName": [ - "XPCActorable" - ] - }, - { - "byName": [ - "Files" - ] - } - ], - "exclude": [ - ], - "name": "XPCActorCaller", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - ], - "exclude": [ - ], - "name": "XPCSquirrel", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "DistributedActorsConcurrencyHelpers" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsTestKit", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ 
- { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "DistributedActorsTestKit" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsDocumentationTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "DistributedActorsTestKit" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "DistributedActorsTestKit" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsTestKitTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "CDistributedActorsMailbox" - ] - }, - { - "byName": [ - "DistributedActorsTestKit" - ] - } - ], - "exclude": [ - ], - "name": "CDistributedActorsMailboxTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActorsConcurrencyHelpers" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsConcurrencyHelpersTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "GenActors" - ] - }, - { - "byName": [ - "DistributedActorsTestKit" - ] - } - ], - "exclude": [ - ], - "name": "GenActorsTests", - "settings": [ - ], - "type": "test" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "it_ProcessIsolated_escalatingWorkers", - "path": "IntegrationTests\/tests_02_process_isolated\/it_ProcessIsolated_escalatingWorkers", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "it_ProcessIsolated_respawnsServants", - "path": "IntegrationTests\/tests_02_process_isolated\/it_ProcessIsolated_respawnsServants", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "it_ProcessIsolated_noLeaking", - "path": "IntegrationTests\/tests_02_process_isolated\/it_ProcessIsolated_noLeaking", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "it_ProcessIsolated_backoffRespawn", - "path": "IntegrationTests\/tests_02_process_isolated\/it_ProcessIsolated_backoffRespawn", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - }, - { - "byName": [ - "SwiftBenchmarkTools" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsBenchmarks", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "DistributedActors" - ] - } - ], - "exclude": [ - ], - "name": "SwiftBenchmarkTools", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - ], - "exclude": [ - ], - "name": "CDistributedActorsMailbox", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - ], - "exclude": [ - ], - "name": "CDistributedActorsAtomics", - "settings": [ - ], - "type": "regular" - }, - { - "dependencies": [ - { - "byName": [ - "CDistributedActorsAtomics" - ] - } - ], - "exclude": [ - ], - "name": "DistributedActorsConcurrencyHelpers", - "settings": [ - ], - "type": "regular" - } - ] -}
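
For illustration, a minimal, self-contained sketch of the suspicion-timeout escalation that the reworked checkSuspicionTimeouts performs. The types and names below (Status, Member, suspectsToEscalate) are simplified stand-ins for this note only, not the DistributedActors API; only the arithmetic mirrors the change above: a suspect whose protocolPeriod is at or below protocolPeriod - suspicionTimeoutPeriodsMax is escalated to .unreachable, keeping its current incarnation.

// Stand-in types; NOT the real SWIM.Status / SWIM.Member.
enum Status: Equatable {
    case alive(incarnation: UInt64)
    case suspect(incarnation: UInt64)
    case unreachable(incarnation: UInt64)
    case dead
}

struct Member {
    let name: String
    var status: Status
    /// Protocol period in which this member was last marked (e.g. became .suspect).
    var protocolPeriod: Int
}

/// Returns the suspects that have timed out and should be escalated to .unreachable,
/// following the same arithmetic as checkSuspicionTimeouts in the patch.
func suspectsToEscalate(
    suspects: [Member],
    currentProtocolPeriod: Int,
    suspicionTimeoutPeriodsMax: Int
) -> [Member] {
    let timeoutSuspectsBeforePeriod = currentProtocolPeriod - suspicionTimeoutPeriodsMax
    return suspects.compactMap { (suspect) -> Member? in
        // Only .suspect members can time out; .unreachable/.dead are handled elsewhere.
        guard case .suspect(let incarnation) = suspect.status else { return nil }
        // Skip, this suspect is not timed-out yet.
        guard suspect.protocolPeriod <= timeoutSuspectsBeforePeriod else { return nil }
        var escalated = suspect
        escalated.status = .unreachable(incarnation: incarnation)
        return escalated
    }
}

// Example: with suspicionTimeoutPeriodsMax = 3 and current period 10,
// only suspects marked in period 7 or earlier are escalated.
let suspects = [
    Member(name: "a", status: .suspect(incarnation: 0), protocolPeriod: 7),
    Member(name: "b", status: .suspect(incarnation: 2), protocolPeriod: 9),
]
let escalated = suspectsToEscalate(suspects: suspects, currentProtocolPeriod: 10, suspicionTimeoutPeriodsMax: 3)
print(escalated.map { "\($0.name): \($0.status)" }) // ["a: unreachable(incarnation: 0)"]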
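
Similarly, a condensed sketch of the multi-prefix log filtering that the new filterActorPaths setting enables in LogCapture; shouldCapture below is a stand-in free function, not the DistributedActorsTestKit handler, but it applies the same rule: a label is captured if it starts with any configured prefix and is not explicitly excluded.

/// Stand-in for the capture decision made by the log handler after this change.
func shouldCapture(label: String, filterActorPaths: Set<String>, excludeActorPaths: Set<String>) -> Bool {
    // Must match at least one of the configured path prefixes...
    guard filterActorPaths.contains(where: { label.starts(with: $0) }) else { return false }
    // ...and must not be excluded outright.
    guard !excludeActorPaths.contains(label) else { return false }
    return true
}

// e.g. capture "/user/SWIM" and everything under "/system/cluster", except the SWIM shell:
let captured = shouldCapture(
    label: "/system/cluster/swim",
    filterActorPaths: ["/user/SWIM", "/system/cluster"],
    excludeActorPaths: ["/system/cluster/swim"]
)
print(captured) // false, excluded explicitly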