From 8e0e4dc543ab64006da8225089c51a8dbbbdded4 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 29 Jul 2022 16:53:14 +0900 Subject: [PATCH] async leadership WIP --- ...adership.swift => ClusterLeadership.swift} | 254 ++++++++++-------- .../Cluster/ClusterShell.swift | 12 +- .../Cluster/LeadershipTests.swift | 76 +++--- 3 files changed, 186 insertions(+), 156 deletions(-) rename Sources/DistributedActors/Cluster/{Leadership.swift => ClusterLeadership.swift} (63%) diff --git a/Sources/DistributedActors/Cluster/Leadership.swift b/Sources/DistributedActors/Cluster/ClusterLeadership.swift similarity index 63% rename from Sources/DistributedActors/Cluster/Leadership.swift rename to Sources/DistributedActors/Cluster/ClusterLeadership.swift index bc34978e2..8e5504d2b 100644 --- a/Sources/DistributedActors/Cluster/Leadership.swift +++ b/Sources/DistributedActors/Cluster/ClusterLeadership.swift @@ -42,146 +42,170 @@ import NIO // Future /// If a new member is selected as leader, a ``Cluster/Event`` carrying ``Cluster/LeadershipChange`` will be emitted. /// Other actors may subscribe to `ClusterSystem.cluster.events` in order to receive and react to such changes, /// e.g. if an actor should only perform its duties if it is residing on the current leader node. -public protocol LeaderElection { +internal protocol LeaderElection { /// Select a member to become a leader out of the existing `Membership`. /// /// Decisions about electing/selecting a leader may be performed asynchronously. - mutating func runElection(context: LeaderElectionContext, membership: Cluster.Membership) -> LeaderElectionResult + mutating func runElection(membership: Cluster.Membership) async throws -> Cluster.LeadershipChange? } public struct LeaderElectionContext { public var log: Logger - public let loop: EventLoop internal init(_ ownerContext: _ActorContext) { self.log = ownerContext.log - self.loop = ownerContext.system._eventLoopGroup.next() } - internal init(log: Logger, eventLoop: EventLoop) { + internal init(log: Logger) { self.log = log - self.loop = eventLoop } } -/// Result of running a `LeaderElection`, which may be performed asynchronously (or not). -/// -/// Synchronous leader elections are usually implemented by predictably ordering the nodes, e.g. ordering them by address -/// and picking the "lowest", which is a variant of "ranking" leader election. Asynchronous elections may involve having -/// to reach out to the other members and them performing a "vote" about who shall become the leader. As this involves -/// actor coordination, the result of such election is going to be provided asynchronously. -/// -/// A change in leadership will result in a `Cluster.LeadershipChange` event being emitted in the system's cluster event stream. -public struct LeaderElectionResult: _AsyncResult { - public typealias Value = Cluster.LeadershipChange? - let future: EventLoopFuture - - init(_ future: EventLoopFuture) { - self.future = future - } - - public func _onComplete(_ callback: @escaping (Result) -> Void) { - self.future.whenComplete(callback) - } - - public func withTimeout(after timeout: Duration) -> LeaderElectionResult { - LeaderElectionResult(self.future.withTimeout(after: timeout)) - } -} +///// Result of running a `LeaderElection`, which may be performed asynchronously (or not). +///// +///// Synchronous leader elections are usually implemented by predictably ordering the nodes, e.g. ordering them by address +///// and picking the "lowest", which is a variant of "ranking" leader election. Asynchronous elections may involve having +///// to reach out to the other members and them performing a "vote" about who shall become the leader. As this involves +///// actor coordination, the result of such election is going to be provided asynchronously. +///// +///// A change in leadership will result in a `Cluster.LeadershipChange` event being emitted in the system's cluster event stream. +//public struct LeaderElectionResult: _AsyncResult { +// public typealias Value = Cluster.LeadershipChange? +// let future: EventLoopFuture +// +// init(_ future: EventLoopFuture) { +// self.future = future +// } +// +// public func _onComplete(_ callback: @escaping (Result) -> Void) { +// self.future.whenComplete(callback) +// } +// +// public func withTimeout(after timeout: Duration) -> LeaderElectionResult { +// LeaderElectionResult(self.future.withTimeout(after: timeout)) +// } +//} // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: Leadership +// MARK: ClusterLeadership /// Leadership encapsulates various `LeaderElection` strategies. /// /// - SeeAlso: `LeaderElection` -public struct Leadership {} - -extension Leadership { - final class Shell { - static let naming: _ActorNaming = "leadership" +public struct ClusterLeadership {} - private var membership: Cluster.Membership // FIXME: we need to ensure the membership is always up to date -- we need the initial snapshot or a diff from a zero state etc. - private var election: LeaderElection - - init(_ election: LeaderElection) { - self.election = election +extension ClusterLeadership { + + /// A leadership election strategy which determines the leader by observing the membership of the cluster, + /// and electing the member with the "lowest" address identity. + /// + /// This election scheme does not require any cooperation between nodes, and all nodes determine the same leader given the same membership. + actor Shell { + typealias ActorSystem = ClusterSystem + + let actorSystem: ActorSystem + +// @ActorID.Metadata(\.path) +// var path: ActorPath + + var strategy: LeaderElection + var log: Logger! + var clusterEventsTask: Task? + + var membership: Cluster.Membership = .empty + + init(strategy: LeaderElection, actorSystem: ClusterSystem) async { + self.strategy = strategy + self.actorSystem = actorSystem + + var log = actorSystem.log + log[metadataKey: "actor/path"] = "$leaderElection" + self.log = log +// self.path = try! ._user.appending("leaderElector") + + // Prepare initial membership; + // Ensure we have our own node as joining right away, even if the events don't have it emitted yet self.membership = .empty - } - - var behavior: _Behavior { - .setup { context in - context.log.trace("Configured with \(self.election)") - context.system.cluster.events.subscribe(context.myself) - - // FIXME: we have to add "own node" since we're not getting the .snapshot... so we have to manually act as if.. - _ = self.membership.applyMembershipChange(Cluster.MembershipChange(node: context.system.cluster.uniqueNode, previousStatus: nil, toStatus: .joining)) - return self.runElection(context) + _ = self.membership.applyMembershipChange(Cluster.MembershipChange(node: actorSystem.cluster.uniqueNode, previousStatus: nil, toStatus: .joining)) + + self.clusterEventsTask = Task { + await listenToClusterEvents() } } - - private var ready: _Behavior { - .receive { context, event in + + deinit { + self.clusterEventsTask?.cancel() + self.clusterEventsTask = nil + } + + func listenToClusterEvents() async { + let clusterEvents = self.actorSystem.cluster.events + + for try await event in clusterEvents { switch event { case .snapshot(let membership): self.membership = membership - return .same - + + await self.runElection() + case .membershipChange(let change): guard self.membership.applyMembershipChange(change) != nil else { - return .same // nothing changed, no need to select anew + continue // out of then whenLocal } - - return self.runElection(context) - + + await self.runElection() + case .reachabilityChange(let change): _ = self.membership.applyReachabilityChange(change) - - return self.runElection(context) - + + await self.runElection() + case .leadershipChange: - return .same // we are the source of such events! - + continue // we are the source of such events! + case ._PLEASE_DO_NOT_EXHAUSTIVELY_MATCH_THIS_ENUM_NEW_CASES_MIGHT_BE_ADDED_IN_THE_FUTURE: - context.log.error("Received Cluster.Event [\(event)]. This should not happen, please file an issue.") - return .same + fatalError("Received impossible event: \(event)") } } } - - func runElection(_ context: _ActorContext) -> _Behavior { - var electionContext = LeaderElectionContext(context) - electionContext.log[metadataKey: "leadership/election"] = "\(String(reflecting: type(of: self.election)))" - let electionResult = self.election.runElection(context: electionContext, membership: self.membership) - - // TODO: if/when we'd have some election scheme that is async, e.g. "vote" then this timeout should NOT be infinite and should be handled properly - return context.awaitResult(of: electionResult, timeout: .effectivelyInfinite) { - switch $0 { - case .success(.some(let leadershipChange)): - guard let changed = try self.membership.applyLeadershipChange(to: leadershipChange.newLeader) else { - context.log.trace("The leadership change that was decided on by \(self.election) results in no change from current leadership state.") - return self.ready - } - context.system.cluster.ref.tell(.requestMembershipChange(.leadershipChange(changed))) - return self.ready - - case .success(.none): - // no change decided upon - return self.ready - - case .failure(let err): - context.log.warning("Failed to select leader... Error: \(err)") - return self.ready + + func runElection() async { + do { + let change = try await self.strategy.runElection(membership: self.membership) + + guard let leadershipChange = change else { + // no leadership change was made + return } + + self.requestLeadershipChange(to: leadershipChange.newLeader) + + } catch { + self.log.warning("Failed to select leader", metadata: [ + "strategy": "\(self.strategy)", + "membership": "\(self.membership)", + "error": "\(error)", + ]) + return + } + } + + func requestLeadershipChange(to newLeader: Cluster.Member?) { + guard let changed = try? self.membership.applyLeadershipChange(to: newLeader) else { + self.log.trace("The leadership change that was decided on by \(self.strategy) results in no change from current leadership state.") + return } + + self.actorSystem.cluster.ref.tell(.requestMembershipChange(.leadershipChange(changed))) } } + } // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: LowestAddressReachableMember election strategy -extension Leadership { +extension ClusterLeadership { /// Simple strategy which does not require any additional coordination from members to select a leader. /// /// All `MemberStatus.joining`, `MemberStatus.up` _reachable_ members are sorted by their addresses, @@ -224,6 +248,9 @@ extension Leadership { /// potentially be unsafe given less than `minimumNrOfMembers` nodes. /// public struct LowestReachableMember: LeaderElection { + + var log: Logger + // TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough // guarantees, and you should consider using another scheme or consensus based modes. let minimumNumberOfMembersToDecide: Int @@ -231,38 +258,39 @@ extension Leadership { /// - param minimumNrOfMembers: minimum number of REACHABLE members when a leader can be elected. public init(minimumNrOfMembers: Int, loseLeadershipIfBelowMinNrOfMembers: Bool = false) { + self.log = Logger(label: "\(Self.self)") self.minimumNumberOfMembersToDecide = minimumNrOfMembers self.loseLeadershipIfBelowMinNrOfMembers = loseLeadershipIfBelowMinNrOfMembers } - public mutating func runElection(context: LeaderElectionContext, membership: Cluster.Membership) -> LeaderElectionResult { + public mutating func runElection(membership: Cluster.Membership) async throws -> Cluster.LeadershipChange? { var membership = membership let membersToSelectAmong = membership.members(atMost: .up, reachability: .reachable) let enoughMembers = membersToSelectAmong.count >= self.minimumNumberOfMembersToDecide if enoughMembers { - return self.selectByLowestAddress(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong) + return self.selectByLowestAddress(membership: &membership, membersToSelectAmong: membersToSelectAmong) } else { - context.log.info("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)") + self.log.info("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)") if self.loseLeadershipIfBelowMinNrOfMembers { - return self.notEnoughMembers(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong) + return self.notEnoughMembers(membership: &membership, membersToSelectAmong: membersToSelectAmong) } else { - return self.belowMinMembersTryKeepStableLeader(context: context, membership: &membership) + return self.belowMinMembersTryKeepStableLeader(membership: &membership) } } } - internal mutating func notEnoughMembers(context: LeaderElectionContext, membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> LeaderElectionResult { + internal mutating func notEnoughMembers(membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> Cluster.LeadershipChange? { // not enough members to make a decision yet - context.log.trace("Not enough members to select leader from, minimum nr of members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)]") + self.log.trace("Not enough members to select leader from, minimum nr of members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)]") if let currentLeader = membership.leader { // Clear current leader and trigger `Cluster.LeadershipChange` let change = try! membership.applyLeadershipChange(to: nil) // try!-safe, because changing leader to nil is safe - context.log.trace("Removing leader [\(currentLeader)]") - return .init(context.loop.next().makeSucceededFuture(change)) + self.log.trace("Removing leader [\(currentLeader)]") + return change } else { - return .init(context.loop.next().makeSucceededFuture(nil)) + return nil } } @@ -273,25 +301,25 @@ extension Leadership { /// - it still is reachable and part of the membership /// /// Other nodes MAY NOT be elected, as we are below the minimum members threshold, we can only keep an existing leader, but not elect new ones. - internal mutating func belowMinMembersTryKeepStableLeader(context: LeaderElectionContext, membership: inout Cluster.Membership) -> LeaderElectionResult { + internal mutating func belowMinMembersTryKeepStableLeader(membership: inout Cluster.Membership) -> Cluster.LeadershipChange? { guard let currentLeader = membership.leader else { // there was no leader previously, and now we are below `minimumNumberOfMembersToDecide` thus cannot select a new one - return .init(context.loop.next().makeSucceededFuture(nil)) // no change + return nil // no change } guard currentLeader.status <= .up else { // the leader is not up anymore, and we have to remove it (cannot keep trusting it) let change = try! membership.applyLeadershipChange(to: nil) // try!-safe, because changing leader to nil is safe - context.log.trace("Removing leader [\(currentLeader)], not enough members to elect new leader.") - return .init(context.loop.next().makeSucceededFuture(change)) + self.log.trace("Removing leader [\(currentLeader)], not enough members to elect new leader.") + return change } // the leader is still up, regardless of reachability, we still trust it; // as we do not have enough members to do another election, we stick to the node we know. - return .init(context.loop.next().makeSucceededFuture(nil)) + return nil } - internal mutating func selectByLowestAddress(context: LeaderElectionContext, membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> LeaderElectionResult { + internal mutating func selectByLowestAddress(membership: inout Cluster.Membership, membersToSelectAmong: [Cluster.Member]) -> Cluster.LeadershipChange? { let oldLeader = membership.leader // select the leader, by lowest address @@ -300,15 +328,15 @@ extension Leadership { .first if let change = try! membership.applyLeadershipChange(to: leader) { // try! safe, as we KNOW this member is part of membership - context.log.debug( + self.log.debug( "Selected new leader: [\(oldLeader, orElse: "nil") -> \(leader, orElse: "nil")]", metadata: [ "membership": "\(membership)", ] ) - return .init(context.loop.next().makeSucceededFuture(change)) + return change } else { - return .init(context.loop.next().makeSucceededFuture(nil)) // no change, e.g. the new/old leader are the same + return nil // no change, e.g. the new/old leader are the same } } } @@ -327,12 +355,12 @@ extension ClusterSystemSettings { private let underlying: _LeadershipSelectionSettings - func make(_: ClusterSystemSettings) -> LeaderElection? { + internal func make(_: ClusterSystemSettings) -> LeaderElection? { switch self.underlying { case .none: return nil case .lowestReachable(let nr): - return Leadership.LowestReachableMember(minimumNrOfMembers: nr) + return ClusterLeadership.LowestReachableMember(minimumNrOfMembers: nr) } } diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index a605442c4..015f724bb 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -405,10 +405,14 @@ extension ClusterShell { } // 3) leader election, so it may move members: .joining -> .up (and other `LeaderAction`s) - if let leaderElection = self.settings.autoLeaderElection.make(context.system.cluster.settings) { - let leadershipShell = Leadership.Shell(leaderElection) - let leadership = try context._spawn(Leadership.Shell.naming, leadershipShell.behavior) - context.watch(leadership) // if leadership fails for some reason, we are in trouble and need to know about it + if let leaderElectionStrategy = self.settings.autoLeaderElection.make(context.system.cluster.settings) { + Task { + let leaderElectionActor = await ClusterLeadership.Shell(strategy: leaderElectionStrategy, actorSystem: context.system) + } + // let leadershipShell = ClusterLeadership.Shell(leaderElection) +// let leaderElection = ClusterLeaderElection(leaderElection) +// let leadership = try context._spawn(ClusterLeadership.Shell.naming, leadershipShell.behavior) +// context.watch(leadership) // if leadership fails for some reason, we are in trouble and need to know about it } context.log.info("Binding to: [\(uniqueBindAddress)]") diff --git a/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift b/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift index 9587b8817..a78eff28c 100644 --- a/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift +++ b/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift @@ -24,8 +24,6 @@ final class LeadershipTests: XCTestCase { let memberC = Cluster.Member(node: UniqueNode(node: Node(systemName: "System", host: "3.3.3.3", port: 9119), nid: .random()), status: .up) let newMember = Cluster.Member(node: UniqueNode(node: Node(systemName: "System", host: "4.4.4.4", port: 1001), nid: .random()), status: .up) - let fakeContext = LeaderElectionContext(log: NoopLogger.make(), eventLoop: EmbeddedEventLoop()) - lazy var initialMembership: Cluster.Membership = [ memberA, memberB, memberC, ] @@ -33,63 +31,63 @@ final class LeadershipTests: XCTestCase { // ==== ------------------------------------------------------------------------------------------------------------ // MARK: LowestAddressReachableMember - func test_LowestAddressReachableMember_selectLeader() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_selectLeader() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) let membership = self.initialMembership - let change: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change: Cluster.LeadershipChange? = try await election.runElection(context: self.fakeContext, membership: membership) change.shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) } func test_LowestAddressReachableMember_notEnoughMembersToDecide() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = membership.removeCompletely(self.memberA.uniqueNode) // 2 members -> not enough to make decision anymore - let change1: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change1: Cluster.LeadershipChange? = try await election.runElection(membership: membership) change1.shouldBeNil() _ = membership.join(self.newMember.uniqueNode) // 3 members again, should work - let change2: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change2: Cluster.LeadershipChange? = try await election.runElection(membership: membership) change2.shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberB)) } - func test_LowestAddressReachableMember_notEnoughReachableMembersToDecide() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_notEnoughReachableMembersToDecide() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = membership.mark(self.memberB.uniqueNode, reachability: .unreachable) // 2 reachable members -> not enough to make decision anymore - let change1: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change1: Cluster.LeadershipChange? = try await election.runElection(membership: membership) change1.shouldBeNil() _ = membership.join(self.newMember.uniqueNode) // 3 reachable members again, 1 unreachable, should work - let change2: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change2: Cluster.LeadershipChange? = try await election.runElection(membership: membership) change2.shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) } func test_LowestAddressReachableMember_onlyUnreachableMembers_cantDecide() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = membership.mark(self.memberA.uniqueNode, reachability: .unreachable) _ = membership.mark(self.memberB.uniqueNode, reachability: .unreachable) // 1 reachable member -> not enough to make decision anymore - let change1: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change1: Cluster.LeadershipChange? = try await election.runElection(membership: membership) change1.shouldBeNil() } - func test_LowestAddressReachableMember_notEnoughMembersToDecide_fromWithToWithoutLeader() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_notEnoughMembersToDecide_fromWithToWithoutLeader() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = try! membership.applyLeadershipChange(to: self.memberA) // try! because `memberA` is a member @@ -102,41 +100,41 @@ final class LeadershipTests: XCTestCase { // 2 members -> not enough to make decision anymore // Since we go from a leader to without, there should be a change - let change: Cluster.LeadershipChange? = try election.runElection(context: self.fakeContext, membership: membership).future.wait() + let change: Cluster.LeadershipChange? = try await election.runElection(membership: membership) leader?.status = .down change.shouldEqual(Cluster.LeadershipChange(oldLeader: leader, newLeader: nil)) } - func test_LowestAddressReachableMember_whenCurrentLeaderDown() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_whenCurrentLeaderDown() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = membership.join(self.newMember.uniqueNode) - (try election.runElection(context: self.fakeContext, membership: membership).future.wait()) + (try await election.runElection(membership: membership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) _ = membership.mark(self.memberA.uniqueNode, as: .down) - (try election.runElection(context: self.fakeContext, membership: membership).future.wait()) + (try election.runElection(context: self.fakeContext, membership: membership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberB)) } - func test_LowestAddressReachableMember_whenCurrentLeaderDown_enoughMembers() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_whenCurrentLeaderDown_enoughMembers() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership _ = membership.join(self.newMember.uniqueNode) - (try election.runElection(context: self.fakeContext, membership: membership).future.wait()) + (try election.runElection(context: self.fakeContext, membership: membership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) _ = membership.mark(self.memberA.uniqueNode, as: .down) - (try election.runElection(context: self.fakeContext, membership: membership).future.wait()) + (try election.runElection(context: self.fakeContext, membership: membership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberB)) } - func test_LowestAddressReachableMember_whenCurrentLeaderUnreachable_notEnoughMinMembers() throws { - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) + func test_LowestAddressReachableMember_whenCurrentLeaderUnreachable_notEnoughMinMembers() async throws { + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) var membership = self.initialMembership let applyToMembership: (Cluster.LeadershipChange?) throws -> (Cluster.LeadershipChange?) = { change in @@ -146,25 +144,25 @@ final class LeadershipTests: XCTestCase { return change } - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) _ = membership.mark(self.memberA.uniqueNode, reachability: .unreachable) - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(nil) membership.leader.shouldEqual(self.memberA.asUnreachable) } - func test_LowestAddressReachableMember_keepLeader_notEnoughMembers_DO_NOT_loseLeadershipIfBelowMinNrOfMembers() throws { + func test_LowestAddressReachableMember_keepLeader_notEnoughMembers_DO_NOT_loseLeadershipIfBelowMinNrOfMembers() async throws { // - 3 nodes join // - first becomes leader // - third leaves // - second leaves // ! no need to drop the leadership from the first node, it shall remain the leader; - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3) // loseLeadershipIfBelowMinNrOfMembers: false by default + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3) // loseLeadershipIfBelowMinNrOfMembers: false by default var membership: Cluster.Membership = self.initialMembership let applyToMembership: (Cluster.LeadershipChange?) throws -> (Cluster.LeadershipChange?) = { change in @@ -174,33 +172,33 @@ final class LeadershipTests: XCTestCase { return change } - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) // down third _ = membership.mark(self.memberC.uniqueNode, as: .down) // no reason to remove the leadership from the first node - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(nil) // down second _ = membership.mark(self.memberB.uniqueNode, as: .down) // STILL no reason to remove the leadership from the first node - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(nil) membership.leader.shouldEqual(self.memberA) } - func test_LowestAddressReachableMember_keepLeader_notEnoughMembers_DO_loseLeadershipIfBelowMinNrOfMembers() throws { + func test_LowestAddressReachableMember_keepLeader_notEnoughMembers_DO_loseLeadershipIfBelowMinNrOfMembers() async throws { // - 3 nodes join // - first becomes leader // - third leaves // ! not enough members to sustain leader, it should not be trusted anymore - var election = Leadership.LowestReachableMember(minimumNrOfMembers: 3, loseLeadershipIfBelowMinNrOfMembers: true) + var election = ClusterLeadership.LowestReachableMember(minimumNrOfMembers: 3, loseLeadershipIfBelowMinNrOfMembers: true) var membership: Cluster.Membership = self.initialMembership let applyToMembership: (Cluster.LeadershipChange?) throws -> (Cluster.LeadershipChange?) = { change in @@ -210,21 +208,21 @@ final class LeadershipTests: XCTestCase { return change } - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(Cluster.LeadershipChange(oldLeader: nil, newLeader: self.memberA)) // down third _ = membership.mark(self.memberC.uniqueNode, as: .down) // no reason to remove the leadership from the first node - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(context: self.fakeContext, membership: membership) .map(applyToMembership) .shouldEqual(Cluster.LeadershipChange(oldLeader: self.memberA, newLeader: nil)) // down second _ = membership.mark(self.memberB.uniqueNode, as: .down) // STILL no reason to remove the leadership from the first node - try election.runElection(context: self.fakeContext, membership: membership).future.wait() + try election.runElection(membership: membership) .map(applyToMembership) .shouldEqual(nil)