From 646ed86e36132f1a307aefa449d88e87051f2c81 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 16 Jun 2020 00:04:20 +0900 Subject: [PATCH] +test,downing add many-nodes-test that kills many nodes at once --- .../Cluster/ClusteredNodesTestBase.swift | 7 ++ .../DowningClusteredTests.swift | 67 +++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift index 448036f7e..a4945a312 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift @@ -139,6 +139,13 @@ open class ClusteredNodesTestBase: XCTestCase { public func ensureNodes( _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: UniqueNode..., file: StaticString = #file, line: UInt = #line + ) throws { + try self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line) + } + + public func ensureNodes( + _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: [UniqueNode], + file: StaticString = #file, line: UInt = #line ) throws { guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else { fatalError("Must at least have 1 system present to use [\(#function)]") diff --git a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift index ba3adb0f6..0104ae5c9 100644 --- a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift @@ -216,4 +216,71 @@ final class DowningClusteredTests: ClusteredNodesTestBase { settings.cluster.downingStrategy = self.downingStrategy } } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: "Mass" Downing + + func test_many_nonLeaders_shouldPropagateToOtherNodes() throws { + let first = self.setUpNode("node-1") + var nodes = (2 ... 7).map { self.setUpNode("node-\($0)") } + + pinfo("Joining \(nodes.count + 1) nodes...") + let joiningStart = first.metrics.uptimeNanoseconds() + + nodes.forEach { first.cluster.join(node: $0.cluster.node.node) } + try self.ensureNodes(.up, within: .seconds(30), nodes: nodes.map { $0.cluster.node }) + + let joiningStop = first.metrics.uptimeNanoseconds() + pinfo("Joined \(nodes.count + 1) nodes, took: \(TimeAmount.nanoseconds(joiningStop - joiningStart).prettyDescription)") + + let nodesToDown = nodes.prefix(nodes.count / 2) + nodes.removeFirst(nodes.count / 2) + + pinfo("Downing \(nodes.count / 2) nodes: \(nodesToDown.map { $0.cluster.node })") + for node in nodesToDown { + node.shutdown().wait() + } + + nodes.append(first) + var probes: [UniqueNode: ActorTestProbe] = [:] + for remainingNode in nodes { + probes[remainingNode.cluster.node] = self.testKit(remainingNode).spawnEventStreamTestProbe(subscribedTo: remainingNode.cluster.events) + } + + func expectedDownMemberEventsFishing( + on: ActorSystem, + file: StaticString = #file, line: UInt = #line + ) -> (Cluster.Event) -> ActorTestProbe.FishingDirective { + pinfo("Expecting \(nodesToDown.map { $0.cluster.node.node }) to become [.down] on [\(on.cluster.node.node)]") + var removalsFound = 0 + + return { event in + switch event { + case .membershipChange(let change) where change.isRemoval: + pinfo("\(on.cluster.node.node): \(change)", file: file, line: line) + removalsFound += 1 + if removalsFound == nodesToDown.count { + return .catchComplete(change) + } else { + return .catchContinue(change) + } + case .membershipChange(let change) where change.isDown: + pinfo("\(on.cluster.node.node): \(change)", file: file, line: line) + return .catchContinue(change) + default: + return .ignore + } + } + } + + for remainingNode in nodes { + let probe = probes[remainingNode.cluster.node]! + let events = try probe.fishFor(Cluster.MembershipChange.self, within: .seconds(60), expectedDownMemberEventsFishing(on: remainingNode)) + + events.shouldContain(where: { change in change.toStatus.isDown && (change.fromStatus == .joining || change.fromStatus == .up) }) + for expectedDownNode in nodesToDown { + events.shouldContain(Cluster.MembershipChange(node: expectedDownNode.cluster.node, fromStatus: .down, toStatus: .removed)) + } + } + } }