From bce032a2a627d1c7a8cd056c29505607820294fc Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Wed, 17 Jun 2020 16:34:40 +0900 Subject: [PATCH] =test #435 hardening test_singletonByClusterLeadership_withLeaderChange --- Sources/DistributedActors/ActorSystem.swift | 5 ++ .../ClusteredActorSystemsXCTestCase.swift | 4 +- .../ActorSingletonPluginClusteredTests.swift | 63 +++++++++++++++---- 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/Sources/DistributedActors/ActorSystem.swift b/Sources/DistributedActors/ActorSystem.swift index 3cebae3de..8ae7c20a4 100644 --- a/Sources/DistributedActors/ActorSystem.swift +++ b/Sources/DistributedActors/ActorSystem.swift @@ -97,6 +97,11 @@ public final class ActorSystem { self._metrics } + // TODO: become the system's uptime + internal func uptimeNanoseconds() -> Int64 { + Deadline.now().uptimeNanoseconds + } + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Cluster diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift index 4795cce08..14813bb5e 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift @@ -141,14 +141,14 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase { } public func ensureNodes( - _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: UniqueNode..., + _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(20), nodes: UniqueNode..., file: StaticString = #file, line: UInt = #line ) throws { try self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line) } public func ensureNodes( - _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: [UniqueNode], + _ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(20), nodes: [UniqueNode], file: StaticString = #file, line: UInt = #line ) throws { guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else { diff --git a/Tests/ActorSingletonPluginTests/ActorSingletonPluginClusteredTests.swift b/Tests/ActorSingletonPluginTests/ActorSingletonPluginClusteredTests.swift index f744a3f07..c725b03ef 100644 --- a/Tests/ActorSingletonPluginTests/ActorSingletonPluginClusteredTests.swift +++ b/Tests/ActorSingletonPluginTests/ActorSingletonPluginClusteredTests.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// @testable import ActorSingletonPlugin -import DistributedActors +@testable import DistributedActors import DistributedActorsTestKit import XCTest @@ -183,7 +183,6 @@ final class ActorSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCase let firstNode = first.cluster.node first.cluster.leave() - pinfo("Node \(first.cluster.node) left cluster...") // Make sure that `second` and `third` see `first` as down and become leader-less try self.testKit(second).eventually(within: .seconds(10)) { @@ -194,22 +193,64 @@ final class ActorSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCase try self.assertMemberStatus(on: third, node: firstNode, is: .down) try self.assertLeaderNode(on: third, is: nil) } - - // ~~~~ racy ~~~~ - - // No leader so singleton is not available, messages sent should be stashed - ref2.tell(.greet(name: "Bob-2", _replyTo: replyProbe2.ref)) - ref3.tell(.greet(name: "Charlie-3", _replyTo: replyProbe3.ref)) + pinfo("Node \(first.cluster.node) left cluster...") // `fourth` will become the new leader and singleton + pinfo("Node \(fourth.cluster.node) joining cluster...") fourth.cluster.join(node: second.cluster.node.node) + let start = fourth.uptimeNanoseconds() - try self.ensureNodes(.up, on: second, nodes: fourth.cluster.node, second.cluster.node, third.cluster.node) + // No leader so singleton is not available, messages sent should be stashed + _ = try second.spawn("teller", of: String.self, .setup { context in + context.timers.startPeriodic(key: "periodic-try-send", message: "tick", interval: .seconds(1)) + var attempt = 0 + + return .receiveMessage { _ in + attempt += 1 + // No leader so singleton is not available, messages sent should be stashed + let m2 = "Bob-2 (\(attempt))" + pnote(" Sending: \(m2) -> \(ref2) (it may be terminated/not-re-pointed yet)") + ref2.tell(.greet(name: m2, _replyTo: replyProbe2.ref)) + + let m3 = "Charlie-3 (\(attempt))" + pnote(" Sending: \(m3) -> \(ref3) (it may be terminated/not-re-pointed yet)") + ref3.tell(.greet(name: m3, _replyTo: replyProbe3.ref)) + return .same + } + }) + + try self.ensureNodes(.up, on: second, nodes: second.cluster.node, third.cluster.node, fourth.cluster.node) pinfo("Fourth node joined, will become leader; Members now: \([fourth.cluster.node, second.cluster.node, third.cluster.node])") // The stashed messages get routed to new singleton running on `fourth` - try replyProbe2.expectMessage("Hello-4 Bob-2!") - try replyProbe3.expectMessage("Hello-4 Charlie-3!") + let got2 = try replyProbe2.expectMessage() + got2.shouldStartWith(prefix: "Hello-4 Bob-2") + pinfo("Received reply (by \(replyProbe2.address.path)) from singleton: \(got2)") + if got2 == "Hello-4 Bob-2 (1)!" { + var counter = 0 + while try replyProbe2.maybeExpectMessage(within: .milliseconds(100)) != nil { + counter += 1 + } + pinfo(" No messages were lost! Including \(counter) more, following the previous delivery.") + } else { + pinfo(" Initial messages may have been lost, delivered message: \(got2)") + } + + let got3 = try replyProbe3.expectMessage() + got3.shouldStartWith(prefix: "Hello-4 Charlie-3") + pinfo("Received reply (by \(replyProbe3.address.path)) from singleton: \(got3)") + if got3 == "Hello-4 Charlie-3 (1)!" { + var counter = 0 + while try replyProbe3.maybeExpectMessage(within: .milliseconds(100)) != nil { + counter += 1 + } + pinfo(" No messages were lost! Including \(counter) more, following the previous delivery.") + } else { + pinfo(" Initial messages may have been lost, delivered message: \(got3)") + } + + let stop = fourth.uptimeNanoseconds() + pinfo("Singleton re-pointing took: \(TimeAmount.nanoseconds(stop - start).prettyDescription)") pinfo("Nodes communicated successfully with singleton on [fourth]") }