From 4bc65a0c9f6f2945c0506f425c98ff87b84445c2 Mon Sep 17 00:00:00 2001
From: Dario Rexin
Date: Tue, 3 Sep 2019 14:43:03 -0700
Subject: [PATCH 1/2] Create ClusterControl in ActorSystem directly #74

---
 Sources/DistributedActors/ActorSystem.swift   | 24 ++++++++++++++-----
 ...tem+Cluster.swift => ClusterControl.swift} | 21 ++++-------------
 .../Cluster/SWIM/SWIMShell.swift              |  2 +-
 Sources/DistributedActors/EventStream.swift   | 10 +++++++-
 .../ProcessIsolated/ProcessIsolated.swift     |  2 +-
 .../Cluster/AssociationClusteredTests.swift   | 22 ++++++++---------
 .../Cluster/ClusteredNodesTestBase.swift      |  8 +++----
 .../Cluster/RemotingTLSClusteredTests.swift   |  8 +++----
 8 files changed, 53 insertions(+), 44 deletions(-)
 rename Sources/DistributedActors/Cluster/{ActorSystem+Cluster.swift => ClusterControl.swift} (67%)
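
Note: with this patch, cluster operations are exposed through a ClusterControl
value that the ActorSystem creates once during startup, rather than being
assembled on demand from the shell ref and event stream on every access. A
rough usage sketch of the resulting surface follows; the settings closure, the
Node initializer, and the `listener` ref are illustrative assumptions, not
part of this patch:

    let system = ActorSystem("Example") { settings in
        settings.cluster.enabled = true
    }

    // fire-and-forget commands, forwarded to the cluster shell
    system.cluster.join(node: Node(systemName: "Example", host: "127.0.0.1", port: 8228))

    // subscribe an ActorRef<ClusterEvent> (a hypothetical `listener`) to membership events
    system.cluster.events.subscribe(listener)

    // the unique node this system is bound to
    let selfNode: UniqueNode = system.cluster.node
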
diff --git a/Sources/DistributedActors/ActorSystem.swift b/Sources/DistributedActors/ActorSystem.swift
index f5278cecb..b670199e3 100644
--- a/Sources/DistributedActors/ActorSystem.swift
+++ b/Sources/DistributedActors/ActorSystem.swift
@@ -91,7 +91,7 @@ public final class ActorSystem {
 
     // initialized during startup
     internal var _cluster: ClusterShell?
-    internal var _clusterEvents: EventStream<ClusterEvent>?
+    internal var _clusterControl: ClusterControl?
     internal var _nodeDeathWatcher: NodeDeathWatcherShell.Ref?
 
     // ==== ----------------------------------------------------------------------------------------------------------------
@@ -201,9 +201,9 @@ public final class ActorSystem {
         do {
             // Cluster MUST be the last thing we initialize, since once we're bound, we may receive incoming messages from other nodes
             if let cluster = self._cluster {
-                let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents")
-                self._clusterEvents = clusterEvents
-                _ = try cluster.start(system: self, eventStream: self.clusterEvents) // only spawns when cluster is initialized
+                let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents", systemStream: true)
+                self._clusterControl = ClusterControl(settings.cluster, shell: cluster, eventStream: clusterEvents)
+                _ = try cluster.start(system: self, eventStream: clusterEvents) // only spawns when cluster is initialized
 
                 // Node watcher MUST be started AFTER cluster and clusterEvents
                 self._nodeDeathWatcher = try self._spawnSystemActor(
@@ -237,9 +237,9 @@ public final class ActorSystem {
     /// Do not call from within actors or you may deadlock shutting down the system.
     public func shutdown() {
         self.log.log(level: .debug, "SHUTTING DOWN ACTOR SYSTEM [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
-        if self.settings.cluster.enabled {
+        if let cluster = self._cluster {
             let receptacle = BlockingReceptacle<Void>()
-            self.cluster._shell.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
+            cluster.ref.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
             receptacle.wait(atMost: .milliseconds(300)) // FIXME: configure
         }
         self.userProvider.stopAll()
@@ -250,6 +250,18 @@ public final class ActorSystem {
         self._cluster = nil
         self._receptionist = self.deadLetters.adapted()
     }
+
+    public var cluster: ClusterControl {
+        guard self.settings.cluster.enabled else {
+            fatalError("Tried to access cluster control, but clustering is not enabled.")
+        }
+
+        guard let clusterControl = self._clusterControl else {
+            fatalError("BUG! Tried to access clusterControl on \(self) and it was nil! Please report this on the issue tracker.")
+        }
+
+        return clusterControl
+    }
 }
 
 extension ActorSystem: Equatable {
diff --git a/Sources/DistributedActors/Cluster/ActorSystem+Cluster.swift b/Sources/DistributedActors/Cluster/ClusterControl.swift
similarity index 67%
rename from Sources/DistributedActors/Cluster/ActorSystem+Cluster.swift
rename to Sources/DistributedActors/Cluster/ClusterControl.swift
index f249681d2..6ad5e9938 100644
--- a/Sources/DistributedActors/Cluster/ActorSystem+Cluster.swift
+++ b/Sources/DistributedActors/Cluster/ClusterControl.swift
@@ -25,9 +25,9 @@ public struct ClusterControl {
 
     public let settings: ClusterSettings
 
-    internal let _shell: ClusterShell.Ref
+    internal let _shell: ClusterShell
 
-    init(_ settings: ClusterSettings, shell: ClusterShell.Ref, eventStream: EventStream<ClusterEvent>) {
+    init(_ settings: ClusterSettings, shell: ClusterShell, eventStream: EventStream<ClusterEvent>) {
         self.settings = settings
         self._shell = shell
         self.events = eventStream
@@ -38,29 +38,18 @@ public struct ClusterControl {
     }
 
     public func join(node: Node) {
-        self._shell.tell(.command(.join(node)))
+        self._shell.ref.tell(.command(.join(node)))
     }
 
     public func down(node: Node) {
-        self._shell.tell(.command(.downCommand(node)))
+        self._shell.ref.tell(.command(.downCommand(node)))
     }
 
     public func down(node: UniqueNode) {
-        self._shell.tell(.command(.downCommand(node.node)))
+        self._shell.ref.tell(.command(.downCommand(node.node)))
     }
 
     public var node: UniqueNode {
         return self.settings.uniqueBindNode
     }
 }
-
-extension ActorSystem {
-    public var cluster: ClusterControl {
-        let shell = self._cluster?.ref ?? self.deadLetters.adapted()
-        return .init(self.settings.cluster, shell: shell, eventStream: self.clusterEvents)
-    }
-
-    internal var clusterEvents: EventStream<ClusterEvent> {
-        return self._clusterEvents ?? EventStream(ref: self.deadLetters.adapted())
-    }
-}
diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
index 183d873f6..25a2bd83e 100644
--- a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
+++ b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
@@ -390,7 +390,7 @@ internal struct SWIMShell {
         let handshakeTimeout = TimeAmount.seconds(3)
 
         // FIXME: use reasonable timeout and back off? issue #724
-        let handshakeResultAnswer = context.system.cluster._shell.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
+        let handshakeResultAnswer = context.system.cluster._shell.ref.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
             .command(.handshakeWith(remoteNode, replyTo: $0))
         }
         context.onResultAsync(of: handshakeResultAnswer, timeout: .effectivelyInfinite) { handshakeResultResult in
diff --git a/Sources/DistributedActors/EventStream.swift b/Sources/DistributedActors/EventStream.swift
index 5123d41f2..650239565 100644
--- a/Sources/DistributedActors/EventStream.swift
+++ b/Sources/DistributedActors/EventStream.swift
@@ -22,13 +22,21 @@ public struct EventStream<Event> {
     internal let ref: ActorRef<EventStreamShell.Message<Event>>
 
     public init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self) throws {
-        self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
+        try self.init(system, name: name, of: type, systemStream: false)
     }
 
     internal init(ref: ActorRef<EventStreamShell.Message<Event>>) {
         self.ref = ref
     }
 
+    internal init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self, systemStream: Bool) throws {
+        if systemStream {
+            self.ref = try system._spawnSystemActor(.unique(name), EventStreamShell.behavior(type))
+        } else {
+            self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
+        }
+    }
+
     public func subscribe(_ ref: ActorRef<Event>) {
         self.ref.tell(.subscribe(ref))
     }
diff --git a/Sources/DistributedActors/ProcessIsolated/ProcessIsolated.swift b/Sources/DistributedActors/ProcessIsolated/ProcessIsolated.swift
index 7c4a2c20d..bfac9bdd9 100644
--- a/Sources/DistributedActors/ProcessIsolated/ProcessIsolated.swift
+++ b/Sources/DistributedActors/ProcessIsolated/ProcessIsolated.swift
@@ -111,7 +111,7 @@ public class ProcessIsolated {
         let funKillServantProcess: (Int) -> Void = { (pid: Int) in
             self.lock.withLockVoid {
                 if let servant = self._servants[pid] {
-                    self.system.cluster._shell.tell(.command(.downCommand(servant.node.node)))
+                    self.system.cluster.down(node: servant.node.node)
                     self._servants.removeValue(forKey: pid)
                 }
             }
diff --git a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
index 966051bd9..667a3f475 100644
--- a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
@@ -34,7 +34,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
@@ -46,14 +46,14 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
 
         try p.expectMessage(.success(remote.cluster.node))
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
         try p.expectMessage(.success(remote.cluster.node))
     }
 
@@ -128,8 +128,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // here we attempt to make a race where the nodes race to join each other
         // again, only one association should be created.
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        remote.cluster._shell.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        remote.cluster._shell.ref.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -145,8 +145,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let p8228 = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
         // we issue two handshakes quickly after each other, both should succeed but there should only be one association established (!)
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -196,7 +196,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertNotAssociated(system: local, expectAssociatedNode: remote.cluster.node)
         try assertNotAssociated(system: remote, expectAssociatedNode: local.cluster.node)
@@ -243,7 +243,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         var node = local.cluster.node.node
         node.port = node.port + 10
 
-        local.cluster._shell.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
+        local.cluster._shell.ref.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
 
         switch try p.expectMessage(within: .seconds(1)) {
         case ClusterShell.HandshakeResult.failure:
@@ -267,7 +267,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // when we down local on local, it should become down there:
         try self.testKit(local).eventually(within: .seconds(3)) {
-            local.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
+            local.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
             let localMembership = try localProbe.expectMessage()
 
             guard let selfMember = localMembership.member(local.cluster.node.node) else {
@@ -281,7 +281,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         // although this may be a best effort since the local can just shut down if it wanted to,
         // this scenario assumes a graceful leave though:
 
-        remote.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
+        remote.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
         let remoteMembership = try localProbe.expectMessage()
 
         guard let localMemberObservedOnRemote = remoteMembership.member(local.cluster.node.node) else {
diff --git a/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift b/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
index 47e9576d8..67de26d01 100644
--- a/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
+++ b/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
@@ -101,7 +101,7 @@ extension ClusteredNodesTestBase {
         var infos: [String] = []
 
         for node in self._nodes {
-            node.cluster._shell.tell(.query(.currentMembership(p.ref)))
+            node.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
             let membership = try! p.expectMessage()
             infos.append(membership.prettyDescription(label: "\(node.cluster.node)"))
         }
@@ -169,7 +169,7 @@ extension ClusteredNodesTestBase {
         defer { probe.stop() }
 
         try testKit.eventually(within: timeout ?? .seconds(5), file: file, line: line, column: column) {
-            system.cluster._shell.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
+            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
             let associatedNodes = try probe.expectMessage(file: file, line: line)
 
             if verbose {
@@ -208,7 +208,7 @@ extension ClusteredNodesTestBase {
         let probe = testKit.spawnTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set<UniqueNode>.self)
         defer { probe.stop() }
         try testKit.assertHolds(for: timeout ?? .seconds(1)) {
-            system.cluster._shell.tell(.query(.associatedNodes(probe.ref)))
+            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref)))
             let associatedNodes = try probe.expectMessage() // TODO: use interval here
             if verbose {
                 pprint("              Self: \(String(reflecting: system.settings.cluster.uniqueBindNode))")
@@ -228,7 +228,7 @@ extension ClusteredNodesTestBase {
     func assertMemberStatus(_ testKit: ActorTestKit, on system: ActorSystem, member memberSystem: ActorSystem, is expectedStatus: MemberStatus, file: StaticString = #file, line: UInt = #line) throws {
         let p = testKit.spawnTestProbe(expecting: Membership.self)
-        system.cluster._shell.tell(.query(.currentMembership(p.ref)))
+        system.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
         let membership = try p.expectMessage()
 
         guard let foundMember = membership.member(memberSystem.cluster.node) else {
diff --git a/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
index 7be9d3b7f..eadc3dc65 100644
--- a/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
@@ -245,16 +245,16 @@ class RemotingTLSTests: ClusteredNodesTestBase {
 
         do {
             let pSystem = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
-            remote.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
+            local.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
+            remote.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
             let associatedNodes = try pSystem.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }
 
         do {
             let pRemote = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
-            remote.cluster._shell.tell(.query(.associatedNodes(pRemote.ref)))
+            local.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
+            remote.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref)))
             let associatedNodes = try pRemote.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }

From fafee5d24edd8682a002b4b2f051eeed22cea659 Mon Sep 17 00:00:00 2001
From: Dario Rexin
Date: Wed, 11 Sep 2019 15:42:39 -0700
Subject: [PATCH 2/2] Address review feedback

---
 Sources/DistributedActors/ActorSystem.swift   |  8 +++----
 .../Cluster/ClusterControl.swift              | 12 +++++-----
 .../Cluster/SWIM/SWIMShell.swift              |  2 +-
 .../Cluster/AssociationClusteredTests.swift   | 22 +++++++++----------
 .../Cluster/ClusteredNodesTestBase.swift      |  8 +++----
 .../Cluster/RemotingTLSClusteredTests.swift   |  8 +++----
 6 files changed, 29 insertions(+), 31 deletions(-)
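
Note: per the review feedback, ClusterControl now stores the shell's ActorRef
(_clusterRef: ClusterShell.Ref) instead of the ClusterShell instance, and
_clusterControl is initialized unconditionally, backed by dead-letters refs
when clustering is disabled, so the settings.cluster.enabled guard in the
cluster accessor can be dropped. A sketch of the resulting behavior, assuming
clustering is disabled by default (illustrative only, not part of this patch):

    let system = ActorSystem("NoCluster")

    // No longer traps: the command is routed to the dead letters actor and
    // dropped (with a dead-letter log entry), instead of hitting the removed
    // fatalError path for disabled clustering.
    system.cluster.join(node: Node(systemName: "Other", host: "127.0.0.1", port: 7337))
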
diff --git a/Sources/DistributedActors/ActorSystem.swift b/Sources/DistributedActors/ActorSystem.swift
index b670199e3..7beb79d9b 100644
--- a/Sources/DistributedActors/ActorSystem.swift
+++ b/Sources/DistributedActors/ActorSystem.swift
@@ -176,6 +176,7 @@ public final class ActorSystem {
             effectiveSystemProvider = RemoteActorRefProvider(settings: settings, cluster: cluster, localProvider: localSystemProvider)
         } else {
             self._cluster = nil
+            self._clusterControl = ClusterControl(self.settings.cluster, clusterRef: self.deadLetters.adapted(), eventStream: EventStream(ref: self.deadLetters.adapted()))
         }
 
         self.systemProvider = effectiveSystemProvider
@@ -202,9 +203,10 @@ public final class ActorSystem {
             // Cluster MUST be the last thing we initialize, since once we're bound, we may receive incoming messages from other nodes
             if let cluster = self._cluster {
                 let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents", systemStream: true)
-                self._clusterControl = ClusterControl(settings.cluster, shell: cluster, eventStream: clusterEvents)
                 _ = try cluster.start(system: self, eventStream: clusterEvents) // only spawns when cluster is initialized
 
+                self._clusterControl = ClusterControl(settings.cluster, clusterRef: cluster.ref, eventStream: clusterEvents)
+
                 // Node watcher MUST be started AFTER cluster and clusterEvents
                 self._nodeDeathWatcher = try self._spawnSystemActor(
                     NodeDeathWatcherShell.naming,
@@ -252,10 +254,6 @@ public final class ActorSystem {
     }
 
     public var cluster: ClusterControl {
-        guard self.settings.cluster.enabled else {
-            fatalError("Tried to access cluster control, but clustering is not enabled.")
-        }
-
         guard let clusterControl = self._clusterControl else {
             fatalError("BUG! Tried to access clusterControl on \(self) and it was nil! Please report this on the issue tracker.")
         }
diff --git a/Sources/DistributedActors/Cluster/ClusterControl.swift b/Sources/DistributedActors/Cluster/ClusterControl.swift
index 6ad5e9938..47abbf22a 100644
--- a/Sources/DistributedActors/Cluster/ClusterControl.swift
+++ b/Sources/DistributedActors/Cluster/ClusterControl.swift
@@ -25,11 +25,11 @@ public struct ClusterControl {
 
     public let settings: ClusterSettings
 
-    internal let _shell: ClusterShell
+    internal let _clusterRef: ClusterShell.Ref
 
-    init(_ settings: ClusterSettings, shell: ClusterShell, eventStream: EventStream<ClusterEvent>) {
+    init(_ settings: ClusterSettings, clusterRef: ClusterShell.Ref, eventStream: EventStream<ClusterEvent>) {
         self.settings = settings
-        self._shell = shell
+        self._clusterRef = clusterRef
         self.events = eventStream
     }
 
@@ -38,15 +38,15 @@ public struct ClusterControl {
     }
 
     public func join(node: Node) {
-        self._shell.ref.tell(.command(.join(node)))
+        self._clusterRef.tell(.command(.join(node)))
     }
 
     public func down(node: Node) {
-        self._shell.ref.tell(.command(.downCommand(node)))
+        self._clusterRef.tell(.command(.downCommand(node)))
     }
 
     public func down(node: UniqueNode) {
-        self._shell.ref.tell(.command(.downCommand(node.node)))
+        self._clusterRef.tell(.command(.downCommand(node.node)))
     }
 
     public var node: UniqueNode {
diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
index 25a2bd83e..0cb335df3 100644
--- a/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
+++ b/Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
@@ -390,7 +390,7 @@ internal struct SWIMShell {
         let handshakeTimeout = TimeAmount.seconds(3)
 
         // FIXME: use reasonable timeout and back off? issue #724
-        let handshakeResultAnswer = context.system.cluster._shell.ref.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
+        let handshakeResultAnswer = context.system.cluster._clusterRef.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
             .command(.handshakeWith(remoteNode, replyTo: $0))
         }
         context.onResultAsync(of: handshakeResultAnswer, timeout: .effectivelyInfinite) { handshakeResultResult in
diff --git a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
index 667a3f475..546b22892 100644
--- a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift
@@ -34,7 +34,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
@@ -46,14 +46,14 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let (local, remote) = self.setUpPair()
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertAssociated(local, withExactly: remote.cluster.node)
         try assertAssociated(remote, withExactly: local.cluster.node)
 
         try p.expectMessage(.success(remote.cluster.node))
 
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
         try p.expectMessage(.success(remote.cluster.node))
     }
 
@@ -128,8 +128,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // here we attempt to make a race where the nodes race to join each other
         // again, only one association should be created.
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        remote.cluster._shell.ref.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        remote.cluster._clusterRef.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -145,8 +145,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         let p8228 = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
         // we issue two handshakes quickly after each other, both should succeed but there should only be one association established (!)
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
 
         _ = try p7337.expectMessage()
         _ = try p8228.expectMessage()
@@ -196,7 +196,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)
 
-        local.cluster._shell.ref.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
+        local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
 
         try assertNotAssociated(system: local, expectAssociatedNode: remote.cluster.node)
         try assertNotAssociated(system: remote, expectAssociatedNode: local.cluster.node)
@@ -243,7 +243,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         var node = local.cluster.node.node
         node.port = node.port + 10
 
-        local.cluster._shell.ref.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
+        local.cluster._clusterRef.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
 
         switch try p.expectMessage(within: .seconds(1)) {
         case ClusterShell.HandshakeResult.failure:
@@ -267,7 +267,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
 
         // when we down local on local, it should become down there:
         try self.testKit(local).eventually(within: .seconds(3)) {
-            local.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
+            local.cluster._clusterRef.tell(.query(.currentMembership(localProbe.ref)))
             let localMembership = try localProbe.expectMessage()
 
             guard let selfMember = localMembership.member(local.cluster.node.node) else {
@@ -281,7 +281,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
         // although this may be a best effort since the local can just shut down if it wanted to,
         // this scenario assumes a graceful leave though:
 
-        remote.cluster._shell.ref.tell(.query(.currentMembership(localProbe.ref)))
+        remote.cluster._clusterRef.tell(.query(.currentMembership(localProbe.ref)))
         let remoteMembership = try localProbe.expectMessage()
 
         guard let localMemberObservedOnRemote = remoteMembership.member(local.cluster.node.node) else {
diff --git a/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift b/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
index 67de26d01..373591346 100644
--- a/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
+++ b/Tests/DistributedActorsTests/Cluster/ClusteredNodesTestBase.swift
@@ -101,7 +101,7 @@ extension ClusteredNodesTestBase {
         var infos: [String] = []
 
         for node in self._nodes {
-            node.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
+            node.cluster._clusterRef.tell(.query(.currentMembership(p.ref)))
             let membership = try! p.expectMessage()
             infos.append(membership.prettyDescription(label: "\(node.cluster.node)"))
         }
@@ -169,7 +169,7 @@ extension ClusteredNodesTestBase {
         defer { probe.stop() }
 
         try testKit.eventually(within: timeout ?? .seconds(5), file: file, line: line, column: column) {
-            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
+            system.cluster._clusterRef.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
             let associatedNodes = try probe.expectMessage(file: file, line: line)
 
             if verbose {
@@ -208,7 +208,7 @@ extension ClusteredNodesTestBase {
         let probe = testKit.spawnTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set<UniqueNode>.self)
         defer { probe.stop() }
         try testKit.assertHolds(for: timeout ?? .seconds(1)) {
-            system.cluster._shell.ref.tell(.query(.associatedNodes(probe.ref)))
+            system.cluster._clusterRef.tell(.query(.associatedNodes(probe.ref)))
             let associatedNodes = try probe.expectMessage() // TODO: use interval here
             if verbose {
                 pprint("              Self: \(String(reflecting: system.settings.cluster.uniqueBindNode))")
@@ -228,7 +228,7 @@ extension ClusteredNodesTestBase {
     func assertMemberStatus(_ testKit: ActorTestKit, on system: ActorSystem, member memberSystem: ActorSystem, is expectedStatus: MemberStatus, file: StaticString = #file, line: UInt = #line) throws {
         let p = testKit.spawnTestProbe(expecting: Membership.self)
-        system.cluster._shell.ref.tell(.query(.currentMembership(p.ref)))
+        system.cluster._clusterRef.tell(.query(.currentMembership(p.ref)))
         let membership = try p.expectMessage()
 
         guard let foundMember = membership.member(memberSystem.cluster.node) else {
diff --git a/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
index eadc3dc65..145c0c4c8 100644
--- a/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
+++ b/Tests/DistributedActorsTests/Cluster/RemotingTLSClusteredTests.swift
@@ -245,16 +245,16 @@ class RemotingTLSTests: ClusteredNodesTestBase {
 
         do {
             let pSystem = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
-            remote.cluster._shell.ref.tell(.query(.associatedNodes(pSystem.ref)))
+            local.cluster._clusterRef.tell(.query(.associatedNodes(pSystem.ref)))
+            remote.cluster._clusterRef.tell(.query(.associatedNodes(pSystem.ref)))
             let associatedNodes = try pSystem.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }
 
         do {
             let pRemote = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
-            local.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
-            remote.cluster._shell.ref.tell(.query(.associatedNodes(pRemote.ref)))
+            local.cluster._clusterRef.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
+            remote.cluster._clusterRef.tell(.query(.associatedNodes(pRemote.ref)))
             let associatedNodes = try pRemote.expectMessage()
             associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
         }