Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public final class ActorSystem {
self._metrics
}

// TODO: become the system's uptime
internal func uptimeNanoseconds() -> Int64 {
Deadline.now().uptimeNanoseconds
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: UniqueNode...,
_ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(20), nodes: UniqueNode...,
file: StaticString = #file, line: UInt = #line
) throws {
try self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line)
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(15), nodes: [UniqueNode],
_ status: Cluster.MemberStatus, on system: ActorSystem? = nil, within: TimeAmount = .seconds(20), nodes: [UniqueNode],
file: StaticString = #file, line: UInt = #line
) throws {
guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

@testable import ActorSingletonPlugin
import DistributedActors
@testable import DistributedActors
import DistributedActorsTestKit
import XCTest

Expand Down Expand Up @@ -183,7 +183,6 @@ final class ActorSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCase

let firstNode = first.cluster.node
first.cluster.leave()
pinfo("Node \(first.cluster.node) left cluster...")

// Make sure that `second` and `third` see `first` as down and become leader-less
try self.testKit(second).eventually(within: .seconds(10)) {
Expand All @@ -194,22 +193,64 @@ final class ActorSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCase
try self.assertMemberStatus(on: third, node: firstNode, is: .down)
try self.assertLeaderNode(on: third, is: nil)
}

// ~~~~ racy ~~~~

// No leader so singleton is not available, messages sent should be stashed
ref2.tell(.greet(name: "Bob-2", _replyTo: replyProbe2.ref))
ref3.tell(.greet(name: "Charlie-3", _replyTo: replyProbe3.ref))
pinfo("Node \(first.cluster.node) left cluster...")

// `fourth` will become the new leader and singleton
pinfo("Node \(fourth.cluster.node) joining cluster...")
fourth.cluster.join(node: second.cluster.node.node)
let start = fourth.uptimeNanoseconds()

try self.ensureNodes(.up, on: second, nodes: fourth.cluster.node, second.cluster.node, third.cluster.node)
// No leader so singleton is not available, messages sent should be stashed
_ = try second.spawn("teller", of: String.self, .setup { context in
context.timers.startPeriodic(key: "periodic-try-send", message: "tick", interval: .seconds(1))
var attempt = 0

return .receiveMessage { _ in
attempt += 1
// No leader so singleton is not available, messages sent should be stashed
let m2 = "Bob-2 (\(attempt))"
pnote(" Sending: \(m2) -> \(ref2) (it may be terminated/not-re-pointed yet)")
ref2.tell(.greet(name: m2, _replyTo: replyProbe2.ref))

let m3 = "Charlie-3 (\(attempt))"
pnote(" Sending: \(m3) -> \(ref3) (it may be terminated/not-re-pointed yet)")
ref3.tell(.greet(name: m3, _replyTo: replyProbe3.ref))
return .same
}
})

try self.ensureNodes(.up, on: second, nodes: second.cluster.node, third.cluster.node, fourth.cluster.node)
pinfo("Fourth node joined, will become leader; Members now: \([fourth.cluster.node, second.cluster.node, third.cluster.node])")

// The stashed messages get routed to new singleton running on `fourth`
try replyProbe2.expectMessage("Hello-4 Bob-2!")
try replyProbe3.expectMessage("Hello-4 Charlie-3!")
let got2 = try replyProbe2.expectMessage()
got2.shouldStartWith(prefix: "Hello-4 Bob-2")
pinfo("Received reply (by \(replyProbe2.address.path)) from singleton: \(got2)")
if got2 == "Hello-4 Bob-2 (1)!" {
var counter = 0
while try replyProbe2.maybeExpectMessage(within: .milliseconds(100)) != nil {
counter += 1
}
pinfo(" No messages were lost! Including \(counter) more, following the previous delivery.")
} else {
pinfo(" Initial messages may have been lost, delivered message: \(got2)")
}

let got3 = try replyProbe3.expectMessage()
got3.shouldStartWith(prefix: "Hello-4 Charlie-3")
pinfo("Received reply (by \(replyProbe3.address.path)) from singleton: \(got3)")
if got3 == "Hello-4 Charlie-3 (1)!" {
var counter = 0
while try replyProbe3.maybeExpectMessage(within: .milliseconds(100)) != nil {
counter += 1
}
pinfo(" No messages were lost! Including \(counter) more, following the previous delivery.")
} else {
pinfo(" Initial messages may have been lost, delivered message: \(got3)")
}

let stop = fourth.uptimeNanoseconds()
pinfo("Singleton re-pointing took: \(TimeAmount.nanoseconds(stop - start).prettyDescription)")

pinfo("Nodes communicated successfully with singleton on [fourth]")
}
Expand Down