
Commit 9c05a6e

Hardening process isolated, node watcher uses event to detect down
1 parent 35df486 commit 9c05a6e

13 files changed (+110, -111 lines)
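In short, the commit stops the cluster shell from telling the node-death watcher about downed nodes directly (the removed `.forceDown` message) and instead has the shell publish a `memberDown` cluster event which the watcher picks up from the event stream. The following is a minimal, self-contained sketch of that pattern; the types and names below are simplified stand-ins for illustration, not the library's actual API:

// Simplified stand-ins for EventStream / ClusterEvent / NodeDeathWatcher; illustrative only.
final class EventStream<Event> {
    private var subscribers: [(Event) -> Void] = []

    func subscribe(_ handler: @escaping (Event) -> Void) {
        self.subscribers.append(handler)
    }

    func publish(_ event: Event) {
        for subscriber in self.subscribers {
            subscriber(event)
        }
    }
}

enum ClusterEvent {
    case memberUp(node: String)
    case memberDown(node: String)
}

final class NodeDeathWatcher {
    // watchers interested in a given remote node, keyed by node name
    private var watchers: [String: [String]] = [:]

    /// Subscribes once at start-up; afterwards no explicit "force down" message is needed.
    func start(events: EventStream<ClusterEvent>) {
        events.subscribe { [weak self] event in
            if case .memberDown(let node) = event {
                self?.handleNodeDown(node)
            }
        }
    }

    /// Called when an actor starts watching something located on a remote node.
    func remoteActorWatched(watcher: String, remoteNode: String) {
        self.watchers[remoteNode, default: []].append(watcher)
    }

    private func handleNodeDown(_ node: String) {
        for watcher in self.watchers.removeValue(forKey: node) ?? [] {
            print("notifying \(watcher): node [\(node)] is down")
        }
    }
}

// Wiring, in the order the diffs below enforce: create the event stream first,
// start the watcher (which subscribes), and only then publish membership changes.
let events = EventStream<ClusterEvent>()
let nodeDeathWatcher = NodeDeathWatcher()
nodeDeathWatcher.start(events: events)

nodeDeathWatcher.remoteActorWatched(watcher: "worker-observer", remoteNode: "servant-1")
events.publish(.memberDown(node: "servant-1"))
// prints: notifying worker-observer: node [servant-1] is down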

IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_escalatingWorkers/main.swift

Lines changed: 20 additions & 7 deletions
@@ -30,24 +30,37 @@ let isolated = ProcessIsolated { boot in
 
 pprint("Started process: \(getpid()) with roles: \(isolated.roles)")
 
-try isolated.run(on: .master) {
+struct OnPurposeBoom: Error {}
+
+isolated.run(on: .master) {
     isolated.spawnServantProcess(supervision: .restart(atMost: 1, within: nil), args: ["fatalError"])
+    isolated.spawnServantProcess(supervision: .restart(atMost: 1, within: nil), args: ["escalateError"])
 }
 
 try isolated.run(on: .servant) {
-    isolated.system.log.info("ISOLATED RUNNING")
+    isolated.system.log.info("ISOLATED RUNNING: \(CommandLine.arguments)")
 
-    // TODO: system should be configured to terminate HARD when a failure reaches the guardian
+    // TODO: assert command line arguments are the expected ones
 
-    let _: ActorRef<String> = try isolated.system.spawn("failing",
+    _ = try isolated.system.spawn("failed", of: String.self,
        props: Props().supervision(strategy: .escalate),
        .setup { context in
-            context.log.info("Spawned \(context.path) on servant node, it will fault with a [Boom].")
+            context.log.info("Spawned \(context.path) on servant node it will fail soon...")
            context.timers.startSingle(key: "explode", message: "Boom", delay: .seconds(1))
 
            return .receiveMessage { message in
-                fatalError("Faulting on purpose: \(message)")
-                return .stop
+                if CommandLine.arguments.contains("fatalError") {
+                    context.log.error("Time to crash with: fatalError")
+                    // crashes process since we do not isolate faults
+                    fatalError("FATAL ERROR ON PURPOSE")
+                } else if CommandLine.arguments.contains("escalateError") {
+                    context.log.error("Time to crash with: throwing an error, escalated to top level")
+                    // since we .escalate and are a top-level actor, this will cause the process to die as well
+                    throw OnPurposeBoom()
+                } else {
+                    context.log.error("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly.")
+                    fatalError("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly.")
+                }
            }
        })
 }
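The reworked test drives two distinct failure modes in the servant, selected by the argument passed via spawnServantProcess. As a toy model of that selection: the `FailureMode` enum and `failOnPurpose` helper below are hypothetical and do not appear in the test; only the argument values, `OnPurposeBoom`, and the two failure behaviors come from the diff above.

struct OnPurposeBoom: Error {}

enum FailureMode: String {
    case fatalError     // a fault: kills the process outright, since faults are not isolated
    case escalateError  // a thrown error: escalated from a top-level actor, it also ends the process
}

func failOnPurpose(arguments: [String]) throws {
    // pick whichever known failure mode was passed on the command line
    guard let mode = arguments.compactMap(FailureMode.init(rawValue:)).first else {
        fatalError("MISSING FAILURE MODE ARGUMENT! Expected 'fatalError' or 'escalateError'.")
    }
    switch mode {
    case .fatalError:
        fatalError("FATAL ERROR ON PURPOSE")
    case .escalateError:
        throw OnPurposeBoom()
    }
}

// e.g. in the servant: try failOnPurpose(arguments: CommandLine.arguments)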

IntegrationTests/tests_02_process_isolated/test_04_failing_workers_to_cause_servant_restart.sh

Lines changed: 2 additions & 6 deletions
@@ -39,7 +39,7 @@ swift build # synchronously ensure built
 
 swift run ${app_name} &
 
-await_n_processes "$app_name" 2
+await_n_processes "$app_name" 3
 
 pid_master=$(ps aux | grep ${app_name} | grep -v grep | grep -v servant | awk '{ print $2 }')
 pid_servant=$(ps aux | grep ${app_name} | grep -v grep | grep servant | head -n1 | awk '{ print $2 }')
@@ -53,12 +53,8 @@ echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
 
 sleep 3 # TODO rather, sleep until another proc replaces the servant automatically
 
-echo '~~~~~~~~~~~~~ KILLED KILLED KILLED KILLED KILLED KILLED ~~~~~~~~~~~~~~~~~~~~~~~~~~~'
-ps aux | grep ${app_name}
-echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
-
 # the 1 servant should die, but be restarted so we'll be back at two processes
-await_n_processes "$app_name" 2
+await_n_processes "$app_name" 3
 
 if [[ $(ps aux | awk '{print $2}' | grep ${pid_servant} | grep -v 'grep' | wc -l) -ne 0 ]]; then
     echo "ERROR: Seems the servant was not killed!!!"

Package.swift

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ let targets: [PackageDescription.Target] = [
     // ==== ------------------------------------------------------------------------------------------------------------
     // MARK: Integration Tests - `it_` prefixed
     .target(
-        name: "it_ProcessIsolated_faultingWorkers",
+        name: "it_ProcessIsolated_escalatingWorkers",
        dependencies: [
            "DistributedActors",
        ],

Sources/DistributedActors/ActorShell.swift

Lines changed: 3 additions & 3 deletions
@@ -138,11 +138,11 @@ internal final class ActorShell<Message>: ActorContext<Message>, AbstractActor {
 
        self.supervisor = Supervision.supervisorFor(system, initialBehavior: behavior, props: props.supervision)
 
-        if let failureDetectorRef = system._cluster?._nodeDeathWatcher {
-            self._deathWatch = DeathWatch(failureDetectorRef: failureDetectorRef)
+        if let nodeDeathWatcher = system._nodeDeathWatcher {
+            self._deathWatch = DeathWatch(nodeDeathWatcher: nodeDeathWatcher)
        } else {
            // FIXME; we could see if `myself` is the right one actually... rather than dead letters; if we know the FIRST actor ever is the failure detector one?
-            self._deathWatch = DeathWatch(failureDetectorRef: system.deadLetters.adapted())
+            self._deathWatch = DeathWatch(nodeDeathWatcher: system.deadLetters.adapted())
        }
 
        self.namingContext = ActorNamingContext()

Sources/DistributedActors/ActorSystem.swift

Lines changed: 12 additions & 2 deletions
@@ -91,8 +91,10 @@ public final class ActorSystem {
 
    // initialized during startup
    internal var _cluster: ClusterShell?
-    internal var _clusterEventStream: EventStream<ClusterEvent>?
+    internal var _clusterEvents: EventStream<ClusterEvent>?
+    internal var _nodeDeathWatcher: NodeDeathWatcherShell.Ref?
 
+    // ==== ----------------------------------------------------------------------------------------------------------------
    // MARK: Logging
 
    public var log: Logger {
@@ -199,8 +201,16 @@ public final class ActorSystem {
        do {
            // Cluster MUST be the last thing we initialize, since once we're bound, we may receive incoming messages from other nodes
            if let cluster = self._cluster {
-                self._clusterEventStream = try! EventStream(self, name: "clusterEvents")
+                let clusterEvents = try! EventStream<ClusterEvent>(self, name: "clusterEvents")
+                self._clusterEvents = clusterEvents // TODO: why stored on self here?
                _ = try cluster.start(system: self, eventStream: self.clusterEvents) // only spawns when cluster is initialized
+
+                // Node watcher MUST be started AFTER cluster and clusterEvents
+                self._nodeDeathWatcher = try self._spawnSystemActor(
+                    NodeDeathWatcherShell.naming,
+                    NodeDeathWatcherShell.behavior(clusterEvents: clusterEvents),
+                    perpetual: true
+                )
            }
        } catch {
            fatalError("Failed while starting cluster subsystem! Error: \(error)")

Sources/DistributedActors/Cluster/ActorSystem+Cluster.swift

Lines changed: 1 addition & 1 deletion
@@ -61,6 +61,6 @@ extension ActorSystem {
    }
 
    internal var clusterEvents: EventStream<ClusterEvent> {
-        return self._clusterEventStream ?? EventStream(ref: self.deadLetters.adapted())
+        return self._clusterEvents ?? EventStream(ref: self.deadLetters.adapted())
    }
 }

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 5 additions & 26 deletions
@@ -93,21 +93,6 @@ internal class ClusterShell {
        return it
    }
 
-    // ==== ------------------------------------------------------------------------------------------------------------
-    // MARK: Node-Death Watcher
-
-    // Implementation notes: The `_failureDetectorRef` has to remain internally accessible.
-    // This is in order to solve a chicken-and-egg problem that we face during spawning of
-    // the first system actor that is the *failure detector* so it cannot reach to the systems
-    // value before it started...
-    var _nodeDeathWatcher: NodeDeathWatcherShell.Ref?
-    var nodeDeathWatcher: NodeDeathWatcherShell.Ref {
-        guard let it = self._nodeDeathWatcher else {
-            return fatalErrorBacktrace("Accessing ClusterShell.nodeDeathWatcher failed, was nil! This should never happen as access should only happen after start() was invoked.")
-        }
-        return it
-    }
-
    init() {
        self._associationsLock = Lock()
        self._associationsRegistry = [:]
@@ -116,21 +101,13 @@ internal class ClusterShell {
        // the single thing in the class it will modify is the associations registry, which we do to avoid actor queues when
        // remote refs need to obtain those
        //
-        // TODO: see if we can restructure this to avoid these nil/then-set dance
+        // FIXME: see if we can restructure this to avoid these nil/then-set dance
        self._ref = nil
-        self._nodeDeathWatcher = nil
    }
 
    /// Actually starts the shell which kicks off binding to a port, and all further cluster work
    internal func start(system: ActorSystem, eventStream: EventStream<ClusterEvent>) throws -> ClusterShell.Ref {
        self._serializationPool = try SerializationPool(settings: .default, serialization: system.serialization)
-
-        self._nodeDeathWatcher = try system._spawnSystemActor(
-            NodeDeathWatcherShell.naming,
-            NodeDeathWatcherShell.behavior(),
-            perpetual: true
-        )
-
        self._events = eventStream
 
        // TODO: concurrency... lock the ref as others may read it?
@@ -259,6 +236,7 @@ extension ClusterShell {
            return self.onJoin(context, state: state, joining: node)
 
        case .handshakeWith(let remoteAddress, let replyTo):
+            state.logMembership()
            return self.beginHandshake(context, state, with: remoteAddress, replyTo: replyTo)
        case .retryHandshake(let initiated):
            return self.connectSendHandshakeOffer(context, state, initiated: initiated)
@@ -469,7 +447,7 @@ extension ClusterShell {
        case .initiated(var initiated):
            switch initiated.onHandshakeError(error) {
            case .scheduleRetryHandshake(let delay):
-                state.log.info("Schedule handshake retry to: [\(initiated.remoteNode)] delay: [\(delay)]")
+                state.log.debug("Schedule handshake retry to: [\(initiated.remoteNode)] delay: [\(delay)]")
                context.timers.startSingle(
                    key: TimerKey("handshake-timer-\(remoteNode)"),
                    message: .command(.retryHandshake(initiated)),
@@ -619,7 +597,8 @@ extension ClusterShell {
        var state = state
 
        if let change = state.onMembershipChange(node, toStatus: .down) {
-            self.nodeDeathWatcher.tell(.forceDown(change.node))
+            // self.nodeDeathWatcher.tell(.forceDown(change.node))
+            self._events.publish(.membership(.memberDown(Member(node: change.node, status: .down))))
 
            if let logChangeLevel = state.settings.logMembershipChanges {
                context.log.log(level: logChangeLevel, "Cluster membership change: \(reflecting: change), membership: \(state.membership)")

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 16 additions & 11 deletions
@@ -126,21 +126,31 @@ enum NodeDeathWatcherShell {
    /// it would be possible however to allow implementing the raw protocol by user actors if we ever see the need for it.
    internal enum Message {
        case remoteActorWatched(watcher: AddressableActorRef, remoteNode: UniqueNode)
-        case membershipSnapshot(Membership)
-        case membershipChange(MembershipChange)
-        case forceDown(UniqueNode) // TODO: this should go away with cluster events landing
+        case membershipSnapshot(Membership) // TODO: remove?
+        case membershipChange(MembershipChange) // TODO: remove as well
    }
 
-    static func behavior() -> Behavior<Message> {
+    static func behavior(clusterEvents: EventStream<ClusterEvent>) -> Behavior<Message> {
        return .setup { context in
-            // WARNING: DO NOT TOUCH context.system.cluster; we are started potentially before the cluster (!)
            let instance = NodeDeathWatcherInstance(selfNode: context.system.settings.cluster.uniqueBindNode)
+
+            context.system.cluster.events.subscribe(context.subReceive(ClusterEvent.self) { event in
+                context.log.info("EVENT::::: \(event)")
+                switch event {
+                case .membership(.memberDown(let member)):
+                    let change = MembershipChange(node: member.node, fromStatus: .none, toStatus: .down)
+                    instance.handleAddressDown(change)
+                default:
+                    () // ignore for now...
+                }
+            })
+
            return NodeDeathWatcherShell.behavior(instance)
        }
    }
 
    static func behavior(_ instance: NodeDeathWatcherInstance) -> Behavior<Message> {
-        return .receive { _, message in
+        return .receiveMessage { message in
 
            let lastMembership: Membership = .empty // TODO: To be mutated based on membership changes
 
@@ -157,11 +167,6 @@ enum NodeDeathWatcherShell {
 
            case .membershipChange(let change):
                _ = instance.onMembershipChanged(change) // TODO: return and interpret directives
-
-            case .forceDown(let node):
-                // TODO: we'd get the change from subscribing to events and applying to local membership
-                let change = MembershipChange(node: node, fromStatus: .none, toStatus: .down)
-                instance.handleAddressDown(change)
            }
            return .same
        }

Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift

Lines changed: 4 additions & 4 deletions
@@ -36,11 +36,11 @@ public struct SWIMSettings {
    /// These logs will contain SWIM.Instance metadata, as offered by `SWIM.Instance.metadata`.
    /// All logs will be prefixed using `[tracelog:SWIM]`, for easier grepping and inspecting only logs related to the SWIM instance.
    // TODO: how to make this nicely dynamically changeable during runtime
-    #if SACT_TRACELOG_SWIM
+    // #if SACT_TRACELOG_SWIM
    var traceLogLevel: Logger.Level? = .warning
-    #else
-    var traceLogLevel: Logger.Level?
-    #endif
+    // #else
+    // var traceLogLevel: Logger.Level?
+    // #endif
 }
 
 extension SWIM {

Sources/DistributedActors/DeathWatch.swift

Lines changed: 5 additions & 5 deletions
@@ -27,14 +27,14 @@ import NIO
 // Implementation notes:
 // Care was taken to keep this implementation separate from the ActorCell however not require more storage space.
 @usableFromInline
-internal struct DeathWatch<Message> { // TODO: may want to change to a protocol
+internal struct DeathWatch<Message> {
    private var watching = Set<AddressableActorRef>()
    private var watchedBy = Set<AddressableActorRef>()
 
-    private var failureDetectorRef: NodeDeathWatcherShell.Ref
+    private var nodeDeathWatcher: NodeDeathWatcherShell.Ref
 
-    init(failureDetectorRef: NodeDeathWatcherShell.Ref) {
-        self.failureDetectorRef = failureDetectorRef
+    init(nodeDeathWatcher: NodeDeathWatcherShell.Ref) {
+        self.nodeDeathWatcher = nodeDeathWatcher
    }
 
    // MARK: perform watch/unwatch
@@ -160,7 +160,7 @@ internal struct DeathWatch<Message> { // TODO: may want to change to a protocol
 
    private func subscribeNodeTerminatedEvents(myself: ActorRef<Message>, node: UniqueNode?) {
        if let remoteNode = node {
-            self.failureDetectorRef.tell(.remoteActorWatched(watcher: AddressableActorRef(myself), remoteNode: remoteNode))
+            self.nodeDeathWatcher.tell(.remoteActorWatched(watcher: AddressableActorRef(myself), remoteNode: remoteNode))
        }
    }
 }
