
Commit f7ae61b

Create ClusterControl in ActorSystem directly #74
1 parent c1fa219 commit f7ae61b

8 files changed: +53 −44 lines changed

Sources/DistributedActors/ActorSystem.swift
Lines changed: 18 additions & 6 deletions

```diff
@@ -91,7 +91,7 @@ public final class ActorSystem {
 
     // initialized during startup
     internal var _cluster: ClusterShell?
-    internal var _clusterEvents: EventStream<ClusterEvent>?
+    internal var _clusterControl: ClusterControl?
     internal var _nodeDeathWatcher: NodeDeathWatcherShell.Ref?
 
     // ==== ----------------------------------------------------------------------------------------------------------------
@@ -201,9 +201,9 @@ public final class ActorSystem {
         do {
             // Cluster MUST be the last thing we initialize, since once we're bound, we may receive incoming messages from other nodes
             if let cluster = self._cluster {
-                let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents")
-                self._clusterEvents = clusterEvents
-                _ = try cluster.start(system: self, eventStream: self.clusterEvents) // only spawns when cluster is initialized
+                let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents", systemStream: true)
+                self._clusterControl = ClusterControl(settings.cluster, shell: cluster, eventStream: clusterEvents)
+                _ = try cluster.start(system: self, eventStream: clusterEvents) // only spawns when cluster is initialized
 
                 // Node watcher MUST be started AFTER cluster and clusterEvents
                 self._nodeDeathWatcher = try self._spawnSystemActor(
@@ -237,9 +237,9 @@ public final class ActorSystem {
     /// Do not call from within actors or you may deadlock shutting down the system.
     public func shutdown() {
         self.log.log(level: .debug, "SHUTTING DOWN ACTOR SYSTEM [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
-        if self.settings.cluster.enabled {
+        if let cluster = self._cluster {
             let receptacle = BlockingReceptacle<Void>()
-            self.cluster._shell.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
+            cluster.ref.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
             receptacle.wait(atMost: .milliseconds(300)) // FIXME: configure
         }
         self.userProvider.stopAll()
@@ -250,6 +250,18 @@ public final class ActorSystem {
         self._cluster = nil
         self._receptionist = self.deadLetters.adapted()
     }
+
+    public var cluster: ClusterControl {
+        guard self.settings.cluster.enabled else {
+            fatalError("Tried to access cluster control, but clustering is not enabled.")
+        }
+
+        guard let clusterControl = self._clusterControl else {
+            fatalError("BUG! Tried to access clusterControl on \(self) and it was nil! Please report this on the issue tracker.")
+        }
+
+        return clusterControl
+    }
 }
 
 extension ActorSystem: Equatable {
```
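
The net effect of this file's changes: the system builds a single `ClusterControl` during startup and hands it out through the new `cluster` accessor, failing fast when clustering is disabled. A minimal usage sketch under assumptions: the configuration-closure initializer and the `Node(systemName:host:port:)` shape follow the library's conventions but are not part of this diff, and host/port values are placeholders.

```swift
import DistributedActors

// Clustering must be enabled; accessing `system.cluster` on a
// non-clustered system now deliberately crashes via fatalError.
let system = ActorSystem("Example") { settings in
    settings.cluster.enabled = true
}

// Public cluster operations now go through ClusterControl:
system.cluster.join(node: Node(systemName: "Other", host: "127.0.0.1", port: 8228))
print("bound as: \(system.cluster.node)") // our UniqueNode, from settings.uniqueBindNode
```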

Sources/DistributedActors/Cluster/ActorSystem+Cluster.swift renamed to Sources/DistributedActors/Cluster/ClusterControl.swift
Lines changed: 5 additions & 16 deletions

```diff
@@ -25,9 +25,9 @@ public struct ClusterControl {
 
     public let settings: ClusterSettings
 
-    internal let _shell: ClusterShell.Ref
+    internal let _shell: ClusterShell
 
-    init(_ settings: ClusterSettings, shell: ClusterShell.Ref, eventStream: EventStream<ClusterEvent>) {
+    init(_ settings: ClusterSettings, shell: ClusterShell, eventStream: EventStream<ClusterEvent>) {
         self.settings = settings
         self._shell = shell
         self.events = eventStream
@@ -38,29 +38,18 @@ public struct ClusterControl {
     }
 
     public func join(node: Node) {
-        self._shell.tell(.command(.join(node)))
+        self._shell.ref.tell(.command(.join(node)))
     }
 
     public func down(node: Node) {
-        self._shell.tell(.command(.downCommand(node)))
+        self._shell.ref.tell(.command(.downCommand(node)))
     }
 
     public func down(node: UniqueNode) {
-        self._shell.tell(.command(.downCommand(node.node)))
+        self._shell.ref.tell(.command(.downCommand(node.node)))
     }
 
     public var node: UniqueNode {
         return self.settings.uniqueBindNode
     }
 }
-
-extension ActorSystem {
-    public var cluster: ClusterControl {
-        let shell = self._cluster?.ref ?? self.deadLetters.adapted()
-        return .init(self.settings.cluster, shell: shell, eventStream: self.clusterEvents)
-    }
-
-    internal var clusterEvents: EventStream<ClusterEvent> {
-        return self._clusterEvents ?? EventStream(ref: self.deadLetters.adapted())
-    }
-}
```
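
`ClusterControl` also carries the cluster's `events` stream (assigned in the initializer above), so subscribers no longer need the removed `clusterEvents` extension. A hedged sketch, assuming the `events` property is publicly exposed and using the `Behavior.receive` factory style seen elsewhere in the library; the actor name is illustrative.

```swift
// Sketch: observe cluster events through the control facade.
let listener: ActorRef<ClusterEvent> = try system.spawn(
    .unique("cluster-listener"),
    Behavior<ClusterEvent>.receive { context, event in
        context.log.info("cluster event: \(event)")
        return .same
    }
)
system.cluster.events.subscribe(listener)
```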

Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
Lines changed: 1 addition & 1 deletion

```diff
@@ -390,7 +390,7 @@ internal struct SWIMShell {
 
         let handshakeTimeout = TimeAmount.seconds(3)
         // FIXME: use reasonable timeout and back off? issue #724
-        let handshakeResultAnswer = context.system.cluster._shell.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
+        let handshakeResultAnswer = context.system.cluster._shell.ref.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
            .command(.handshakeWith(remoteNode, replyTo: $0))
        }
        context.onResultAsync(of: handshakeResultAnswer, timeout: .effectivelyInfinite) { handshakeResultResult in
```

Sources/DistributedActors/EventStream.swift
Lines changed: 9 additions & 1 deletion

```diff
@@ -22,13 +22,21 @@ public struct EventStream<Event> {
     internal let ref: ActorRef<EventStreamShell.Message<Event>>
 
     public init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self) throws {
-        self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
+        try self.init(system, name: name, of: type, systemStream: false)
     }
 
     internal init(ref: ActorRef<EventStreamShell.Message<Event>>) {
         self.ref = ref
     }
 
+    internal init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self, systemStream: Bool) throws {
+        if systemStream {
+            self.ref = try system._spawnSystemActor(.unique(name), EventStreamShell.behavior(type))
+        } else {
+            self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
+        }
+    }
+
     public func subscribe(_ ref: ActorRef<Event>) {
         self.ref.tell(.subscribe(ref))
    }
```
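
The public initializer delegates with `systemStream: false`, so user-created streams still spawn as regular actors, while internal callers (like the cluster bootstrap above) pass `systemStream: true` to spawn under the system namespace via `_spawnSystemActor`. A small sketch of the user-facing path; the event type and names are illustrative.

```swift
// A user-level event stream; spawned as a regular (non-system) actor.
struct MyEvent { let text: String }

let stream = try EventStream<MyEvent>(system, name: "myEvents")

// Any ActorRef<MyEvent> can subscribe; here, a simple logging actor.
let logger = try system.spawn(
    .unique("my-event-logger"),
    Behavior<MyEvent>.receiveMessage { event in
        print("received: \(event.text)")
        return .same
    }
)
stream.subscribe(logger)
```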

Sources/DistributedActors/ProcessIsolated/ProcessIsolated.swift
Lines changed: 1 addition & 1 deletion

```diff
@@ -111,7 +111,7 @@ public class ProcessIsolated {
         let funKillServantProcess: (Int) -> Void = { (pid: Int) in
             self.lock.withLockVoid {
                 if let servant = self._servants[pid] {
-                    self.system.cluster._shell.tell(.command(.downCommand(servant.node.node)))
+                    self.system.cluster.down(node: servant.node.node)
                     self._servants.removeValue(forKey: pid)
                 }
             }
```
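
With the facade in place, supervisory code like this no longer builds shell commands by hand; downing a member is a single public call. A hedged sketch, reusing the `system` from the earlier example, with placeholder node values:

```swift
// Down a servant node through the public ClusterControl API.
let servantNode = Node(systemName: "Servant", host: "127.0.0.1", port: 9001) // placeholder values
system.cluster.down(node: servantNode)
```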

Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
Lines changed: 11 additions & 11 deletions

```diff
@@ -34,7 +34,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
@@ -46,14 +46,14 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
 
         try p.expectMessage(.success(remote.cluster.node))
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try p.expectMessage(.success(remote.cluster.node))
     }
@@ -128,8 +128,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // here we attempt to make a race where the nodes race to join each other
         // again, only one association should be created.
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        remote.cluster._shell.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        remote.cluster._shell.ref.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -145,8 +145,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let p8228 = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
         // we issue two handshakes quickly after each other, both should succeed but there should only be one association established (!)
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -196,7 +196,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertNotAssociated(system: local, expectAssociatedNode: remote.cluster.node)
         try assertNotAssociated(system: remote, expectAssociatedNode: local.cluster.node)
@@ -243,7 +243,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         var node = local.cluster.node.node
         node.port = node.port + 10
 
-        local.cluster._shell.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
+        local.cluster._shell.ref.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
 
         switch try p.expectMessage(within: .seconds(1)) {
         case ClusterShell.HandshakeResult.failure:
@@ -267,7 +267,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // we we down local on local, it should become down there:
         try self.testKit(local).eventually(within: .seconds(3)) {
-            local.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
+            local.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
             let localMembership = try localProbe.expectMessage()
 
             guard let selfMember = localMembership.member(local.cluster.node.node) else {
@@ -281,7 +281,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         // although this may be a best effort since the local can just shut down if it wanted to,
         // this scenario assumes a graceful leave though:
 
-        remote.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
+        remote.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
         let remoteMembership = try localProbe.expectMessage()
 
         guard let localMemberObservedOnRemote = remoteMembership.member(local.cluster.node.node) else {
```

Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
Lines changed: 4 additions & 4 deletions

```diff
@@ -101,7 +101,7 @@ extension ClusteredNodesTestBase {
 
         var infos: [String] = []
         for node in self._nodes {
-            node.cluster._shell.tell(.query(.currentMembership(p.ref)))
+            node.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
             let membership = try! p.expectMessage()
             infos.append(membership.prettyDescription(label: "\(node.cluster.node)"))
         }
@@ -169,7 +169,7 @@ extension ClusteredNodesTestBase {
         defer { probe.stop() }
 
         try testKit.eventually(within: timeout ?? .seconds(5), file: file, line: line, column: column) {
-            system.cluster._shell.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
+            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
             let associatedNodes = try probe.expectMessage(file: file, line: line)
 
             if verbose {
@@ -208,7 +208,7 @@ extension ClusteredNodesTestBase {
         let probe = testKit.spawnTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set<UniqueNode>.self)
         defer { probe.stop() }
         try testKit.assertHolds(for: timeout ?? .seconds(1)) {
-            system.cluster._shell.tell(.query(.associatedNodes(probe.ref)))
+            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref)))
             let associatedNodes = try probe.expectMessage() // TODO: use interval here
             if verbose {
                 pprint(" Self: \(String(reflecting: system.settings.cluster.uniqueBindNode))")
@@ -228,7 +228,7 @@ extension ClusteredNodesTestBase {
     func assertMemberStatus(_ testKit: ActorTestKit, on system: ActorSystem, member memberSystem: ActorSystem, is expectedStatus: MemberStatus,
                             file: StaticString = #file, line: UInt = #line) throws {
         let p = testKit.spawnTestProbe(expecting: Membership.self)
-        system.cluster._shell.tell(.query(.currentMembership(p.ref)))
+        system.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
 
         let membership = try p.expectMessage()
         guard let foundMember = membership.member(memberSystem.cluster.node) else {
```

Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
Lines changed: 4 additions & 4 deletions

```diff
@@ -245,16 +245,16 @@ class RemotingTLSTests: ClusteredNodesTestBase {
 
         do {
             let pSystem = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
-            remote.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
+            local.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
+            remote.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
             let associatedNodes = try pSystem.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }
 
         do {
             let pRemote = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
-            remote.cluster._shell.tell(.query(.associatedNodes(pRemote.ref)))
+            local.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
+            remote.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref)))
             let associatedNodes = try pRemote.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }
```
