2 changes: 1 addition & 1 deletion Samples/Sources/SampleDiningPhilosophers/Fork.swift
@@ -18,7 +18,7 @@ import Logging

distributed actor Fork: CustomStringConvertible {
private lazy var log: Logger = {
var l = Logger(actor: self)
var l = Logger(clusterActor: self)
l[metadataKey: "name"] = "\(self.name)"
return l
}()
18 changes: 15 additions & 3 deletions Sources/DistributedCluster/ActorLogging.swift
@@ -72,12 +72,24 @@ internal final class LoggingContext {
/// Specialized `Logger` factory, populating the logger with metadata about its "owner" actor (or system),
/// such as its path or the node on which it resides.
///
/// The preferred way of obtaining a logger for an actor or system is `context.log` or `system.log`, rather than creating new ones.
/// The preferred way of obtaining a logger for a cluster actor is `Logger(clusterActor:)`, and for the `ClusterSystem` itself `system.log`, rather than creating new ones.
///
/// These factories use the configured `baseLogger` of the cluster system, which allows for easy testing, mocking, and configuring all loggers used within a system.
extension Logger {
/// Create a logger specific to this actor.
/// See ``Logger(clusterActor:)``.
@available(*, deprecated, renamed: "Logger(clusterActor:)")
public init<Act: DistributedActor>(actor: Act) where Act.ActorSystem == ClusterSystem {
self = Logger(clusterActor: actor)
}

/// Create a logger specific to this cluster actor, based off the `ClusterSystem.settings.logging.baseLogger`.
///
/// The logger will include `actor/id` for easily identifying which actor is logging.
///
/// The logger does NOT retain the actor, only its id.
public init<Act: DistributedActor>(clusterActor actor: Act) where Act.ActorSystem == ClusterSystem {
var log = actor.actorSystem.settings.logging.baseLogger
log[metadataKey: "actor/path"] = "\(actor.id.path)"
log[metadataKey: "actor/path"] = "\(actor.id.path)" // TODO: remove paths or make them optional here?
log[metadataKey: "actor/id"] = "\(actor.id)"
self = log
}
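As an illustrative sketch (not part of this diff), this is how the new factory is intended to be used from a distributed actor; the `Greeter` actor and its `hello(name:)` method are hypothetical:

```swift
import Distributed
import DistributedCluster
import Logging

distributed actor Greeter {
    typealias ActorSystem = ClusterSystem

    // Built from the system's configured baseLogger; retains only the actor's id, not the actor itself.
    private lazy var log: Logger = Logger(clusterActor: self)

    distributed func hello(name: String) {
        // Every message automatically carries the "actor/id" (and currently "actor/path") metadata.
        self.log.info("Greeting \(name)")
    }
}
```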
127 changes: 123 additions & 4 deletions Sources/DistributedCluster/Cluster/ClusterControl.swift
@@ -55,6 +55,7 @@ public struct ClusterControl {
}

private let _membershipSnapshotHolder: MembershipHolder

private actor MembershipHolder {
var membership: Cluster.Membership

@@ -91,6 +92,24 @@ public struct ClusterControl {
self.node.endpoint
}

/// The member representing _this_ node in the cluster.
///
/// Its ``MemberStatus`` is based on the current membership snapshot, so while membership changes are in flight this
/// view may be out of date. For that reason it is not advisable to store the member for long-term use, since its status
/// may change over time; subscribe to the cluster event stream instead, or query the ``membershipSnapshot`` again when needed.
public var member: Cluster.Member {
get async {
let membership = await membershipSnapshot
guard let member = membership.member(self.node) else {
fatalError("Unexpected: self.membership must always contain member for the current node, but didn't! Node: \(self.node), membership: \(membership)")
}

return member
}
}
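A brief usage sketch of the new `member` property (assuming `system` is an already-running `ClusterSystem`); the returned value is a point-in-time view:

```swift
// Hypothetical usage, in an async context:
let selfMember = await system.cluster.member
print("This node is \(selfMember.node), status: \(selfMember.status)")
```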

/// Instructs the cluster to join the actor system located listening on the passed in host-port pair.
///
/// There is no specific need to "wait until joined" before one can attempt to send to references located on the cluster member,
@@ -181,7 +200,7 @@ public struct ClusterControl {
/// Wait, within the given duration, until the passed in node has joined the cluster and become ``Cluster/MemberStatus/up``.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - endpoint: The endpoint to be joined by this system.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node.
@@ -212,7 +231,7 @@ public struct ClusterControl {
///
/// - Parameters
/// - nodes: The nodes to be joined by this system.
/// - status: The minimum expected member status.
/// - atLeastStatus: The minimum expected member status.
/// - within: Duration to wait for.
public func waitFor(_ nodes: some Collection<Cluster.Node>, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
@@ -263,7 +282,7 @@ public struct ClusterControl {
/// Wait, within the given duration, for this actor system to be a member of the node's cluster and have the specified status.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - endpoint: The endpoint to be joined by this system.
/// - status: The expected member status.
/// - within: Duration to wait for.
///
@@ -321,9 +340,109 @@ public struct ClusterControl {
}
}

/// Perform a **local change** to the membership such that the leader of the cluster is assumed to be `member`.
///
/// Users are free to implement leader election in whichever way suits them: rely on an external system, or implement
/// an election mechanism among the nodes themselves, and then apply the resulting decision locally using this method.
///
/// > NOTE: This change is NOT applied to other nodes via gossip.
///
/// If there are any pending leader actions stashed on the new leader (such as downing nodes),
/// they will be executed as soon as it becomes the leader.
///
/// If applying the passed-in change to the current membership results in an effective change,
/// it will be published via `system.cluster.events`. Conversely, if the change has already taken effect independently,
/// or the target member already is the leader, no new event will be emitted.
///
/// - Parameter member: the member to be assumed the leader of the cluster.
public func assumeLeader(_ member: Cluster.Member) {
// old leader does not matter for applying this change, we assume the new leader based on external decisions.
guard let change = Cluster.LeadershipChange(oldLeader: nil, newLeader: member) else {
fatalError("Impossible that leadership change would be ineffective when moving from 'nil' to: \(member)")
}
self.ref.tell(.requestMembershipChange(.leadershipChange(change)))
}
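A hedged usage sketch of `assumeLeader(_:)`: some external decision (left abstract here) selects this node as leader, and the node applies that decision locally; the resulting change, if effective, is observable on the event stream:

```swift
// Hypothetical usage, in an async context; `system` is a running ClusterSystem.
let selfMember = await system.cluster.member
system.cluster.assumeLeader(selfMember)

for await event in system.cluster.events {
    if case .leadershipChange(let change) = event {
        print("New leader: \(String(describing: change.newLeader))")
        break
    }
}
```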

/// Wait, within the given duration, for a leader to be found in the cluster membership and have **at least** the specified status.
///
/// - Parameters
/// - atLeastStatus: The minimum expected member status of the leader.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the current leader with at least the expected status, or `nil` if the expected
/// status is at least `.down` or `.removed` and no leader is currently known to the membership.
@discardableResult
public func waitForLeader(atLeast atLeastStatus: Cluster.MemberStatus, within: Duration, file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member? {
try await self.waitForMembershipEventually(Cluster.Member?.self, within: within, file: file, line: line) { membership in
guard let foundLeader = membership.leader else {
if atLeastStatus == .down || atLeastStatus == .removed {
return nil
}
throw Cluster.MembershipError(.notFoundAny(endpoint, in: membership), file: file, line: line)
}

guard foundLeader.status >= atLeastStatus else {
throw Cluster.MembershipError(.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundLeader), file: file, line: line)
}
return foundLeader
}
}
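For illustration, waiting until some node is known as leader and has reached at least `.up` (the timeout value is arbitrary):

```swift
// Hypothetical usage, in an async throwing context.
if let leader = try await system.cluster.waitForLeader(atLeast: .up, within: .seconds(10)) {
    print("Cluster leader is \(leader.node)")
}
```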

/// Wait, within the given duration, for this node to become a leader of the cluster.
@discardableResult
public func waitToBecomeLeader(file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member {
for await event in self.events {
switch event {
case .leadershipChange(let leaderChanged):
guard let newLeader = leaderChanged.newLeader else {
continue // keep waiting
}
guard newLeader.node == self.node else {
continue // keep waiting, someone else has become leader
}
return newLeader // yay, we have become the leader!

case .membershipChange(let change):
guard change.member.node == self.node else {
// change about some other node, we're not concerned about those here
continue
}
if change.member.status.isDown || change.member.status.isRemoved {
// If we became down, we'll never become leader
throw Cluster.MembershipError(.statusRequirementNotMet(expected: .joining, found: change.member))
}
continue

case .snapshot(let snapshot):
guard let member = snapshot.member(self.node) else {
continue
}
if member.status.isDown || member.status.isRemoved {
// If we became down, we'll never become leader
throw Cluster.MembershipError(.statusRequirementNotMet(expected: .joining, found: member))
}
continue

default:
continue
}
}

// We stopped consuming events before becoming the leader, so throw to signal that we did not find it;
// this most likely happens because the task running this wait method was cancelled.
if Task.isCancelled {
throw CancellationError()
}

throw await Cluster.MembershipError(.notFound(self.node, in: self.membershipSnapshot))
}
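And a sketch of gating leader-only work on `waitToBecomeLeader()`; `runLeaderDuties()` is a hypothetical placeholder:

```swift
// Hypothetical usage: perform work only once this node has become the cluster leader.
Task {
    let selfMember = try await system.cluster.waitToBecomeLeader()
    system.log.info("Became leader: \(selfMember)")
    // await runLeaderDuties()
}
```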

private func waitForMembershipEventually<T>(_: T.Type = T.self,
within: Duration,
interval: Duration = .milliseconds(100),
file: String = #fileID, line: UInt = #line,
_ block: (Cluster.Membership) async throws -> T) async throws -> T
{
let deadline = ContinuousClock.Instant.fromNow(within)
@@ -340,6 +459,6 @@ public struct ClusterControl {
}
}

throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError))
throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError), file: file, line: line)
}
}
13 changes: 11 additions & 2 deletions Sources/DistributedCluster/Cluster/ClusterEventStream.swift
@@ -85,7 +85,16 @@ public struct ClusterEventStream: AsyncSequence {
}
}

func publish(_ event: Cluster.Event, file: String = #filePath, line: UInt = #line) async {
/// Emit a cluster event.
///
/// Emitting an event only applies locally, and therefore this method should only be used by the cluster shell itself,
/// which is the source of all such events.
///
/// - Parameters:
/// - event: the event to be emitted
/// - file: source file location
/// - line: source line location
internal func publish(_ event: Cluster.Event, file: String = #filePath, line: UInt = #line) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
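Since `publish` is internal, user code is expected to consume these events rather than emit them; a minimal subscription sketch (assuming a running `system: ClusterSystem`):

```swift
// Hypothetical consumer of the cluster event stream that `publish` feeds.
Task {
    for await event in system.cluster.events {
        switch event {
        case .membershipChange(let change):
            system.log.info("Membership change: \(change)")
        case .leadershipChange(let change):
            system.log.info("Leadership change: \(change)")
        default:
            continue
        }
    }
}
```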
@@ -147,7 +156,7 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {
private var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
private var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]

private lazy var log = Logger(actor: self)
private lazy var log = Logger(clusterActor: self)

internal init(actorSystem: ActorSystem) {
self.actorSystem = actorSystem
@@ -82,7 +82,7 @@ internal distributed actor DowningStrategyShell {

let strategy: DowningStrategy

private lazy var log = Logger(actor: self)
private lazy var log = Logger(clusterActor: self)

/// `Task` for subscribing to cluster events.
private var eventsListeningTask: Task<Void, Error>?
21 changes: 13 additions & 8 deletions Sources/DistributedCluster/Cluster/Leadership.swift
@@ -42,11 +42,12 @@ import NIO // Future
/// If a new member is selected as leader, a ``Cluster/Event`` carrying ``Cluster/LeadershipChange`` will be emitted.
/// Other actors may subscribe to `ClusterSystem.cluster.events` in order to receive and react to such changes,
/// e.g. if an actor should only perform its duties if it is residing on the current leader node.
public protocol LeaderElection {
public protocol LegacyLeaderElection {
/// Select a member to become a leader out of the existing `Membership`.
///
/// Decisions about electing/selecting a leader may be performed asynchronously.
mutating func runElection(context: LeaderElectionContext, membership: Cluster.Membership) -> LeaderElectionResult

}

public struct LeaderElectionContext {
@@ -72,6 +73,7 @@ public struct LeaderElectionContext {
/// actor coordination, the result of such election is going to be provided asynchronously.
///
/// A change in leadership will result in a `Cluster.LeadershipChange` event being emitted in the system's cluster event stream.
@available(*, deprecated, message: "This API predates async/await and should not be used in new code.")
public struct LeaderElectionResult: _AsyncResult {
public typealias Value = Cluster.LeadershipChange?
let future: EventLoopFuture<Cluster.LeadershipChange?>
@@ -102,9 +104,9 @@ extension Leadership {
static let naming: _ActorNaming = "leadership"

private var membership: Cluster.Membership // FIXME: we need to ensure the membership is always up to date -- we need the initial snapshot or a diff from a zero state etc.
private var election: LeaderElection
private var election: LegacyLeaderElection

init(_ election: LeaderElection) {
init(_ election: LegacyLeaderElection) {
self.election = election
self.membership = .empty
}
@@ -162,7 +164,10 @@ extension Leadership {
context.log.trace("The leadership change that was decided on by \(self.election) results in no change from current leadership state.")
return self.ready
}
context.system.cluster.ref.tell(.requestMembershipChange(.leadershipChange(changed)))
guard let leader = changed.newLeader else {
return self.ready
}
context.system.cluster.assumeLeader(leader)
return self.ready

case .success(.none):
@@ -223,7 +228,7 @@ extension Leadership {
/// fulfilling this role whenever the minimum number of nodes exist. This may be useful when operation would
/// potentially be unsafe given less than `minimumNrOfMembers` nodes.
///
public struct LowestReachableMember: LeaderElection {
public struct LowestReachableMember: LegacyLeaderElection {
// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
// guarantees, and you should consider using another scheme or consensus based modes.
let minimumNumberOfMembersToDecide: Int
@@ -260,9 +265,9 @@ extension Leadership {
// Clear current leader and trigger `Cluster.LeadershipChange`
let change = try! membership.applyLeadershipChange(to: nil) // try!-safe, because changing leader to nil is safe
context.log.trace("Removing leader [\(currentLeader)]")
return .init(context.loop.next().makeSucceededFuture(change))
return LeaderElectionResult(context.loop.next().makeSucceededFuture(change))
} else {
return .init(context.loop.next().makeSucceededFuture(nil))
return LeaderElectionResult(context.loop.next().makeSucceededFuture(nil))
}
}

@@ -327,7 +332,7 @@ extension ClusterSystemSettings {

private let underlying: _LeadershipSelectionSettings

func make(_: ClusterSystemSettings) -> LeaderElection? {
func make(_: ClusterSystemSettings) -> LegacyLeaderElection? {
switch self.underlying {
case .none:
return nil
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift
@@ -43,7 +43,7 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin
}

private lazy var log: Logger = {
var log = Logger(actor: self)
var log = Logger(clusterActor: self)
log.logLevel = self.settings.logger.logLevel
return log
}()
3 changes: 1 addition & 2 deletions Sources/DistributedCluster/ClusterSystem.swift
@@ -272,7 +272,6 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
self.metrics = ClusterSystemMetrics(settings.metrics)

self._receptionistRef = ManagedAtomicLazyReference()
// self._receptionistStore = ManagedAtomicLazyReference()
self._serialization = ManagedAtomicLazyReference()
self._clusterStore = ManagedAtomicLazyReference()
self._clusterControlStore = ManagedAtomicLazyReference()
@@ -1559,7 +1558,7 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH
directReturnContinuation.resume(returning: ())

case .remoteCall(let system, let callID, let channel, let recipient):
system.log.debug("Result handler, onReturnVoid", metadata: [
system.log.trace("Result handler, onReturnVoid", metadata: [
"call/id": "\(callID)",
])

2 changes: 1 addition & 1 deletion Sources/DistributedCluster/Docs.docc/Observability.md
@@ -4,7 +4,7 @@ The cluster system offers a number of built-in observability capabilities about

## Logging

TODO: Explain `Logger(actor: self)` pattern
TODO: Explain `Logger(clusterActor: self)` pattern

## Metrics

2 changes: 1 addition & 1 deletion Sources/DistributedCluster/Gossip/Gossiper+Shell.swift
@@ -96,7 +96,7 @@ internal final class GossipShell<Gossip: Codable, Acknowledgement: Codable> {
context.log.trace("Received gossip [\(identifier.gossipIdentifier)]", metadata: [
"gossip/identity": "\(identifier.gossipIdentifier)",
"gossip/origin": "\(origin.id)",
"gossip/incoming": Logger.MetadataValue.pretty(payload),
"gossip/incoming": "\(pretty: payload)",
])

let logic = self.getEnsureLogic(context, identifier: identifier)
@@ -73,7 +73,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
/// `Task` for subscribing to cluster events
private var clusterEventsSubscribeTask: Task<Void, Error>?

private lazy var log = Logger(actor: self)
private lazy var log = Logger(clusterActor: self)

init(
settings: ClusterSingletonSettings,