22 changes: 16 additions & 6 deletions Sources/DistributedActors/ActorSystem.swift
@@ -91,7 +91,7 @@ public final class ActorSystem {

// initialized during startup
internal var _cluster: ClusterShell?
internal var _clusterEvents: EventStream<ClusterEvent>?
internal var _clusterControl: ClusterControl?
Member commented:
Nice; I wonder if we need to store the _cluster as well though?
All usage, I guess, would rather go through the control? We can always use _clusterControl._shell?

Member commented:
(Thinking we should have as few fields for the same thing as possible on the core actor system; we'd then have exactly one way to access the shell, etc.)
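A minimal sketch of what this suggestion could look like, assuming ClusterControl also kept a handle to the shell so the actor system would store only the single _clusterControl field (hypothetical; not what this PR does):

public struct ClusterControl {
    internal let _shell: ClusterShell            // assumed extra field, for this sketch only
    internal let _clusterRef: ClusterShell.Ref   // as introduced in this PR
    // ... settings, events and the initializer as in the PR ...
}

extension ActorSystem {
    // Exactly one way to reach the shell: via the control object.
    internal var _clusterShell: ClusterShell? {
        return self._clusterControl?._shell
    }
}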

internal var _nodeDeathWatcher: NodeDeathWatcherShell.Ref?

// ==== ----------------------------------------------------------------------------------------------------------------
@@ -176,6 +176,7 @@ public final class ActorSystem {
effectiveSystemProvider = RemoteActorRefProvider(settings: settings, cluster: cluster, localProvider: localSystemProvider)
} else {
self._cluster = nil
self._clusterControl = ClusterControl(self.settings.cluster, clusterRef: self.deadLetters.adapted(), eventStream: EventStream(ref: self.deadLetters.adapted()))
}

self.systemProvider = effectiveSystemProvider
@@ -201,9 +202,10 @@ public final class ActorSystem {
do {
// Cluster MUST be the last thing we initialize, since once we're bound, we may receive incoming messages from other nodes
if let cluster = self._cluster {
let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents")
self._clusterEvents = clusterEvents
_ = try cluster.start(system: self, eventStream: self.clusterEvents) // only spawns when cluster is initialized
let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents", systemStream: true)
_ = try cluster.start(system: self, eventStream: clusterEvents) // only spawns when cluster is initialized

self._clusterControl = ClusterControl(settings.cluster, clusterRef: cluster.ref, eventStream: clusterEvents)

// Node watcher MUST be started AFTER cluster and clusterEvents
self._nodeDeathWatcher = try self._spawnSystemActor(
@@ -237,9 +239,9 @@ public final class ActorSystem {
/// Do not call from within actors or you may deadlock shutting down the system.
public func shutdown() {
self.log.log(level: .debug, "SHUTTING DOWN ACTOR SYSTEM [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
if self.settings.cluster.enabled {
if let cluster = self._cluster {
Member commented:
good to move from the settings check to this style 👍

let receptacle = BlockingReceptacle<Void>()
self.cluster._shell.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
cluster.ref.tell(.command(.unbind(receptacle))) // FIXME: should be shutdown
receptacle.wait(atMost: .milliseconds(300)) // FIXME: configure
}
self.userProvider.stopAll()
@@ -250,6 +252,14 @@ public final class ActorSystem {
self._cluster = nil
self._receptionist = self.deadLetters.adapted()
}

public var cluster: ClusterControl {
guard let clusterControl = self._clusterControl else {
fatalError("BUG! Tried to access clusterControl on \(self) and it was nil! Please report this on the issue tracker.")
}

return clusterControl
}
}

extension ActorSystem: Equatable {
@@ -25,11 +25,11 @@ public struct ClusterControl {

public let settings: ClusterSettings

internal let _shell: ClusterShell.Ref
internal let _clusterRef: ClusterShell.Ref

init(_ settings: ClusterSettings, shell: ClusterShell.Ref, eventStream: EventStream<ClusterEvent>) {
init(_ settings: ClusterSettings, clusterRef: ClusterShell.Ref, eventStream: EventStream<ClusterEvent>) {
self.settings = settings
self._shell = shell
self._clusterRef = clusterRef
self.events = eventStream
}

@@ -38,29 +38,18 @@ public struct ClusterControl {
}

public func join(node: Node) {
self._shell.tell(.command(.join(node)))
self._clusterRef.tell(.command(.join(node)))
}

public func down(node: Node) {
self._shell.tell(.command(.downCommand(node)))
self._clusterRef.tell(.command(.downCommand(node)))
}

public func down(node: UniqueNode) {
self._shell.tell(.command(.downCommand(node.node)))
self._clusterRef.tell(.command(.downCommand(node.node)))
}

public var node: UniqueNode {
return self.settings.uniqueBindNode
}
}

extension ActorSystem {
public var cluster: ClusterControl {
let shell = self._cluster?.ref ?? self.deadLetters.adapted()
return .init(self.settings.cluster, shell: shell, eventStream: self.clusterEvents)
}

internal var clusterEvents: EventStream<ClusterEvent> {
return self._clusterEvents ?? EventStream(ref: self.deadLetters.adapted())
}
}
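For reference, a hedged usage sketch of the ClusterControl surface above, reached through the new system.cluster accessor added in ActorSystem.swift; the system and otherNode (a Node) values are assumed to be in scope:

system.cluster.join(node: otherNode)              // handshake with / join the other node
system.cluster.down(node: otherNode)              // mark that member as down
let selfNode: UniqueNode = system.cluster.node    // this system's unique bind node
system.cluster.events.subscribe(listener)         // listener: ActorRef<ClusterEvent>, assumed in scope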
2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/SWIM/SWIMShell.swift
@@ -390,7 +390,7 @@ internal struct SWIMShell {

let handshakeTimeout = TimeAmount.seconds(3)
// FIXME: use reasonable timeout and back off? issue #724
let handshakeResultAnswer = context.system.cluster._shell.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
let handshakeResultAnswer = context.system.cluster._clusterRef.ask(for: ClusterShell.HandshakeResult.self, timeout: handshakeTimeout) {
.command(.handshakeWith(remoteNode, replyTo: $0))
}
context.onResultAsync(of: handshakeResultAnswer, timeout: .effectivelyInfinite) { handshakeResultResult in
10 changes: 9 additions & 1 deletion Sources/DistributedActors/EventStream.swift
@@ -22,13 +22,21 @@ public struct EventStream<Event> {
internal let ref: ActorRef<EventStreamShell.Message<Event>>

public init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self) throws {
self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
try self.init(system, name: name, of: type, systemStream: false)
}

internal init(ref: ActorRef<EventStreamShell.Message<Event>>) {
self.ref = ref
}

internal init(_ system: ActorSystem, name: String, of type: Event.Type = Event.self, systemStream: Bool) throws {
if systemStream {
self.ref = try system._spawnSystemActor(.unique(name), EventStreamShell.behavior(type))
} else {
self.ref = try system.spawn(.unique(name), EventStreamShell.behavior(type))
}
}

public func subscribe(_ ref: ActorRef<Event>) {
self.ref.tell(.subscribe(ref))
}
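A small usage sketch of the new systemStream flag (illustrative only: the flagged initializer is internal to the module, and all names below other than the EventStream API shown above are assumptions):

import DistributedActors

let system = ActorSystem("EventStreamExample")

// Public initializer: the backing actor is spawned as a regular /user actor.
let userEvents = try EventStream<String>(system, name: "user-events")

// systemStream: true spawns the backing actor as a /system actor instead,
// which is what the cluster event stream in ActorSystem.swift now relies on.
let systemEvents = try EventStream<String>(system, name: "system-events", systemStream: true)

// Subscribers are plain ActorRefs; spawn a tiny listener that prints each event.
let listenerBehavior: Behavior<String> = .receiveMessage { event in
    print("received: \(event)")
    return .same
}
let listener = try system.spawn(.unique("listener"), listenerBehavior)
userEvents.subscribe(listener)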
@@ -111,7 +111,7 @@ public class ProcessIsolated {
let funKillServantProcess: (Int) -> Void = { (pid: Int) in
self.lock.withLockVoid {
if let servant = self._servants[pid] {
self.system.cluster._shell.tell(.command(.downCommand(servant.node.node)))
self.system.cluster.down(node: servant.node.node)
self._servants.removeValue(forKey: pid)
Member commented:
👍

}
}
@@ -34,7 +34,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
let (local, remote) = self.setUpPair()
let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)

local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))

try assertAssociated(local, withExactly: remote.cluster.node)
try assertAssociated(remote, withExactly: local.cluster.node)
@@ -46,14 +46,14 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
let (local, remote) = self.setUpPair()
let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)

local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))

try assertAssociated(local, withExactly: remote.cluster.node)
try assertAssociated(remote, withExactly: local.cluster.node)

try p.expectMessage(.success(remote.cluster.node))

local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))

try p.expectMessage(.success(remote.cluster.node))
}
@@ -128,8 +128,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {

// here we attempt to make a race where the nodes race to join each other
// again, only one association should be created.
local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
remote.cluster._shell.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
remote.cluster._clusterRef.tell(.command(.handshakeWith(local.cluster.node.node, replyTo: p8228.ref)))

_ = try p7337.expectMessage()
_ = try p8228.expectMessage()
@@ -145,8 +145,8 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
let p8228 = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)

// we issue two handshakes quickly after each other, both should succeed but there should only be one association established (!)
local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p7337.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p8228.ref)))

_ = try p7337.expectMessage()
_ = try p8228.expectMessage()
@@ -196,7 +196,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {

let p = self.testKit(local).spawnTestProbe(expecting: ClusterShell.HandshakeResult.self)

local.cluster._shell.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))
local.cluster._clusterRef.tell(.command(.handshakeWith(remote.cluster.node.node, replyTo: p.ref)))

try assertNotAssociated(system: local, expectAssociatedNode: remote.cluster.node)
try assertNotAssociated(system: remote, expectAssociatedNode: local.cluster.node)
@@ -243,7 +243,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
var node = local.cluster.node.node
node.port = node.port + 10

local.cluster._shell.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API
local.cluster._clusterRef.tell(.command(.handshakeWith(node, replyTo: p.ref))) // TODO: nicer API

switch try p.expectMessage(within: .seconds(1)) {
case ClusterShell.HandshakeResult.failure:
@@ -267,7 +267,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {

// when we down local on local, it should become down there:
try self.testKit(local).eventually(within: .seconds(3)) {
local.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
local.cluster._clusterRef.tell(.query(.currentMembership(localProbe.ref)))
let localMembership = try localProbe.expectMessage()

guard let selfMember = localMembership.member(local.cluster.node.node) else {
Expand All @@ -281,7 +281,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase {
// although this may be a best effort since the local can just shut down if it wanted to,
// this scenario assumes a graceful leave though:

remote.cluster._shell.tell(.query(.currentMembership(localProbe.ref)))
remote.cluster._clusterRef.tell(.query(.currentMembership(localProbe.ref)))
let remoteMembership = try localProbe.expectMessage()

guard let localMemberObservedOnRemote = remoteMembership.member(local.cluster.node.node) else {
@@ -101,7 +101,7 @@ extension ClusteredNodesTestBase {

var infos: [String] = []
for node in self._nodes {
node.cluster._shell.tell(.query(.currentMembership(p.ref)))
node.cluster._clusterRef.tell(.query(.currentMembership(p.ref)))
let membership = try! p.expectMessage()
infos.append(membership.prettyDescription(label: "\(node.cluster.node)"))
}
@@ -169,7 +169,7 @@ extension ClusteredNodesTestBase {
defer { probe.stop() }

try testKit.eventually(within: timeout ?? .seconds(5), file: file, line: line, column: column) {
system.cluster._shell.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
system.cluster._clusterRef.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
let associatedNodes = try probe.expectMessage(file: file, line: line)

if verbose {
@@ -208,7 +208,7 @@ extension ClusteredNodesTestBase {
let probe = testKit.spawnTestProbe(.prefixed(with: "assertNotAssociated-probe"), expecting: Set<UniqueNode>.self)
defer { probe.stop() }
try testKit.assertHolds(for: timeout ?? .seconds(1)) {
system.cluster._shell.tell(.query(.associatedNodes(probe.ref)))
system.cluster._clusterRef.tell(.query(.associatedNodes(probe.ref)))
let associatedNodes = try probe.expectMessage() // TODO: use interval here
if verbose {
pprint(" Self: \(String(reflecting: system.settings.cluster.uniqueBindNode))")
@@ -228,7 +228,7 @@ extension ClusteredNodesTestBase {
func assertMemberStatus(_ testKit: ActorTestKit, on system: ActorSystem, member memberSystem: ActorSystem, is expectedStatus: MemberStatus,
file: StaticString = #file, line: UInt = #line) throws {
let p = testKit.spawnTestProbe(expecting: Membership.self)
system.cluster._shell.tell(.query(.currentMembership(p.ref)))
system.cluster._clusterRef.tell(.query(.currentMembership(p.ref)))

let membership = try p.expectMessage()
guard let foundMember = membership.member(memberSystem.cluster.node) else {
@@ -245,16 +245,16 @@ class RemotingTLSTests: ClusteredNodesTestBase {

do {
let pSystem = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
local.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
remote.cluster._shell.tell(.query(.associatedNodes(pSystem.ref)))
local.cluster._clusterRef.tell(.query(.associatedNodes(pSystem.ref)))
remote.cluster._clusterRef.tell(.query(.associatedNodes(pSystem.ref)))
let associatedNodes = try pSystem.expectMessage()
associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
}

do {
let pRemote = testKit.spawnTestProbe(expecting: Set<UniqueNode>.self)
local.cluster._shell.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
remote.cluster._shell.tell(.query(.associatedNodes(pRemote.ref)))
local.cluster._clusterRef.tell(.query(.associatedNodes(pRemote.ref))) // FIXME: We need to get the Accept back and act on it on the origin side
remote.cluster._clusterRef.tell(.query(.associatedNodes(pRemote.ref)))
let associatedNodes = try pRemote.expectMessage()
associatedNodes.shouldBeEmpty() // means we have not associated to _someone_
}