Skip to content

Commit 2feeded

Browse files
authored
=swim #397 swim must detect unreachable via other nodes info, and also immediately dead node (#400)
1 parent 40d81d1 commit 2feeded

File tree

16 files changed

+322
-149
lines changed

16 files changed

+322
-149
lines changed

Sources/DistributedActors/Cluster/SWIM/README.md

Lines changed: 0 additions & 3 deletions
This file was deleted.

Sources/DistributedActors/Cluster/SWIM/SWIM.swift

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414

1515
/// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol).
1616
///
17-
/// SWIM serves as a low-level membership and distributed failure detector mechanism.
18-
/// Cluster members may be discovered though SWIM gossip, yet will be asked to participate in the high-level
19-
/// cluster membership as driven by the `ClusterShell`.
17+
/// SWIM serves as a low-level distributed failure detector mechanism.
18+
/// It also maintains its own membership in order to monitor and select nodes to ping with periodic health checks,
19+
/// however this membership is not directly the same as the high-level membership exposed by the `Cluster`.
2020
///
21-
/// ### Modifications
22-
/// See the documentation of this swim implementation in the reference documentation.
23-
///
24-
/// ### Related Papers
25-
/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf) paper
21+
/// SWIM is first and foremost used to determine if nodes are reachable or not (in SWIM terms if they are `.dead`),
22+
/// however the final decision to mark a node `.dead` is made by the cluster by issuing a `Cluster.MemberStatus.down`
23+
/// (usually in reaction to SWIM informing it about a node being `SWIM.Member.Status
2624
///
25+
/// Cluster members may be discovered though SWIM gossip, yet will be asked to participate in the high-level
26+
/// cluster membership as driven by the `ClusterShell`.
2727
/// ### See Also
2828
/// - SeeAlso: `SWIM.Instance` for a detailed discussion on the implementation.
2929
/// - SeeAlso: `SWIM.Shell` for the interpretation and actor driving the interactions.
@@ -46,7 +46,6 @@ public enum SWIM {
4646
case ping(lastKnownStatus: Status, replyTo: ActorRef<Ack>, payload: Payload)
4747

4848
/// "Ping Request" requests a SWIM probe.
49-
// TODO: target -- node rather than the ref?
5049
case pingReq(target: ActorRef<Message>, lastKnownStatus: Status, replyTo: ActorRef<Ack>, payload: Payload)
5150

5251
/// Extension: Lifeguard, Local Health Aware Probe
@@ -69,7 +68,6 @@ public enum SWIM {
6968
let payload: Payload
7069
}
7170

72-
// TODO: make sure that those are in a "testing" and not just "remote" namespace?
7371
internal struct MembershipState {
7472
let membershipState: [ActorRef<SWIM.Message>: Status]
7573
}
@@ -133,8 +131,9 @@ public enum SWIM {
133131
// MARK: SWIM Member Status
134132

135133
extension SWIM {
136-
/// The SWIM membership status reflects.
134+
/// The SWIM membership status reflects how a node is perceived by the distributed failure detector.
137135
///
136+
/// ### Modification: Unreachable status
138137
/// The `.unreachable` state is set when a classic SWIM implementation would have declared a node `.down`,
139138
/// yet since we allow for the higher level membership to decide when and how to eject members from a cluster,
140139
/// only the `.unreachable` state is set and an `Cluster.ReachabilityChange` cluster event is emitted. In response to this
@@ -248,13 +247,12 @@ extension SWIM.Status {
248247
// MARK: SWIM Member
249248

250249
internal struct SWIMMember: Hashable {
251-
// TODO: would want to swap it around, nodes are members, not actors
252250
var node: UniqueNode {
253251
return self.ref.address.node ?? self.ref._system!.settings.cluster.uniqueBindNode
254252
}
255253

256254
/// Each (SWIM) cluster member is running a `probe` actor which we interact with when gossiping the SWIM messages.
257-
let ref: ActorRef<SWIM.Message> // TODO: better name for `ref` is it a `probeRef` (sounds right?) or `swimmerRef` (meh)?
255+
let ref: ActorRef<SWIM.Message>
258256

259257
var status: SWIM.Status
260258

@@ -268,15 +266,19 @@ internal struct SWIMMember: Hashable {
268266
}
269267

270268
var isAlive: Bool {
271-
return self.status.isAlive
269+
self.status.isAlive
272270
}
273271

274272
var isSuspect: Bool {
275-
return self.status.isSuspect
273+
self.status.isSuspect
274+
}
275+
276+
var isUnreachable: Bool {
277+
self.status.isUnreachable
276278
}
277279

278280
var isDead: Bool {
279-
return self.status.isDead
281+
self.status.isDead
280282
}
281283
}
282284

Sources/DistributedActors/Cluster/SWIM/SWIMInstance+Logging.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ extension SWIM.Instance {
2121
/// While the SWIM.Instance is not meant to be logging by itself, it does offer metadata for loggers to use.
2222
var metadata: Logger.Metadata {
2323
[
24+
"swim/membersToPing": "\(self.membersToPing)",
2425
"swim/protocolPeriod": "\(self.protocolPeriod)",
2526
"swim/incarnation": "\(self.incarnation)",
2627
"swim/memberCount": "\(self.memberCount)",
@@ -64,11 +65,11 @@ extension SWIMShell {
6465
case .receive(nil):
6566
return "RECV"
6667
case .receive(let .some(pinged)):
67-
return "RECV(pinged:\(pinged.path))"
68+
return "RECV(pinged:\(pinged.address))"
6869
case .reply(let to):
69-
return "REPL(to:\(to.path))"
70+
return "REPL(to:\(to.address))"
7071
case .ask(let who):
71-
return "ASK(\(who.path))"
72+
return "ASK(\(who.address))"
7273
}
7374
}
7475
}

Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,31 @@ import Logging
1616

1717
/// # SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol).
1818
///
19-
/// Namespace containing message types used to implement the SWIM protocol.
19+
/// Implementation of the SWIM protocol in abstract terms, see [SWIMShell] for the actor acting upon directives issued by this instance.
2020
///
2121
/// > As you swim lazily through the milieu,
2222
/// > The secrets of the world will infect you.
2323
///
24-
/// - SeeAlso: <a href="https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf">Scalable Weakly-consistent Infection-style Process Group Membership Protocol</a>
25-
/// - SeeAlso: <a href="https://arxiv.org/abs/1707.00788">Lifeguard: Local Health Awareness for More Accurate Failure Detection</a>
24+
/// ### Modifications
25+
/// - Random, stable order members to ping selection: Unlike the completely random selection in the original paper.
26+
///
27+
/// See the reference documentation of this swim implementation in the reference documentation.
28+
///
29+
/// ### Related Papers
30+
/// - SeeAlso: [SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf)
31+
/// - SeeAlso: [Lifeguard: Local Health Awareness for More Accurate Failure Detection](https://arxiv.org/abs/1707.00788)
2632
final class SWIMInstance {
2733
let settings: SWIM.Settings
2834

2935
/// Main members storage, map to values to obtain current members.
3036
private var members: [ActorRef<SWIM.Message>: SWIMMember]
31-
// private var members: [UniqueNode: SWIMMember] // FIXME: this really should talk about nodes, not the members in the keys
3237

38+
/// List of members maintained in random yet stable order, see `addMember` for details.
39+
internal var membersToPing: [SWIMMember]
3340
/// Constantly mutated by `nextMemberToPing` in an effort to keep the order in which we ping nodes evenly distributed.
34-
private var membersToPing: [SWIMMember]
3541
private var _membersToPingIndex: Int = 0
3642
private var membersToPingIndex: Int {
37-
return self._membersToPingIndex
43+
self._membersToPingIndex
3844
}
3945

4046
/// The incarnation number is used to get a sense of ordering of events, so if an `.alive` or `.suspect`
@@ -43,7 +49,7 @@ final class SWIMInstance {
4349
/// be incremented by the respective node itself and will happen if that node receives a `.suspect` for
4450
/// itself, to which it will respond with an `.alive` with the incremented incarnation.
4551
var incarnation: SWIM.Incarnation {
46-
return self._incarnation
52+
self._incarnation
4753
}
4854

4955
private var _incarnation: SWIM.Incarnation = 0
@@ -55,11 +61,10 @@ final class SWIMInstance {
5561
// be declared `.dead` after not receiving an `.alive` for approx. 3 seconds.
5662
private var _protocolPeriod: Int = 0
5763

58-
// We need to store the path to the owning SWIMShell to avoid adding it to the `membersToPing` list
59-
// private var myRemoteAddress: ActorAddress? = nil
64+
// We store the owning SWIMShell ref in order avoid adding it to the `membersToPing` list
6065
private var myShellMyself: ActorRef<SWIM.Message>?
6166
private var myShellAddress: ActorAddress? {
62-
return self.myShellMyself?.address
67+
self.myShellMyself?.address
6368
}
6469

6570
private var myNode: UniqueNode?
@@ -90,7 +95,6 @@ final class SWIMInstance {
9095
return .newerMemberAlreadyPresent(existingMember)
9196
}
9297

93-
// FIXME: store with node as key, not ref
9498
let member = SWIMMember(ref: ref, status: status, protocolPeriod: self.protocolPeriod)
9599
self.members[ref] = member
96100

@@ -114,11 +118,11 @@ final class SWIMInstance {
114118

115119
self.addToGossip(member: member)
116120

117-
return .added
121+
return .added(member)
118122
}
119123

120124
enum AddMemberDirective {
121-
case added
125+
case added(SWIM.Member)
122126
case newerMemberAlreadyPresent(SWIM.Member)
123127
}
124128

@@ -129,16 +133,16 @@ final class SWIMInstance {
129133
///
130134
/// - Note:
131135
/// SWIM 4.3: [...] The failure detection protocol at member works by maintaining a list (intuitively, an array) of the known
132-
/// elements of the current membership list, and select- ing ping targets not randomly from this list,
136+
/// elements of the current membership list, and select-ing ping targets not randomly from this list,
133137
/// but in a round-robin fashion. Instead, a newly joining member is inserted in the membership list at
134138
/// a position that is chosen uniformly at random. On completing a traversal of the entire list,
135139
/// rearranges the membership list to a random reordering.
136140
func nextMemberToPing() -> ActorRef<SWIM.Message>? {
137141
if self.membersToPing.isEmpty {
138142
return nil
139143
}
140-
defer { self.advanceMembersToPingIndex() }
141144

145+
defer { self.advanceMembersToPingIndex() }
142146
return self.membersToPing[self.membersToPingIndex].ref
143147
}
144148

@@ -195,12 +199,25 @@ final class SWIMInstance {
195199
self.removeFromMembersToPing(member)
196200
}
197201

198-
return .applied(previousStatus: previousStatusOption)
202+
return .applied(previousStatus: previousStatusOption, currentStatus: status)
199203
}
200204

201205
enum MarkedDirective: Equatable {
202206
case ignoredDueToOlderStatus(currentStatus: SWIM.Status)
203-
case applied(previousStatus: SWIM.Status?)
207+
case applied(previousStatus: SWIM.Status?, currentStatus: SWIM.Status)
208+
209+
/// True if the directive was `applied` and the from/to statuses differ, meaning that a change notification has issued.
210+
var isEffectiveStatusChange: Bool {
211+
switch self {
212+
case .ignoredDueToOlderStatus:
213+
return false
214+
case .applied(nil, _):
215+
// from no status, to any status is definitely an effective change
216+
return true
217+
case .applied(.some(let previousStatus), let currentStatus):
218+
return previousStatus != currentStatus
219+
}
220+
}
204221
}
205222

206223
func incrementProtocolPeriod() {
@@ -441,11 +458,17 @@ extension SWIM.Instance {
441458
} else {
442459
if self.isMember(member.ref) {
443460
switch self.mark(member.ref, as: member.status) {
444-
case .applied:
445-
if member.status.isDead {
461+
case .applied(_, let currentStatus):
462+
switch currentStatus {
463+
case .unreachable:
464+
return .applied(level: .notice, message: "Member \(member) marked [.unreachable] from incoming gossip")
465+
case .alive:
466+
// TODO: could be another spot that we have to issue a reachable though?
467+
return .ignored
468+
case .suspect:
469+
return .markedSuspect(member: member)
470+
case .dead:
446471
return .confirmedDead(member: member)
447-
} else {
448-
return .applied
449472
}
450473
case .ignoredDueToOlderStatus(let currentStatus):
451474
return .ignored(
@@ -454,9 +477,13 @@ extension SWIM.Instance {
454477
)
455478
}
456479
} else if let remoteMemberNode = member.ref.address.node {
457-
// TODO: store that we're now handshaking with it already?
458-
return .connect(node: remoteMemberNode, onceConnected: { _ in
459-
self.addMember(member.ref, status: member.status)
480+
return .connect(node: remoteMemberNode, onceConnected: {
481+
switch $0 {
482+
case .success(let uniqueNode):
483+
self.addMember(member.ref, status: member.status)
484+
case .failure(let error):
485+
self.addMember(member.ref, status: .suspect(incarnation: 0)) // connecting failed, so we immediately mark it as suspect (!)
486+
}
460487
})
461488
} else {
462489
return .ignored(
@@ -482,7 +509,8 @@ extension SWIM.Instance {
482509
/// and do not have a connection to it either (e.g. we joined only seed nodes, and more nodes joined them later
483510
/// we could get information through the seed nodes about the new members; but we still have never talked to them,
484511
/// thus we need to ensure we have a connection to them, before we consider adding them to the membership).
485-
case connect(node: UniqueNode, onceConnected: (UniqueNode) -> Void)
512+
case connect(node: UniqueNode, onceConnected: (Result<UniqueNode, Error>) -> Void)
513+
case markedSuspect(member: SWIM.Member)
486514
/// Meaning the node is now marked `DEAD`.
487515
/// SWIM will continue to gossip about the dead node for a while.
488516
/// We should also notify the high-level membership that the node shall be considered `DOWN`.

Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@ public struct SWIMSettings {
2222
.init()
2323
}
2424

25-
/// Optional "SWIM instance name" to be included in log statements,
26-
/// useful when multiple instances of SWIM are run on the same process (e.g. for debugging).
27-
public var name: String?
28-
2925
// var timeSource: TimeSource // TODO would be nice?
3026

3127
public var gossip: SWIMGossipSettings = .default
3228

3329
public var failureDetector: SWIMFailureDetectorSettings = .default
3430

31+
/// Optional "SWIM instance name" to be included in log statements,
32+
/// useful when multiple instances of SWIM are run on the same node (e.g. for debugging).
33+
internal var name: String?
34+
3535
/// When enabled traces _all_ incoming SWIM protocol communication (remote messages).
3636
/// These logs will contain SWIM.Instance metadata, as offered by `SWIM.Instance.metadata`.
3737
/// All logs will be prefixed using `[tracelog:SWIM]`, for easier grepping and inspecting only logs related to the SWIM instance.
@@ -48,31 +48,26 @@ extension SWIM {
4848
}
4949

5050
// ==== ----------------------------------------------------------------------------------------------------------------
51-
// MARK: Gossip Settings
51+
// MARK: SWIM Gossip Settings
5252

5353
public struct SWIMGossipSettings {
5454
public static var `default`: SWIMGossipSettings {
55-
return .init()
55+
.init()
5656
}
5757

58-
/// Interval at which gossip messages should be issued.
59-
/// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout?
60-
public var probeInterval: TimeAmount = .seconds(1)
61-
62-
// FIXME: investigate size of messages and find good default
63-
//
64-
// max number of messages included in any gossip payload
58+
// TODO: investigate size of messages and find good default
59+
/// Max number of messages included in any gossip payload
6560
public var maxNumberOfMessages: Int = 20
6661

6762
public var maxGossipCountPerMessage: Int = 6
6863
}
6964

7065
// ==== ----------------------------------------------------------------------------------------------------------------
71-
// MARK: FailureDetector Settings
66+
// MARK: SWIM FailureDetector Settings
7267

7368
public struct SWIMFailureDetectorSettings {
7469
public static var `default`: SWIMFailureDetectorSettings {
75-
return .init()
70+
.init()
7671
}
7772

7873
/// Number of indirect probes that will be issued once a direct ping probe has failed to reply in time with an ack.
@@ -96,6 +91,15 @@ public struct SWIMFailureDetectorSettings {
9691
public var suspicionTimeoutPeriodsMax: Int = 10
9792
// public var suspicionTimeoutPeriodsMin: Int = 10 // FIXME: this is once we have LHA, Local Health Aware Suspicion
9893

94+
/// Interval at which gossip messages should be issued.
95+
/// Every `interval` a `fanout` number of gossip messages will be sent. // TODO which fanout?
9996
public var probeInterval: TimeAmount = .seconds(1)
97+
98+
/// Time amount after which a sent ping without ack response is considered timed-out.
99+
/// This drives how a node becomes a suspect, by missing such ping/ack rounds.
100+
///
101+
/// Note that after an initial ping/ack timeout, secondary indirect probes are issued,
102+
/// and only after exceeding `suspicionTimeoutPeriodsMax` shall the node be declared as `.unreachable`,
103+
/// which results in an `Cluster.MemberReachabilityChange` `Cluster.Event` which downing strategies may act upon.
100104
public var pingTimeout: TimeAmount = .milliseconds(300)
101105
}

0 commit comments

Comments
 (0)