Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Sources/ActorSingletonPlugin/ActorSingleton.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public struct ActorSingletonSettings {
}

/// Controls allocation of the node on which the singleton runs.
public var allocationStrategy: AllocationStrategySettings = .leadership
public var allocationStrategy: AllocationStrategySettings = .byLeadership

public init(name: String) {
self.name = name
Expand All @@ -126,11 +126,11 @@ public struct ActorSingletonSettings {
/// Singleton node allocation strategies.
public enum AllocationStrategySettings {
/// Singletons will run on the cluster leader
case leadership
case byLeadership

func make(_: ClusterSettings, _: ActorSingletonSettings) -> ActorSingletonAllocationStrategy {
switch self {
case .leadership:
case .byLeadership:
return ActorSingletonAllocationByLeadership()
}
}
Expand Down
63 changes: 33 additions & 30 deletions Tests/ActorSingletonPluginTests/ActorSingletonPluginTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class ActorSingletonPluginTests: ClusteredNodesTestBase {
func test_singletonByClusterLeadership() throws {
try shouldNotThrow {
var singletonSettings = ActorSingletonSettings(name: GreeterSingleton.name)
singletonSettings.allocationStrategy = .leadership
singletonSettings.allocationStrategy = .byLeadership

let first = self.setUpNode("first") { settings in
settings.cluster.node.port = 7111
Expand Down Expand Up @@ -102,7 +102,7 @@ final class ActorSingletonPluginTests: ClusteredNodesTestBase {
func test_singletonByClusterLeadership_stashMessagesIfNoLeader() throws {
try shouldNotThrow {
var singletonSettings = ActorSingletonSettings(name: GreeterSingleton.name)
singletonSettings.allocationStrategy = .leadership
singletonSettings.allocationStrategy = .byLeadership

let first = self.setUpNode("first") { settings in
settings.cluster.node.port = 7111
Expand Down Expand Up @@ -158,48 +158,42 @@ final class ActorSingletonPluginTests: ClusteredNodesTestBase {
}
}

// FIXME: flaky test (https://github.com/apple/swift-distributed-actors/issues/346)
func test_singletonByClusterLeadership_withLeaderChange() throws {
try shouldNotThrow {
var singletonSettings = ActorSingletonSettings(name: GreeterSingleton.name)
singletonSettings.allocationStrategy = .leadership
singletonSettings.allocationStrategy = .byLeadership

let first = self.setUpNode("first") { settings in
settings.cluster.node.port = 7111
settings.cluster.autoLeaderElection = .lowestAddress(minNumberOfMembers: 3)
settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)

settings += ActorSingleton(settings: singletonSettings, GreeterSingleton.makeBehavior(instance: GreeterSingleton("Hello-1")))

settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)
}
let second = self.setUpNode("second") { settings in
settings.cluster.node.port = 8222
settings.cluster.autoLeaderElection = .lowestAddress(minNumberOfMembers: 3)
settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)

settings += ActorSingleton(settings: singletonSettings, GreeterSingleton.makeBehavior(instance: GreeterSingleton("Hello-2")))

settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)
}
let third = self.setUpNode("third") { settings in
settings.cluster.node.port = 9333
settings.cluster.autoLeaderElection = .lowestAddress(minNumberOfMembers: 3)
settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)

settings += ActorSingleton(settings: singletonSettings, GreeterSingleton.makeBehavior(instance: GreeterSingleton("Hello-3")))

settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)
}
let fourth = self.setUpNode("fourth") { settings in
settings.cluster.node.port = 7444
settings.cluster.autoLeaderElection = .lowestAddress(minNumberOfMembers: 3)
settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)

settings += ActorSingleton(settings: singletonSettings, GreeterSingleton.makeBehavior(instance: GreeterSingleton("Hello-4")))

settings.serialization.registerCodable(for: GreeterSingleton.Message.self, underId: 10001)
}

first.cluster.join(node: second.cluster.node.node)
third.cluster.join(node: second.cluster.node.node)

try self.joinNodes(node: first, with: second)
try self.joinNodes(node: third, with: second)
try self.ensureNodes(.up, within: .seconds(10), systems: first, second, third)

let replyProbe1 = self.testKit(first).spawnTestProbe(expecting: String.self)
Expand All @@ -222,28 +216,37 @@ final class ActorSingletonPluginTests: ClusteredNodesTestBase {
// Take down the leader
first.cluster.down(node: first.cluster.node.node)

// Make sure that `second` and `third` see `first` as down and become leader-less
try self.testKit(second).eventually(within: .seconds(10)) {
try self.assertMemberStatus(on: second, node: first.cluster.node, is: .down)
try self.assertLeaderNode(on: second, is: nil)
}
// Ensure the node is seen down
try self.ensureNodes(.down, on: second, systems: first)
try self.ensureNodes(.down, on: third, systems: first)

// At the same time, since the node was downed, it is not the leader anymore
try self.testKit(third).eventually(within: .seconds(10)) {
try self.assertMemberStatus(on: third, node: first.cluster.node, is: .down)
try self.assertLeaderNode(on: third, is: nil)
try self.assertLeaderNode(on: second, is: nil)
}

// No leader so singleton is not available, messages sent should be stashed
ref2.tell(.greet(name: "Charlie-2", _replyTo: replyProbe2.ref))
ref3.tell(.greet(name: "Charlie-3", _replyTo: replyProbe3.ref))

// `fourth` will become the new leader and singleton
fourth.cluster.join(node: second.cluster.node.node)

try self.joinNodes(node: fourth, with: second)
try self.ensureNodes(.up, within: .seconds(10), systems: fourth, second, third)

// The stashed messages get routed to new singleton running on `fourth`
try replyProbe2.expectMessage("Hello-4 Charlie-2!")
try replyProbe3.expectMessage("Hello-4 Charlie-3!")
try self.testKit(first).eventually(within: .seconds(10)) {
// The stashed messages get routed to new singleton running on `fourth`

ref2.tell(.greet(name: "Charlie-2", _replyTo: replyProbe2.ref))
guard let reply2 = try replyProbe2.maybeExpectMessage() else {
pprint("lost msg @ node 2")
throw TestError("No reply to \(replyProbe2) yet, singleton still rebalancing...?")
}
reply2.shouldEqual("Hello-4 Charlie-2!")

ref3.tell(.greet(name: "Charlie-3", _replyTo: replyProbe3.ref))
guard let reply3 = try replyProbe3.maybeExpectMessage() else {
pprint("lost msg @ node 3")
throw TestError("No reply to \(replyProbe3) yet, singleton still rebalancing...?")
}
reply3.shouldEqual("Hello-4 Charlie-3!")
}
}
}
}
Expand Down