|
13 | 13 | //===----------------------------------------------------------------------===// |
14 | 14 |
|
15 | 15 | import DistributedActors |
| 16 | +import DistributedActorsConcurrencyHelpers |
| 17 | + |
| 18 | +// ==== ---------------------------------------------------------------------------------------------------------------- |
| 19 | +// MARK: Actor singleton plugin |
| 20 | + |
| 21 | +/// The actor singleton plugin ensures that there is no more than one instance of an actor that is defined to be |
| 22 | +/// singleton running in the cluster. |
| 23 | +/// |
| 24 | +/// An actor singleton may run on any node in the cluster. Use `ActorSingletonSettings.allocationStrategy` to control |
| 25 | +/// its allocation. On candidate nodes where the singleton might run, use `ActorSystem.singleton.ref(type:name:props:behavior)` |
| 26 | +/// to define actor behavior. Otherwise, call `ActorSystem.singleton.ref(type:name:)` to obtain a ref. The returned |
| 27 | +/// `ActorRef` is in reality a proxy which handle situations where the singleton is shifted to different nodes. |
| 28 | +/// |
| 29 | +/// - Warning: Refer to the configured `AllocationStrategy` for trade-offs between safety and recovery latency for |
| 30 | +/// the singleton allocation. |
| 31 | +/// - SeeAlso: The `ActorSingleton` mechanism is conceptually similar to Erlang/OTP's <a href="http://erlang.org/doc/design_principles/distributed_applications.html">`DistributedApplication`</a>, |
| 32 | +/// and <a href="https://doc.akka.io/docs/akka/current/cluster-singleton.html">`ClusterSingleton` in Akka</a>. |
| 33 | +public final class ActorSingletonPlugin { |
| 34 | + private var singletons: [String: BoxedActorSingleton] = [:] |
| 35 | + private let singletonsLock = Lock() |
| 36 | + |
| 37 | + public init() {} |
| 38 | + |
| 39 | + func ref<Message>(of type: Message.Type, settings: ActorSingletonSettings, system: ActorSystem, props: Props? = nil, _ behavior: Behavior<Message>? = nil) throws -> ActorRef<Message> { |
| 40 | + try self.singletonsLock.withLock { |
| 41 | + if let existing = self.singletons[settings.name] { |
| 42 | + guard let proxy = existing.unsafeUnwrapAs(Message.self).proxy else { |
| 43 | + fatalError("Singleton [\(settings.name)] not yet initialized") |
| 44 | + } |
| 45 | + return proxy |
| 46 | + } |
| 47 | + |
| 48 | + let singleton = ActorSingleton<Message>(settings: settings, props: props, behavior) |
| 49 | + try singleton.spawnAll(system) |
| 50 | + self.singletons[settings.name] = BoxedActorSingleton(singleton) |
| 51 | + |
| 52 | + guard let proxy = singleton.proxy else { |
| 53 | + fatalError("Singleton[\(settings.name)] not yet initialized") |
| 54 | + } |
| 55 | + |
| 56 | + return proxy |
| 57 | + } |
| 58 | + } |
| 59 | + |
| 60 | + func actor<Act: Actorable>(of type: Act.Type, settings: ActorSingletonSettings, system: ActorSystem, props: Props? = nil, _ makeInstance: ((Actor<Act>.Context) -> Act)? = nil) throws -> Actor<Act> { |
| 61 | + let behavior = makeInstance.map { maker in |
| 62 | + Behavior<Act.Message>.setup { context in |
| 63 | + Act.makeBehavior(instance: maker(.init(underlying: context))) |
| 64 | + } |
| 65 | + } |
| 66 | + let ref = try self.ref(of: Act.Message.self, settings: settings, system: system, behavior) |
| 67 | + return Actor<Act>(ref: ref) |
| 68 | + } |
| 69 | +} |
| 70 | + |
| 71 | +extension ActorSingletonPlugin { |
| 72 | + func ref<Message>(of type: Message.Type, name: String, system: ActorSystem, props: Props? = nil, _ behavior: Behavior<Message>? = nil) throws -> ActorRef<Message> { |
| 73 | + let settings = ActorSingletonSettings(name: name) |
| 74 | + return try self.ref(of: type, settings: settings, system: system, props: props, behavior) |
| 75 | + } |
| 76 | + |
| 77 | + func actor<Act: Actorable>(of type: Act.Type, name: String, system: ActorSystem, props: Props? = nil, _ makeInstance: ((Actor<Act>.Context) -> Act)? = nil) throws -> Actor<Act> { |
| 78 | + let settings = ActorSingletonSettings(name: name) |
| 79 | + return try self.actor(of: type, settings: settings, system: system, props: props, makeInstance) |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +// ==== ---------------------------------------------------------------------------------------------------------------- |
| 84 | +// MARK: Plugin protocol conformance |
| 85 | + |
| 86 | +extension ActorSingletonPlugin: Plugin { |
| 87 | + static let pluginKey = PluginKey<ActorSingletonPlugin>(plugin: "$actorSingleton") |
| 88 | + |
| 89 | + public var key: Key { |
| 90 | + Self.pluginKey |
| 91 | + } |
| 92 | + |
| 93 | + public func start(_ system: ActorSystem) -> Result<Void, Error> { |
| 94 | + .success(()) |
| 95 | + } |
| 96 | + |
| 97 | + // TODO: Future |
| 98 | + public func stop(_ system: ActorSystem) -> Result<Void, Error> { |
| 99 | + self.singletonsLock.withLock { |
| 100 | + for (_, singleton) in self.singletons { |
| 101 | + singleton.stop(system) |
| 102 | + } |
| 103 | + } |
| 104 | + return .success(()) |
| 105 | + } |
| 106 | +} |
| 107 | + |
| 108 | +// ==== ---------------------------------------------------------------------------------------------------------------- |
| 109 | +// MARK: Singleton refs and actors |
16 | 110 |
|
17 | 111 | extension ActorSystem { |
18 | | - public var singleton: ActorSingletonLookup { |
| 112 | + public var singleton: ActorSingletonControl { |
19 | 113 | .init(self) |
20 | 114 | } |
21 | 115 | } |
22 | 116 |
|
23 | | -/// Allows for simplified lookups of actor references which are known to be managed by `ActorSingleton`. |
24 | | -public struct ActorSingletonLookup { |
| 117 | +/// Provides actor singleton controls such as obtaining a singleton ref and defining the singleton. |
| 118 | +public struct ActorSingletonControl { |
25 | 119 | private let system: ActorSystem |
26 | 120 |
|
27 | 121 | internal init(_ system: ActorSystem) { |
28 | 122 | self.system = system |
29 | 123 | } |
30 | 124 |
|
31 | | - /// Obtains a reference to a (proxy) singleton regardless of its current location. |
32 | | - public func ref<Message>(name: String, of type: Message.Type) throws -> ActorRef<Message> { |
33 | | - let key = ActorSingleton<Message>.pluginKey(name: name) |
34 | | - guard let singleton = self.system.settings.plugins[key] else { |
| 125 | + private var singletonPlugin: ActorSingletonPlugin { |
| 126 | + let key = ActorSingletonPlugin.pluginKey |
| 127 | + guard let singletonPlugin = self.system.settings.plugins[key] else { |
35 | 128 | fatalError("No plugin found for key: [\(key)], installed plugins: \(self.system.settings.plugins)") |
36 | 129 | } |
37 | | - guard let proxy = singleton.proxy else { |
38 | | - fatalError("Singleton[\(key)] not yet initialized") |
39 | | - } |
40 | | - return proxy // FIXME: Worried that we never synchronize access to proxy... |
| 130 | + return singletonPlugin |
41 | 131 | } |
42 | 132 |
|
43 | | - public func actor<Act: Actorable>(name: String, _ type: Act.Type) throws -> Actor<Act> { |
44 | | - let ref = try self.ref(name: name, of: Act.Message.self) |
45 | | - return Actor<Act>(ref: ref) |
| 133 | + /// Defines a singleton `behavior` and indicates that it can be hosted on this node. |
| 134 | + public func host<Message>(_ type: Message.Type, name: String, props: Props = Props(), _ behavior: Behavior<Message>) throws -> ActorRef<Message> { |
| 135 | + try self.singletonPlugin.ref(of: type, name: name, system: self.system, props: props, behavior) |
| 136 | + } |
| 137 | + |
| 138 | + /// Defines a singleton `behavior` and indicates that it can be hosted on this node. |
| 139 | + public func host<Message>(_ type: Message.Type, settings: ActorSingletonSettings, props: Props = Props(), _ behavior: Behavior<Message>) throws -> ActorRef<Message> { |
| 140 | + try self.singletonPlugin.ref(of: type, settings: settings, system: self.system, props: props, behavior) |
| 141 | + } |
| 142 | + |
| 143 | + /// Defines a singleton `Actorable` and indicates that it can be hosted on this node. |
| 144 | + public func host<Act: Actorable>(_ type: Act.Type, name: String, props: Props = Props(), _ makeInstance: @escaping (Actor<Act>.Context) -> Act) throws -> Actor<Act> { |
| 145 | + try self.singletonPlugin.actor(of: type, name: name, system: self.system, props: props, makeInstance) |
| 146 | + } |
| 147 | + |
| 148 | + /// Defines a singleton `Actorable` and indicates that it can be hosted on this node. |
| 149 | + public func host<Act: Actorable>(_ type: Act.Type, settings: ActorSingletonSettings, props: Props = Props(), _ makeInstance: @escaping (Actor<Act>.Context) -> Act) throws -> Actor<Act> { |
| 150 | + try self.singletonPlugin.actor(of: type, settings: settings, system: self.system, props: props, makeInstance) |
| 151 | + } |
| 152 | + |
| 153 | + /// Obtains a ref to the specified actor singleton. |
| 154 | + public func ref<Message>(of type: Message.Type, name: String) throws -> ActorRef<Message> { |
| 155 | + try self.singletonPlugin.ref(of: type, name: name, system: self.system) |
| 156 | + } |
| 157 | + |
| 158 | + /// Obtains the specified singleton actor. |
| 159 | + public func actor<Act: Actorable>(of type: Act.Type, name: String) throws -> Actor<Act> { |
| 160 | + try self.singletonPlugin.actor(of: type, name: name, system: self.system) |
46 | 161 | } |
47 | 162 | } |
0 commit comments