3 changes: 0 additions & 3 deletions Sources/DistributedActors/Cluster/SWIM/README.md

This file was deleted.

34 changes: 18 additions & 16 deletions Sources/DistributedActors/Cluster/SWIM/SWIM.swift
@@ -14,16 +14,16 @@

/// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol).
///
/// SWIM serves as a low-level membership and distributed failure detector mechanism.
/// Cluster members may be discovered through SWIM gossip, yet will be asked to participate in the high-level
/// cluster membership as driven by the `ClusterShell`.
/// SWIM serves as a low-level distributed failure detector mechanism.
/// It also maintains its own membership in order to monitor and select nodes to ping with periodic health checks,
/// however this membership is not directly the same as the high-level membership exposed by the `Cluster`.
///
/// ### Modifications
/// See the documentation of this swim implementation in the reference documentation.
///
/// ### Related Papers
/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf) paper
/// SWIM is first and foremost used to determine whether nodes are reachable or not (in SWIM terms, whether they are `.dead`).
/// However, the final decision to mark a node `.dead` is made by the cluster, by issuing a `Cluster.MemberStatus.down`
/// (usually in reaction to SWIM informing it about a node being `SWIM.Member.Status.unreachable`).
///
/// Cluster members may be discovered through SWIM gossip, yet will be asked to participate in the high-level
/// cluster membership as driven by the `ClusterShell`.
/// ### See Also
/// - SeeAlso: `SWIM.Instance` for a detailed discussion on the implementation.
/// - SeeAlso: `SWIM.Shell` for the interpretation and actor driving the interactions.
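To make the division of responsibility described above concrete, here is a minimal sketch (not part of this diff; all type shapes are simplified assumptions) of how a downing decision could flow from a SWIM-driven reachability change:

```swift
// Hypothetical sketch: SWIM only ever marks members `.unreachable`;
// the final `.down` decision is made at the cluster layer, e.g. by a
// downing strategy reacting to reachability events. The types below
// are simplified stand-ins for the real cluster types.
enum Reachability { case reachable, unreachable }

struct ReachabilityChange {
    let node: String
    let reachability: Reachability
}

struct NaiveDowningStrategy {
    /// Issues the final down decision, e.g. `Cluster.MemberStatus.down`.
    let down: (String) -> Void

    func onEvent(_ change: ReachabilityChange) {
        // The *decision* to down a node lives here, not inside SWIM.
        if change.reachability == .unreachable {
            self.down(change.node)
        }
    }
}
```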
@@ -46,7 +46,6 @@ public enum SWIM {
case ping(lastKnownStatus: Status, replyTo: ActorRef<Ack>, payload: Payload)

/// "Ping Request" requests a SWIM probe.
// TODO: target -- node rather than the ref?
case pingReq(target: ActorRef<Message>, lastKnownStatus: Status, replyTo: ActorRef<Ack>, payload: Payload)

/// Extension: Lifeguard, Local Health Aware Probe
@@ -69,7 +68,6 @@ public enum SWIM {
let payload: Payload
}

// TODO: make sure that those are in a "testing" and not just "remote" namespace?
internal struct MembershipState {
let membershipState: [ActorRef<SWIM.Message>: Status]
}
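The `ping` and `pingReq` messages above drive the classic SWIM probe round; the following is a hedged sketch of that flow, with plain functions standing in for the actor messaging (all names and shapes here are illustrative assumptions, not this module's API):

```swift
// Hypothetical, simplified sketch of one SWIM probe round: ping the
// target directly, and if no ack arrives in time, ask `k` other members
// to probe it on our behalf (the `pingReq` message above).
struct ProbeRound {
    enum Outcome { case acked, suspect }

    static func run(
        target: String,
        members: [String],
        k: Int,
        directPing: (String) -> Bool,          // true if an ack arrived in time
        indirectPing: (String, String) -> Bool // (via, target) -> acked?
    ) -> Outcome {
        if directPing(target) { return .acked }

        // No direct ack: fall back to indirect probes through k members.
        let intermediaries = members.filter { $0 != target }.shuffled().prefix(k)
        for via in intermediaries where indirectPing(via, target) {
            return .acked
        }
        return .suspect // neither direct nor indirect ack: suspect the member
    }
}
```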
@@ -133,8 +131,9 @@
// MARK: SWIM Member Status

extension SWIM {
/// The SWIM membership status reflects.
/// The SWIM membership status reflects how a node is perceived by the distributed failure detector.
///
/// ### Modification: Unreachable status
/// The `.unreachable` state is set when a classic SWIM implementation would have declared a node `.down`,
/// yet since we allow for the higher level membership to decide when and how to eject members from a cluster,
/// only the `.unreachable` state is set and a `Cluster.ReachabilityChange` cluster event is emitted. In response to this
@@ -248,13 +247,12 @@
// MARK: SWIM Member

internal struct SWIMMember: Hashable {
// TODO: would want to swap it around, nodes are members, not actors
var node: UniqueNode {
return self.ref.address.node ?? self.ref._system!.settings.cluster.uniqueBindNode
}

/// Each (SWIM) cluster member is running a `probe` actor which we interact with when gossiping the SWIM messages.
let ref: ActorRef<SWIM.Message> // TODO: better name for `ref` is it a `probeRef` (sounds right?) or `swimmerRef` (meh)?
let ref: ActorRef<SWIM.Message>

var status: SWIM.Status

@@ -268,15 +266,19 @@ internal struct SWIMMember: Hashable {
}

var isAlive: Bool {
return self.status.isAlive
self.status.isAlive
}

var isSuspect: Bool {
return self.status.isSuspect
self.status.isSuspect
}

var isUnreachable: Bool {
self.status.isUnreachable
}

var isDead: Bool {
return self.status.isDead
self.status.isDead
}
}
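The convenience accessors above mirror the member's `SWIM.Status`. As a rough illustration of the status lattice they reflect, here is a simplified sketch (the real `SWIM.Status` additionally carries incarnation numbers, which order equal cases, and `.unreachable` is this implementation's extension):

```swift
// Hypothetical sketch of the status ordering: a gossiped status only takes
// effect if it supersedes what we already know about a member.
enum SimpleStatus: Int, Comparable {
    case alive, suspect, unreachable, dead

    static func < (lhs: SimpleStatus, rhs: SimpleStatus) -> Bool {
        lhs.rawValue < rhs.rawValue
    }

    /// In the real implementation, equal statuses are further compared
    /// by their incarnation numbers.
    func supersedes(_ other: SimpleStatus) -> Bool {
        self > other
    }
}
```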

@@ -21,6 +21,7 @@ extension SWIM.Instance {
/// While the SWIM.Instance is not meant to be logging by itself, it does offer metadata for loggers to use.
var metadata: Logger.Metadata {
[
"swim/membersToPing": "\(self.membersToPing)",
"swim/protocolPeriod": "\(self.protocolPeriod)",
"swim/incarnation": "\(self.incarnation)",
"swim/memberCount": "\(self.memberCount)",
@@ -64,11 +65,11 @@ extension SWIMShell {
case .receive(nil):
return "RECV"
case .receive(let .some(pinged)):
return "RECV(pinged:\(pinged.path))"
return "RECV(pinged:\(pinged.address))"
case .reply(let to):
return "REPL(to:\(to.path))"
return "REPL(to:\(to.address))"
case .ask(let who):
return "ASK(\(who.path))"
return "ASK(\(who.address))"
Review comment (author): tracelog saved the day here ❤️; it was useful to be able to see all messages in/out.

}
}
}
78 changes: 53 additions & 25 deletions Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift
@@ -16,25 +16,31 @@ import Logging

/// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol).
///
/// Namespace containing message types used to implement the SWIM protocol.
/// Implementation of the SWIM protocol in abstract terms; see `SWIMShell` for the actor acting upon directives issued by this instance.
///
/// > As you swim lazily through the milieu,
/// > The secrets of the world will infect you.
///
/// - SeeAlso: <a href="https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf">Scalable Weakly-consistent Infection-style Process Group Membership Protocol</a>
/// - SeeAlso: <a href="https://arxiv.org/abs/1707.00788">Lifeguard: Local Health Awareness for More Accurate Failure Detection</a>
/// ### Modifications
/// - Random yet stable order of "members to ping" selection, unlike the completely random selection in the original paper.
///
/// See the documentation of this SWIM implementation in the reference documentation.
///
/// ### Related Papers
/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf)
/// - SeeAlso: [Lifeguard: Local Health Awareness for More Accurate Failure Detection](https://arxiv.org/abs/1707.00788)
final class SWIMInstance {
let settings: SWIM.Settings

/// Main members storage, map to values to obtain current members.
private var members: [ActorRef<SWIM.Message>: SWIMMember]
// private var members: [UniqueNode: SWIMMember] // FIXME: this really should talk about nodes, not the members in the keys

/// List of members maintained in random yet stable order, see `addMember` for details.
internal var membersToPing: [SWIMMember]
/// Constantly mutated by `nextMemberToPing` in an effort to keep the order in which we ping nodes evenly distributed.
private var membersToPing: [SWIMMember]
private var _membersToPingIndex: Int = 0
private var membersToPingIndex: Int {
return self._membersToPingIndex
self._membersToPingIndex
}

/// The incarnation number is used to get a sense of ordering of events, so if an `.alive` or `.suspect`
@@ -43,7 +49,7 @@
/// be incremented by the respective node itself and will happen if that node receives a `.suspect` for
/// itself, to which it will respond with an `.alive` with the incremented incarnation.
var incarnation: SWIM.Incarnation {
return self._incarnation
self._incarnation
}

private var _incarnation: SWIM.Incarnation = 0
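As a sketch of the refutation mechanism this incarnation number enables (hypothetical shapes, not this module's actual API):

```swift
// Hypothetical sketch: a member refutes a `.suspect` rumor about itself by
// gossiping `.alive` with an incremented incarnation number.
struct Refutation {
    private(set) var incarnation: UInt64 = 0

    /// Called when gossip claims *we* are suspect at `suspectedIncarnation`.
    /// Returns the `.alive` incarnation to gossip back, or nil if the rumor
    /// is stale (already refuted by a newer incarnation).
    mutating func onSuspicionAboutSelf(suspectedIncarnation: UInt64) -> UInt64? {
        guard suspectedIncarnation >= self.incarnation else {
            return nil
        }
        self.incarnation = suspectedIncarnation + 1
        return self.incarnation
    }
}
```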
@@ -55,11 +61,10 @@
// be declared `.dead` after not receiving an `.alive` for approx. 3 seconds.
private var _protocolPeriod: Int = 0

// We need to store the path to the owning SWIMShell to avoid adding it to the `membersToPing` list
// private var myRemoteAddress: ActorAddress? = nil
// We store the owning SWIMShell ref in order to avoid adding it to the `membersToPing` list
private var myShellMyself: ActorRef<SWIM.Message>?
private var myShellAddress: ActorAddress? {
return self.myShellMyself?.address
self.myShellMyself?.address
}

private var myNode: UniqueNode?
@@ -90,7 +95,6 @@
return .newerMemberAlreadyPresent(existingMember)
}

// FIXME: store with node as key, not ref
let member = SWIMMember(ref: ref, status: status, protocolPeriod: self.protocolPeriod)
self.members[ref] = member

@@ -114,11 +118,11 @@

self.addToGossip(member: member)

return .added
return .added(member)
}

enum AddMemberDirective {
case added
case added(SWIM.Member)
case newerMemberAlreadyPresent(SWIM.Member)
}

@@ -129,16 +133,16 @@
///
/// - Note:
/// SWIM 4.3: [...] The failure detection protocol at member Mᵢ works by maintaining a list (intuitively, an array) of the known
/// elements of the current membership list, and select- ing ping targets not randomly from this list,
elements of the current membership list, and selecting ping targets not randomly from this list,
/// but in a round-robin fashion. Instead, a newly joining member is inserted in the membership list at
/// a position that is chosen uniformly at random. On completing a traversal of the entire list,
/// Mᵢ rearranges the membership list to a random reordering.
func nextMemberToPing() -> ActorRef<SWIM.Message>? {
if self.membersToPing.isEmpty {
return nil
}
defer { self.advanceMembersToPingIndex() }

defer { self.advanceMembersToPingIndex() }
return self.membersToPing[self.membersToPingIndex].ref
}
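A compact sketch of the selection scheme described in the note above, with the member type simplified to `String` for illustration:

```swift
// Hypothetical sketch of "round-robin over a randomly ordered list":
// new members are inserted at a uniformly random position, and the list
// is reshuffled only after a full traversal.
struct PingOrder {
    private var members: [String] = []
    private var index = 0

    mutating func add(_ member: String) {
        // Random insertion point keeps the order random yet stable
        // within one full round of pings.
        members.insert(member, at: Int.random(in: 0...members.count))
    }

    mutating func next() -> String? {
        guard !members.isEmpty else { return nil }
        defer {
            index += 1
            if index == members.count {
                index = 0
                members.shuffle() // re-randomize after a full traversal
            }
        }
        return members[index]
    }
}
```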

Expand Down Expand Up @@ -195,12 +199,25 @@ final class SWIMInstance {
self.removeFromMembersToPing(member)
}

return .applied(previousStatus: previousStatusOption)
return .applied(previousStatus: previousStatusOption, currentStatus: status)
}

enum MarkedDirective: Equatable {
case ignoredDueToOlderStatus(currentStatus: SWIM.Status)
case applied(previousStatus: SWIM.Status?)
case applied(previousStatus: SWIM.Status?, currentStatus: SWIM.Status)

/// True if the directive was `applied` and the from/to statuses differ, meaning that a change notification should be issued.
var isEffectiveStatusChange: Bool {
Review comment (author): This is preparing for #401 already.

switch self {
case .ignoredDueToOlderStatus:
return false
case .applied(nil, _):
// from no status, to any status is definitely an effective change
return true
case .applied(.some(let previousStatus), let currentStatus):
return previousStatus != currentStatus
}
}
}
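A hypothetical usage sketch of this flag on the shell side (the event emission is an assumption, anticipating #401):

```swift
// Hypothetical: only effective status changes should be translated into
// cluster events (e.g. reachability changes); ignored or same-status
// markings stay silent.
func afterMark(_ directive: SWIMInstance.MarkedDirective, emit: (String) -> Void) {
    guard directive.isEffectiveStatusChange else { return }
    emit("membership status changed: \(directive)")
}
```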

func incrementProtocolPeriod() {
@@ -441,11 +458,17 @@ extension SWIM.Instance {
} else {
if self.isMember(member.ref) {
switch self.mark(member.ref, as: member.status) {
case .applied:
if member.status.isDead {
case .applied(_, let currentStatus):
switch currentStatus {
case .unreachable:
return .applied(level: .notice, message: "Member \(member) marked [.unreachable] from incoming gossip")
case .alive:
// TODO: could be another spot that we have to issue a reachable though?
return .ignored
case .suspect:
return .markedSuspect(member: member)
case .dead:
return .confirmedDead(member: member)
Review comment (author): This dance is to be reformulated as a "change (from, to)" so we can more reliably emit events to the cluster (about reachability).

} else {
return .applied
}
case .ignoredDueToOlderStatus(let currentStatus):
return .ignored(
@@ -454,9 +477,13 @@
)
}
} else if let remoteMemberNode = member.ref.address.node {
// TODO: store that we're now handshaking with it already?
return .connect(node: remoteMemberNode, onceConnected: { _ in
self.addMember(member.ref, status: member.status)
return .connect(node: remoteMemberNode, onceConnected: {
switch $0 {
case .success(let uniqueNode):
self.addMember(member.ref, status: member.status)
case .failure(let error):
self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!)
}
})
} else {
return .ignored(
@@ -482,7 +509,8 @@
/// and do not have a connection to it either (e.g. we joined only seed nodes, and more nodes joined them later
/// we could get information through the seed nodes about the new members; but we still have never talked to them,
/// thus we need to ensure we have a connection to them, before we consider adding them to the membership).
case connect(node: UniqueNode, onceConnected: (UniqueNode) -> Void)
case connect(node: UniqueNode, onceConnected: (Result<UniqueNode, Error>) -> Void)
case markedSuspect(member: SWIM.Member)
/// Meaning the node is now marked `DEAD`.
/// SWIM will continue to gossip about the dead node for a while.
/// We should also notify the high-level membership that the node shall be considered `DOWN`.
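Since `onceConnected` now takes a `Result`, the shell has to handle handshake failures explicitly; below is a hedged sketch of how a shell might interpret these directives (the enum shape and all plumbing are simplified assumptions):

```swift
// Hypothetical stand-in for the directive enum above, with the node type
// simplified to String.
enum GossipDirective {
    case connect(node: String, onceConnected: (Result<String, Error>) -> Void)
    case markedSuspect(member: String)
}

func interpret(
    _ directive: GossipDirective,
    handshake: (String, (Result<String, Error>) -> Void) -> Void
) {
    switch directive {
    case .connect(let node, let onceConnected):
        // Kick off the handshake; the instance reacts to success *and*
        // failure (marking the member `.suspect` if connecting fails).
        handshake(node, onceConnected)
    case .markedSuspect(let member):
        print("member \(member) is now suspect")
    }
}
```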
34 changes: 19 additions & 15 deletions Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift
@@ -22,16 +22,16 @@ public struct SWIMSettings {
.init()
}

/// Optional "SWIM instance name" to be included in log statements,
/// useful when multiple instances of SWIM are run on the same process (e.g. for debugging).
public var name: String?

// var timeSource: TimeSource // TODO would be nice?

public var gossip: SWIMGossipSettings = .default

public var failureDetector: SWIMFailureDetectorSettings = .default

/// Optional "SWIM instance name" to be included in log statements,
/// useful when multiple instances of SWIM are run on the same node (e.g. for debugging).
internal var name: String?

/// When enabled traces _all_ incoming SWIM protocol communication (remote messages).
/// These logs will contain SWIM.Instance metadata, as offered by `SWIM.Instance.metadata`.
/// All logs will be prefixed using `[tracelog:SWIM]`, for easier grepping and inspecting only logs related to the SWIM instance.
@@ -48,31 +48,26 @@ extension SWIM {
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Gossip Settings
// MARK: SWIM Gossip Settings

public struct SWIMGossipSettings {
public static var `default`: SWIMGossipSettings {
return .init()
.init()
}

/// Interval at which gossip messages should be issued.
/// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout?
public var probeInterval: TimeAmount = .seconds(1)
Review comment (author): so this was weird; probeInterval was used but shouldn't have been. We had probeInterval both in swim.gossip and in swim.failureDetector, so some settings we applied in tests were never effective since they set "the wrong one". Now that value lives only in the failure detector settings.


// FIXME: investigate size of messages and find good default
//
// max number of messages included in any gossip payload
// TODO: investigate size of messages and find good default
/// Max number of messages included in any gossip payload
public var maxNumberOfMessages: Int = 20

public var maxGossipCountPerMessage: Int = 6
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: FailureDetector Settings
// MARK: SWIM FailureDetector Settings

public struct SWIMFailureDetectorSettings {
public static var `default`: SWIMFailureDetectorSettings {
return .init()
.init()
}

/// Number of indirect probes that will be issued once a direct ping probe has failed to reply in time with an ack.
@@ -96,6 +91,15 @@
public var suspicionTimeoutPeriodsMax: Int = 10
// public var suspicionTimeoutPeriodsMin: Int = 10 // FIXME: this is once we have LHA, Local Health Aware Suspicion

/// Interval at which gossip messages should be issued.
/// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout?
public var probeInterval: TimeAmount = .seconds(1)

/// Time amount after which a sent ping without ack response is considered timed-out.
/// This drives how a node becomes a suspect, by missing such ping/ack rounds.
///
/// Note that after an initial ping/ack timeout, secondary indirect probes are issued,
/// and only after exceeding `suspicionTimeoutPeriodsMax` shall the node be declared as `.unreachable`,
/// which results in a `Cluster.MemberReachabilityChange` `Cluster.Event`, which downing strategies may act upon.
public var pingTimeout: TimeAmount = .milliseconds(300)
}
Review comment (author): TODO: add a computed property that multiplies the two, so we can easily print "at earliest, we'll notice an unreachable node in N seconds" etc.
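As a back-of-the-envelope version of that suggestion, a hypothetical helper (assuming `TimeAmount` exposes `nanoseconds`, as the NIO-style time type does) that multiplies the two settings:

```swift
// Hypothetical sketch, not part of this diff: at the earliest, an
// unresponsive node is declared `.unreachable` after roughly one probe
// interval per suspicion timeout period.
extension SWIMFailureDetectorSettings {
    var estimatedSuspicionDeadline: TimeAmount {
        // With the defaults above: 1s * 10 periods = ~10 seconds.
        .nanoseconds(self.probeInterval.nanoseconds * Int64(self.suspicionTimeoutPeriodsMax))
    }
}
```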
