Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions Sources/ActorSingletonPlugin/ActorSingletonProxy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ internal class ActorSingletonProxy<Message> {
}

// Update `ref` regardless
context.log.debug("Updating ref for singleton [\(self.settings.name)] to node [\(String(describing: node))]")
self.updateRef(context, node: node)
}
}
Expand Down Expand Up @@ -161,34 +160,42 @@ internal class ActorSingletonProxy<Message> {
}

private func updateRef(_ context: ActorContext<Message>, _ newRef: ActorRef<Message>?) {
context.log.debug("Updating ref from [\(String(describing: self.ref))] to [\(String(describing: newRef))], flushing \(self.buffer.count) messages.")
context.log.debug("Updating ref from [\(String(describing: self.ref))] to [\(String(describing: newRef))], flushing \(self.buffer.count) messages")
self.ref = newRef

// Unstash messages if we have the singleton
if let ref = self.ref {
while let stashed = self.buffer.take() {
ref.tell(stashed)
}
guard let singleton = self.ref else {
return
}

while let stashed = self.buffer.take() {
context.log.debug("Flushing \(stashed), to \(singleton)")
singleton.tell(stashed)
}
}

private func forwardOrStash(_ context: ActorContext<Message>, message: Message) throws {
// Forward the message if `singleton` is not `nil`, else stash it.
if let singleton = self.ref {
context.log.trace("forwarding message: \(singleton.address)")
context.log.trace("Forwarding message \(message), to: \(singleton.address)", metadata: self.metadata(context))
singleton.tell(message)
} else {
context.log.trace("stashing message")
if self.buffer.isFull {
// TODO: log this warning only "once in while" after buffer becomes full
context.log.warning("Buffer is full. Messages might start getting disposed.", metadata: self.metadata(context))
// Move the oldest message to dead letters to make room
if let oldestMessage = self.buffer.take() {
context.system.deadLetters.tell(DeadLetter(oldestMessage, recipient: context.address))
do {
try self.buffer.stash(message: message)
context.log.trace("Stashed message: \(message)", metadata: self.metadata(context))
} catch {
switch error {
case StashError.full:
// TODO: log this warning only "once in while" after buffer becomes full
context.log.warning("Buffer is full. Messages might start getting disposed.", metadata: self.metadata(context))
// Move the oldest message to dead letters to make room
if let oldestMessage = self.buffer.take() {
context.system.deadLetters.tell(DeadLetter(oldestMessage, recipient: context.address))
}
default:
context.log.warning("Unable to stash message, error: \(error)", metadata: self.metadata(context))
}
}

try self.buffer.stash(message: message)
}
}
}
Expand All @@ -199,18 +206,19 @@ internal class ActorSingletonProxy<Message> {
extension ActorSingletonProxy {
func metadata<Message>(_: ActorContext<Message>) -> Logger.Metadata {
var metadata: Logger.Metadata = [
"name": "\(self.settings.name)",
"buffer": "\(self.buffer.count)/\(self.settings.bufferCapacity)",
"tag": "singleton",
"singleton/name": "\(self.settings.name)",
"singleton/buffer": "\(self.buffer.count)/\(self.settings.bufferCapacity)",
]

if let targetNode = self.targetNode {
metadata["targetNode"] = "\(targetNode)"
}
if let ref = self.ref {
metadata["ref"] = "\(ref)"
metadata["ref"] = "\(ref.address)"
}
if let managerRef = self.managerRef {
metadata["managerRef"] = "\(managerRef)"
metadata["managerRef"] = "\(managerRef.address)"
}

return metadata
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Expand Down
4 changes: 4 additions & 0 deletions Sources/DistributedActors/ActorLogging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public struct ActorLogger {
}

public static func make(system: ActorSystem, identifier: String? = nil) -> Logger {
if let overriddenLoggerFactory = system.settings.overrideLoggerFactory {
return overriddenLoggerFactory(identifier ?? system.name)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was missing, so logs at the system level were not being captured.


// we need to add our own storage, and can't do so to Logger since it is a struct...
// so we need to make such a "proxy log handler", that does our actor-specific things.
var proxyHandler = ActorOriginLogHandler(system)
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/ActorShell+Children.swift
Original file line number Diff line number Diff line change
Expand Up @@ -398,5 +398,5 @@ public enum ActorContextError: Error {
/// It is not allowed to spawn
case duplicateActorPath(path: ActorPath)
/// It is not allowed to spawn new actors when the system is stopping
case alreadyStopping
case alreadyStopping(String)
}
4 changes: 2 additions & 2 deletions Sources/DistributedActors/ActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public final class ActorSystem {
// MARK: Logging

public var log: Logger {
var l = ActorLogger.make(system: self) // we only do this to go "through" the proxy; we may not need it in the future?
var l = ActorLogger.make(system: self)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works now

l.logLevel = self.settings.defaultLogLevel
return l
}
Expand Down Expand Up @@ -337,7 +337,7 @@ public final class ActorSystem {
self.settings.plugins.stopAll(self)

DispatchQueue.global().async {
self.log.log(level: .debug, "SHUTTING DOWN ACTOR SYSTEM [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
self.log.log(level: .debug, "Shutting down actor system [\(self.name)]. All actors will be stopped.", file: #file, function: #function, line: #line)
if let cluster = self._cluster {
let receptacle = BlockingReceptacle<Void>()
cluster.ref.tell(.command(.shutdown(receptacle))) // FIXME: should be shutdown
Expand Down
3 changes: 2 additions & 1 deletion Sources/DistributedActors/CRDT/CRDTReplicatorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ extension CRDT.Replicator {
self.receiveClusterEvent(context, event: .membershipChange(change))
}

default:
case .membershipChange:
context.log.trace("Ignoring cluster event \(event), only interested in >= .up events", metadata: self.metadata(context))
default:
() // ignore other events
}
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/DistributedActors/Cluster/Cluster+Member.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ extension Cluster {
public var status: Cluster.MemberStatus

/// Reachability signifies the failure detector's assessment of this member's "reachability", i.e. whether it is responding to health checks or not.
///
/// ### Reachability of .down or .removed nodes
/// Worth pointing out that a `.down` member may still have a `.reachable` reachability field,
/// this usually means that the decision to move the member `.down` was not made by the failure detection layer,
/// but rather issued programmatically, or by some other non-reachability provoked reason.
public var reachability: Cluster.MemberReachability

/// Sequence number at which this node was moved to `.up` by a leader.
Expand Down
8 changes: 7 additions & 1 deletion Sources/DistributedActors/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ extension Cluster.Membership {
if self.firstMember(change.node.node) == nil { // TODO: more general? // TODO this entire method should be simpler
_ = self.join(change.node)
}
return self.mark(change.node, as: status)
let change = self.mark(change.node, as: status)
return change
}
}

Expand Down Expand Up @@ -278,6 +279,11 @@ extension Cluster.Membership {
}
}

/// Convenience overload of `applyLeadershipChange(to:)` that accepts a whole `Cluster.LeadershipChange`.
///
/// Only the change's `newLeader` is forwarded; a `nil` change forwards `nil`.
///
/// - Parameter change: the leadership change to apply, or `nil`.
/// - Returns: whatever `applyLeadershipChange(to:)` returns for the forwarded leader.
/// - Throws: any error thrown by `applyLeadershipChange(to:)`.
public mutating func applyLeadershipChange(_ change: Cluster.LeadershipChange?) throws -> Cluster.LeadershipChange? {
    let newLeader = change?.newLeader
    return try self.applyLeadershipChange(to: newLeader)
}

/// - Returns: the changed member if the change was a transition (unreachable -> reachable, or back),
/// or `nil` if the reachability is the same as already known by the membership.
public mutating func applyReachabilityChange(_ change: Cluster.ReachabilityChange) -> Cluster.Member? {
Expand Down
8 changes: 8 additions & 0 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,16 @@ public struct ClusterControl {
self.ref.tell(.command(.initJoin(node)))
}

/// Sends a `.downCommand` for `self.node.node` to the cluster ref.
///
/// Leaving is currently expressed as a down command for the node held by this `ClusterControl`.
public func leave() {
    let ownNode = self.node.node
    self.ref.tell(.command(.downCommand(ownNode)))
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may get its own command; we already have a .leaving status in membership.

The idea is that while leaving we may still perform actions, but others would not give us new work, etc.
This would be used by virtual actors and singletons.


/// Mark as `Cluster.MemberStatus.down` _any_ incarnation of a member matching the passed in `node`.
///
/// Implemented by sending a `.downCommand(node)` command to the cluster ref.
///
/// - Parameter node: the node whose matching member(s) should be marked as down.
public func down(node: Node) {
self.ref.tell(.command(.downCommand(node)))
}

public func down(member: Cluster.Member) {
self.ref.tell(.command(.downCommandMember(member)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting / important: thanks to using this version internally, we correctly carry all the metadata that a member has -- such as the reachability at the moment when someone decided to call down. It is mostly a "more correct view in the cluster membership" change. For end users, calling either of them will yield the expected result.

}
}
65 changes: 63 additions & 2 deletions Sources/DistributedActors/Cluster/ClusterReceptionist.swift
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ internal enum ClusterReceptionist {
}

private static func syncRegistrations(context: ActorContext<Receptionist.Message>, myself: ActorRef<ClusterReceptionist.FullState>) throws {
let remoteControls = context.system._cluster!.associationRemoteControls // FIXME: should not be needed and use cluster members instead
guard let cluster = context.system._cluster else { // FIXME: should not be needed and use cluster members instead
return // cannot get _cluster, perhaps we are shutting down already?
}
let remoteControls = cluster.associationRemoteControls

guard !remoteControls.isEmpty else {
return // nothing to do, no remote members
Expand All @@ -249,6 +252,64 @@ internal enum ClusterReceptionist {
}

private static func makeRemoteAddress(on node: UniqueNode) -> ActorAddress {
return try! .init(node: node, path: ActorPath([ActorPathSegment("system"), ActorPathSegment("receptionist")]), incarnation: .wellKnown)
try! .init(node: node, path: ActorPath([ActorPathSegment("system"), ActorPathSegment("receptionist")]), incarnation: .wellKnown) // try! safe, we know the path is legal
}
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: DowningStrategySettings

/// Selects which downing strategy (if any) is created from the cluster settings.
public enum DowningStrategySettings {
    /// No downing strategy; `make(_:)` returns `nil`.
    case none
    /// Use a `TimeoutBasedDowningStrategy` configured with the associated settings.
    case timeout(TimeoutBasedDowningStrategySettings)

    /// Builds the strategy described by this setting.
    ///
    /// - Parameter clusterSettings: supplies the `uniqueBindNode` passed to the strategy as its `selfNode`.
    /// - Returns: the configured strategy, or `nil` for `.none`.
    func make(_ clusterSettings: ClusterSettings) -> DowningStrategy? {
        let strategy: DowningStrategy?
        switch self {
        case .none:
            strategy = nil
        case let .timeout(timeoutSettings):
            strategy = TimeoutBasedDowningStrategy(timeoutSettings, selfNode: clusterSettings.uniqueBindNode)
        }
        return strategy
    }
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: OnDownActionStrategySettings

public enum OnDownActionStrategySettings {
/// Take no (automatic) action upon noticing that this member is marked as [.down].
///
/// When using this mode you should take special care to implement some form of shutting down of this node (!).
/// As a `Cluster.MemberStatus.down` node is effectively useless for the rest of the cluster -- i.e. other
/// members MUST refuse communication with this down node.
case none
/// Upon noticing that this member is marked as [.down], initiate a shutdown.
case gracefulShutdown(delay: TimeAmount)

func make() -> (ActorSystem) throws -> Void {
switch self {
case .none:
return { _ in () } // do nothing

case .gracefulShutdown(let shutdownDelay):
return { system in
_ = try system.spawn("leaver", of: String.self, .setup { context in
guard .milliseconds(0) < shutdownDelay else {
context.log.warning("This node was marked as [.down], delay is immediate. Shutting down the system immediately!")
system.shutdown()
return .stop
}

context.timers.startSingle(key: "shutdown-delay", message: "shutdown", delay: shutdownDelay)
system.log.warning("This node was marked as [.down], performing OnDownAction as configured: shutting down the system, in \(shutdownDelay)")

return .receiveMessage { _ in
system.log.warning("Shutting down...")
system.shutdown()
return .stop
}
})
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slight delay is useful to allow the .down gossip to spread to others before we really die.

Tests also cover the "shutdown immediately" case that it all works correctly 👍

}
}
25 changes: 9 additions & 16 deletions Sources/DistributedActors/Cluster/ClusterSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public struct ClusterSettings {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Leader Election

public var autoLeaderElection: LeadershipSelectionSettings = .lowestAddress(minNumberOfMembers: 2)
public var autoLeaderElection: LeadershipSelectionSettings = .lowestReachable(minNumberOfMembers: 2)

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: TLS & Security settings
Expand Down Expand Up @@ -143,7 +143,14 @@ public struct ClusterSettings {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster membership and failure detection

public var downingStrategy: DowningStrategySettings = .none
/// Strategy how members determine if others (or myself) shall be marked as `.down`.
/// This strategy should be set to the same (or compatible) strategy on all members of a cluster to avoid split brain situations.
public var downingStrategy: DowningStrategySettings = .timeout(.default)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured to make the downing on by default after all...

We should soon implement a slightly better one, but for the sake of showing how singletons move around etc I guess let's leave it on...

Opinions @drexin @yim-lee ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured to make the downing on by default after all...

+1


/// When this member node notices it has been marked as `.down` in the membership, it can automatically perform an action.
/// This setting determines which action to take. Generally speaking, the best course of action is to quickly and gracefully
/// shut down the node and process, potentially leaving a higher level orchestrator to replace the node (e.g. k8s starting a new pod for the cluster).
public var onDownAction: OnDownActionStrategySettings = .gracefulShutdown(delay: .seconds(3))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolves an ancient ticket #55, I think it is indeed correct to keep the default to kill the actor system as we chatted in the ticket -- sanity check that we still agree with this @drexin ? :)


/// Configures the SWIM failure detector.
public var swim: SWIM.Settings = .default
Expand All @@ -168,17 +175,3 @@ public struct ClusterSettings {
self.tls = tls
}
}

/// Selects which downing strategy (if any) is created from the cluster settings.
public enum DowningStrategySettings {
    /// No downing strategy; `make(_:)` returns `nil`.
    case none
    /// Use a `TimeoutBasedDowningStrategy` configured with the associated settings.
    case timeout(TimeoutBasedDowningStrategySettings)

    /// Builds the strategy described by this setting.
    ///
    /// - Parameter clusterSettings: supplies the `uniqueBindNode` passed to the strategy as its `selfNode`.
    /// - Returns: the configured strategy, or `nil` for `.none`.
    func make(_ clusterSettings: ClusterSettings) -> DowningStrategy? {
        let strategy: DowningStrategy?
        switch self {
        case .none:
            strategy = nil
        case let .timeout(timeoutSettings):
            strategy = TimeoutBasedDowningStrategy(timeoutSettings, selfNode: clusterSettings.uniqueBindNode)
        }
        return strategy
    }
}
Loading