Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 51 additions & 52 deletions Sources/ActorSingletonPlugin/ActorSingleton.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,94 +13,93 @@
//===----------------------------------------------------------------------===//

import DistributedActors
import DistributedActorsConcurrencyHelpers

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Actor singleton

/// An `ActorSingleton` ensures that there is no more than one instance of an actor running in the cluster.
///
/// Actors that are singleton must be registered during system setup, as part of `ActorSystemSettings`.
/// The `ActorRef` of the singleton can later be obtained through `ActorSystem.singleton.ref(name:)`.
///
/// A singleton may run on any node in the cluster. Use `ActorSingletonSettings.allocationStrategy` to control node
/// allocation. The `ActorRef` returned by `ref(name:)` is actually a proxy in order to handle situations where the
/// singleton is shifted to different nodes.
///
/// - Warning: Refer to the configured `AllocationStrategy` for trade-offs between safety and recovery latency for
/// the singleton allocation.
/// - SeeAlso: The `ActorSingleton` mechanism conceptually similar to Erlang/OTP's <a href="http://erlang.org/doc/design_principles/distributed_applications.html">`DistributedApplication`</a>,
// and <a href="https://doc.akka.io/docs/akka/current/cluster-singleton.html">`ClusterSingleton` in Akka</a>.
public final class ActorSingleton<Message> {
internal final class ActorSingleton<Message> {
/// Settings for the `ActorSingleton`
public let settings: ActorSingletonSettings
let settings: ActorSingletonSettings

/// Props of singleton behavior
public let props: Props
/// The singleton behavior
public let behavior: Behavior<Message>
let props: Props?
/// The singleton behavior.
/// If `nil`, then this instance will be proxy-only and it will never run the actual actor.
let behavior: Behavior<Message>?

/// The `ActorSingletonProxy` ref
internal private(set) var proxy: ActorRef<Message>?
private var _proxy: ActorRef<Message>?
private let proxyLock = Lock()

/// Defines a `behavior` as singleton with `settings`.
public init(settings: ActorSingletonSettings, props: Props = Props(), _ behavior: Behavior<Message>) {
internal var proxy: ActorRef<Message>? {
self.proxyLock.withLock {
self._proxy
}
}

init(settings: ActorSingletonSettings, props: Props?, _ behavior: Behavior<Message>?) {
self.settings = settings
self.props = props
self.behavior = behavior
}

/// Defines a `behavior` as singleton identified by `name`.
public convenience init(_ name: String, props: Props = Props(), _ behavior: Behavior<Message>) {
let settings = ActorSingletonSettings(name: name)
self.init(settings: settings, props: props, behavior)
}

/// Spawns `ActorSingletonProxy` and associated actors (e.g., `ActorSingleManager`).
internal func spawnAll(_ system: ActorSystem) throws {
/// Spawns `ActorSingletonProxy` and associated actors (e.g., `ActorSingletonManager`).
func spawnAll(_ system: ActorSystem) throws {
let allocationStrategy = self.settings.allocationStrategy.make(system.settings.cluster, self.settings)
self.proxy = try system._spawnSystemActor(
"singletonProxy-\(self.settings.name)",
ActorSingletonProxy(settings: self.settings, allocationStrategy: allocationStrategy, props: self.props, self.behavior).behavior,
props: ._wellKnown
)
try self.proxyLock.withLock {
self._proxy = try system._spawnSystemActor(
"singletonProxy-\(self.settings.name)",
ActorSingletonProxy(settings: self.settings, allocationStrategy: allocationStrategy, props: self.props, self.behavior).behavior,
props: ._wellKnown
)
}
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Plugin protocol conformance
// MARK: Type-erased actor singleton

extension ActorSingleton: Plugin {
public static func pluginKey(name: String) -> PluginKey<ActorSingleton<Message>> {
PluginKey<ActorSingleton<Message>>(plugin: "$actorSingleton").makeSub(name)
}
internal protocol AnyActorSingleton {
/// Stops the `ActorSingletonProxy` running in the `system`.
/// If `ActorSingletonManager` is also running, which means the actual singleton is hosted
/// on this node, it will attempt to hand-over the singleton gracefully before stopping.
func stop(_ system: ActorSystem)
}

internal struct BoxedActorSingleton: AnyActorSingleton {
private let underlying: AnyActorSingleton

public var key: PluginKey<ActorSingleton<Message>> {
Self.pluginKey(name: self.settings.name)
init<Message>(_ actorSingleton: ActorSingleton<Message>) {
self.underlying = actorSingleton
}

public func start(_ system: ActorSystem) -> Result<Void, Error> {
do {
try self.spawnAll(system)
return .success(())
} catch {
return .failure(error)
func unsafeUnwrapAs<Message>(_ type: Message.Type) -> ActorSingleton<Message> {
guard let unwrapped = self.underlying as? ActorSingleton<Message> else {
fatalError("Type mismatch, expected: [\(String(reflecting: ActorSingleton<Message>.self))] got [\(self.underlying)]")
}
return unwrapped
}

// TODO: Future
public func stop(_ system: ActorSystem) -> Result<Void, Error> {
func stop(_ system: ActorSystem) {
self.underlying.stop(system)
}
}

extension ActorSingleton: AnyActorSingleton {
func stop(_ system: ActorSystem) {
// Hand over the singleton gracefully
let resolveContext = ResolveContext<ActorSingletonManager<Message>.Directive>(address: ._singletonManager(name: self.settings.name), system: system)
let managerRef = system._resolve(context: resolveContext)
// If the manager is not running this will end up in dead-letters but that's fine
managerRef.tell(.stop)

// We don't control the proxy's directives so we can't tell it to stop
return .success(())
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: ActorSingleton settings
// MARK: Actor singleton settings

/// Settings for a `ActorSingleton`.
public struct ActorSingletonSettings {
Expand All @@ -125,7 +124,7 @@ public struct ActorSingletonSettings {

/// Singleton node allocation strategies.
public enum AllocationStrategySettings {
/// Singletons will run on the cluster leader
/// Singletons will run on the cluster leader. *All* nodes are potential candidates.
case byLeadership

func make(_: ClusterSettings, _: ActorSingletonSettings) -> ActorSingletonAllocationStrategy {
Expand Down
4 changes: 0 additions & 4 deletions Sources/ActorSingletonPlugin/ActorSingletonManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ extension ActorSingletonManager {
// MARK: ActorSingletonManager path / address

extension ActorAddress {
internal static func _singletonManager(name: String, on node: UniqueNode) -> ActorAddress {
.init(node: node, path: ._singletonManager(name: name), incarnation: .wellKnown)
}

internal static func _singletonManager(name: String) -> ActorAddress {
.init(path: ._singletonManager(name: name), incarnation: .wellKnown)
}
Expand Down
143 changes: 129 additions & 14 deletions Sources/ActorSingletonPlugin/ActorSingletonPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,150 @@
//===----------------------------------------------------------------------===//

import DistributedActors
import DistributedActorsConcurrencyHelpers

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Actor singleton plugin

/// The actor singleton plugin ensures that there is no more than one instance of an actor that is defined to be
/// singleton running in the cluster.
///
/// An actor singleton may run on any node in the cluster. Use `ActorSingletonSettings.allocationStrategy` to control
/// its allocation. On candidate nodes where the singleton might run, use `ActorSystem.singleton.ref(type:name:props:behavior)`
/// to define actor behavior. Otherwise, call `ActorSystem.singleton.ref(type:name:)` to obtain a ref. The returned
/// `ActorRef` is in reality a proxy which handle situations where the singleton is shifted to different nodes.
///
/// - Warning: Refer to the configured `AllocationStrategy` for trade-offs between safety and recovery latency for
/// the singleton allocation.
/// - SeeAlso: The `ActorSingleton` mechanism is conceptually similar to Erlang/OTP's <a href="http://erlang.org/doc/design_principles/distributed_applications.html">`DistributedApplication`</a>,
/// and <a href="https://doc.akka.io/docs/akka/current/cluster-singleton.html">`ClusterSingleton` in Akka</a>.
public final class ActorSingletonPlugin {
private var singletons: [String: BoxedActorSingleton] = [:]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worry about this. This is mutable. Safe?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to comment here -- yes this is not safe indeed...

We need to put a lock around accessing these and host() calls etc...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How I'd really want to write all this is (but it's too annoying I think, until we have async/await things):

let ref = await plugin.ref()

since then all of the mutable state can actually be put inside an actor, and we'd implement by "asking the plugin actor about the ref" rather than doing it here where we are not safe from concurrent access...

We could do this today but it gets too annoying, since it'd be like -> AskResponse<ActorRef<...>> which I think is annoying... :-(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use NIO's lock in this project or something else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private let singletonsLock = Lock()

public init() {}

func ref<Message>(of type: Message.Type, settings: ActorSingletonSettings, system: ActorSystem, props: Props? = nil, _ behavior: Behavior<Message>? = nil) throws -> ActorRef<Message> {
try self.singletonsLock.withLock {
if let existing = self.singletons[settings.name] {
guard let proxy = existing.unsafeUnwrapAs(Message.self).proxy else {
fatalError("Singleton [\(settings.name)] not yet initialized")
}
return proxy
}

let singleton = ActorSingleton<Message>(settings: settings, props: props, behavior)
try singleton.spawnAll(system)
self.singletons[settings.name] = BoxedActorSingleton(singleton)

guard let proxy = singleton.proxy else {
fatalError("Singleton[\(settings.name)] not yet initialized")
}

return proxy
}
}

func actor<Act: Actorable>(of type: Act.Type, settings: ActorSingletonSettings, system: ActorSystem, props: Props? = nil, _ makeInstance: ((Actor<Act>.Context) -> Act)? = nil) throws -> Actor<Act> {
let behavior = makeInstance.map { maker in
Behavior<Act.Message>.setup { context in
Act.makeBehavior(instance: maker(.init(underlying: context)))
}
}
let ref = try self.ref(of: Act.Message.self, settings: settings, system: system, behavior)
return Actor<Act>(ref: ref)
}
}

extension ActorSingletonPlugin {
func ref<Message>(of type: Message.Type, name: String, system: ActorSystem, props: Props? = nil, _ behavior: Behavior<Message>? = nil) throws -> ActorRef<Message> {
let settings = ActorSingletonSettings(name: name)
return try self.ref(of: type, settings: settings, system: system, props: props, behavior)
}

func actor<Act: Actorable>(of type: Act.Type, name: String, system: ActorSystem, props: Props? = nil, _ makeInstance: ((Actor<Act>.Context) -> Act)? = nil) throws -> Actor<Act> {
let settings = ActorSingletonSettings(name: name)
return try self.actor(of: type, settings: settings, system: system, props: props, makeInstance)
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Plugin protocol conformance

extension ActorSingletonPlugin: Plugin {
static let pluginKey = PluginKey<ActorSingletonPlugin>(plugin: "$actorSingleton")

public var key: Key {
Self.pluginKey
}

public func start(_ system: ActorSystem) -> Result<Void, Error> {
.success(())
}

// TODO: Future
public func stop(_ system: ActorSystem) -> Result<Void, Error> {
self.singletonsLock.withLock {
for (_, singleton) in self.singletons {
singleton.stop(system)
}
}
return .success(())
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Singleton refs and actors

extension ActorSystem {
public var singleton: ActorSingletonLookup {
public var singleton: ActorSingletonControl {
.init(self)
}
}

/// Allows for simplified lookups of actor references which are known to be managed by `ActorSingleton`.
public struct ActorSingletonLookup {
/// Provides actor singleton controls such as obtaining a singleton ref and defining the singleton.
public struct ActorSingletonControl {
private let system: ActorSystem

internal init(_ system: ActorSystem) {
self.system = system
}

/// Obtains a reference to a (proxy) singleton regardless of its current location.
public func ref<Message>(name: String, of type: Message.Type) throws -> ActorRef<Message> {
let key = ActorSingleton<Message>.pluginKey(name: name)
guard let singleton = self.system.settings.plugins[key] else {
private var singletonPlugin: ActorSingletonPlugin {
let key = ActorSingletonPlugin.pluginKey
guard let singletonPlugin = self.system.settings.plugins[key] else {
fatalError("No plugin found for key: [\(key)], installed plugins: \(self.system.settings.plugins)")
}
guard let proxy = singleton.proxy else {
fatalError("Singleton[\(key)] not yet initialized")
}
return proxy // FIXME: Worried that we never synchronize access to proxy...
return singletonPlugin
}

public func actor<Act: Actorable>(name: String, _ type: Act.Type) throws -> Actor<Act> {
let ref = try self.ref(name: name, of: Act.Message.self)
return Actor<Act>(ref: ref)
/// Defines a singleton `behavior` and indicates that it can be hosted on this node.
public func host<Message>(_ type: Message.Type, name: String, props: Props = Props(), _ behavior: Behavior<Message>) throws -> ActorRef<Message> {
try self.singletonPlugin.ref(of: type, name: name, system: self.system, props: props, behavior)
}

/// Defines a singleton `behavior` and indicates that it can be hosted on this node.
public func host<Message>(_ type: Message.Type, settings: ActorSingletonSettings, props: Props = Props(), _ behavior: Behavior<Message>) throws -> ActorRef<Message> {
try self.singletonPlugin.ref(of: type, settings: settings, system: self.system, props: props, behavior)
}

/// Defines a singleton `Actorable` and indicates that it can be hosted on this node.
public func host<Act: Actorable>(_ type: Act.Type, name: String, props: Props = Props(), _ makeInstance: @escaping (Actor<Act>.Context) -> Act) throws -> Actor<Act> {
try self.singletonPlugin.actor(of: type, name: name, system: self.system, props: props, makeInstance)
}

/// Defines a singleton `Actorable` and indicates that it can be hosted on this node.
public func host<Act: Actorable>(_ type: Act.Type, settings: ActorSingletonSettings, props: Props = Props(), _ makeInstance: @escaping (Actor<Act>.Context) -> Act) throws -> Actor<Act> {
try self.singletonPlugin.actor(of: type, settings: settings, system: self.system, props: props, makeInstance)
}

/// Obtains a ref to the specified actor singleton.
public func ref<Message>(of type: Message.Type, name: String) throws -> ActorRef<Message> {
try self.singletonPlugin.ref(of: type, name: name, system: self.system)
}

/// Obtains the specified singleton actor.
public func actor<Act: Actorable>(of type: Act.Type, name: String) throws -> Actor<Act> {
try self.singletonPlugin.actor(of: type, name: name, system: self.system)
}
}
21 changes: 14 additions & 7 deletions Sources/ActorSingletonPlugin/ActorSingletonProxy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import Logging
/// would be disposed to allow insertion of the latest message.
///
/// The proxy subscribes to events and feeds them into `AllocationStrategy` to determine the node that the
/// singleton runs on. It spawns a `ActorSingletonManager`, which manages the actual singleton actor, as needed and
/// obtains the ref from it. It instructs the `ActorSingletonManager` to hand over the singleton when the node changes.
/// singleton runs on. If the singleton falls on *this* node, the proxy will spawn a `ActorSingletonManager`,
/// which manages the actual singleton actor, and obtain the ref from it. The proxy instructs the
/// `ActorSingletonManager` to hand over the singleton whenever the node changes.
internal class ActorSingletonProxy<Message> {
/// Settings for the `ActorSingleton`
private let settings: ActorSingletonSettings
Expand All @@ -38,9 +39,11 @@ internal class ActorSingletonProxy<Message> {
private let allocationStrategy: ActorSingletonAllocationStrategy

/// Props of the singleton behavior
private let singletonProps: Props
/// The singleton behavior
private let singletonBehavior: Behavior<Message>
private let singletonProps: Props?
/// The singleton behavior.
/// If `nil`, then this node is not a candidate for hosting the singleton. It would result
/// in a failure if `allocationStrategy` selects this node by mistake.
private let singletonBehavior: Behavior<Message>?

/// The node that the singleton runs on
private var targetNode: UniqueNode?
Expand All @@ -54,7 +57,7 @@ internal class ActorSingletonProxy<Message> {
/// Message buffer in case singleton `ref` is `nil`
private let buffer: StashBuffer<Message>

init(settings: ActorSingletonSettings, allocationStrategy: ActorSingletonAllocationStrategy, props: Props, _ behavior: Behavior<Message>) {
init(settings: ActorSingletonSettings, allocationStrategy: ActorSingletonAllocationStrategy, props: Props? = nil, _ behavior: Behavior<Message>? = nil) {
self.settings = settings
self.allocationStrategy = allocationStrategy
self.singletonProps = props
Expand Down Expand Up @@ -120,10 +123,14 @@ internal class ActorSingletonProxy<Message> {
}

private func takeOver(_ context: ActorContext<Message>, from: UniqueNode?) throws {
guard let singletonBehavior = self.singletonBehavior else {
preconditionFailure("The actor singleton \(self.settings.name) cannot run on this node. Please review AllocationStrategySettings and/or actor singleton usage.")
}

// Spawn the manager then tell it to spawn the singleton actor
self.managerRef = try context.system._spawnSystemActor(
"singletonManager-\(self.settings.name)",
ActorSingletonManager(settings: self.settings, props: self.singletonProps, self.singletonBehavior).behavior,
ActorSingletonManager(settings: self.settings, props: self.singletonProps ?? Props(), singletonBehavior).behavior,
props: ._wellKnown
)
// Need the manager to tell us the ref because we can't resolve it due to random incarnation
Expand Down
2 changes: 0 additions & 2 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public final class ActorSystem {

internal let _root: _ReceivesSystemMessages

private let terminationLock = Lock()

/// Allows inspecting settings that were used to configure this actor system.
/// Settings are immutable and may not be changed once the system is running.
public let settings: ActorSystemSettings
Expand Down
Loading