Commit 658ffe4

Master process restarts servant processes only as many times as configured
1 parent 9c05a6e commit 658ffe4
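
The gist of the change: servant processes are spawned under a `.replace(atMost:within:)` supervision strategy, so the master replaces a crashed servant only up to the configured budget and then stops respawning it. A minimal sketch, using the call exactly as it appears in the integration tests below (the `isolated` handle is set up by the test mains and is assumed, not defined, here):

    // Sketch only: `isolated` is the process-isolation handle created in
    // the integration test mains; it is assumed in this snippet.
    isolated.run(on: .master) {
        // Replace a crashed servant at most once; once the budget is
        // spent, supervision STOPs instead of respawning forever.
        isolated.spawnServantProcess(
            supervision: .replace(atMost: 1, within: nil),
            args: ["fatalError"]
        )
    }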

File tree: 14 files changed, +299 −165 lines


IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_escalatingWorkers/main.swift

Lines changed: 4 additions & 4 deletions
@@ -33,8 +33,8 @@ pprint("Started process: \(getpid()) with roles: \(isolated.roles)")
 struct OnPurposeBoom: Error {}

 isolated.run(on: .master) {
-    isolated.spawnServantProcess(supervision: .restart(atMost: 1, within: nil), args: ["fatalError"])
-    isolated.spawnServantProcess(supervision: .restart(atMost: 1, within: nil), args: ["escalateError"])
+    isolated.spawnServantProcess(supervision: .replace(atMost: 1, within: nil), args: ["fatalError"])
+    isolated.spawnServantProcess(supervision: .replace(atMost: 1, within: nil), args: ["escalateError"])
 }

 try isolated.run(on: .servant) {
@@ -58,8 +58,8 @@ try isolated.run(on: .servant) {
         // since we .escalate and are a top-level actor, this will cause the process to die as well
         throw OnPurposeBoom()
     } else {
-        context.log.error("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly.")
-        fatalError("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly.")
+        context.log.error("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
+        fatalError("MISSING FAILURE MODE ARGUMENT!!! Test is constructed not properly, or arguments were not passed properly. \(CommandLine.arguments)")
     }
 }
 })

IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_noLeaking/main.swift

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ try isolated.run(on: .master) {

     /// spawn a servant

-    isolated.spawnServantProcess(supervision: .restart(atMost: 100, within: .seconds(1)), args: ["ALPHA"])
+    isolated.spawnServantProcess(supervision: .replace(atMost: 100, within: .seconds(1)), args: ["ALPHA"])
 }

 // finally, once prepared, you have to invoke the following:

IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_respawnsServants/main.swift

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ try isolated.run(on: .master) {
 })

 // should we allow anyone to issue this, or only on master? we could `runOnMaster { control` etc
-isolated.spawnServantProcess(supervision: .restart(atMost: 100, within: .seconds(1)), args: ["ALPHA"])
+isolated.spawnServantProcess(supervision: .replace(atMost: 100, within: .seconds(1)), args: ["ALPHA"])
 }

 // Notice that master has no workers, just the pool...

IntegrationTests/tests_02_process_isolated/shared.sh

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,10 @@
 ##
 ##===----------------------------------------------------------------------===##

+function echoerr() {
+    echo "$@" 1>&2;
+}
+
 function _killall() {
     set +e
     local killall_app_name="$1"

IntegrationTests/tests_02_process_isolated/test_02_kill_servant_master_restarts_it.sh

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@ await_n_processes "$app_name" 2

 if [[ $(ps aux | awk '{print $2}' | grep ${pid_servant} | grep -v 'grep' | wc -l) -ne 0 ]]; then
     echo "ERROR: Seems the servant was not killed!!!"
-    exit -2
+    _killall ${app_name}
+    exit -1
 fi

 await_n_processes "$app_name" 2

IntegrationTests/tests_02_process_isolated/test_03_not_leak_fds.sh renamed to IntegrationTests/tests_02_process_isolated/test_03_servant_spawning_not_leak_fds.sh

Lines changed: 3 additions & 1 deletion
@@ -52,7 +52,9 @@ for pid_servant in $pid_servants; do
     if [[ $(lsof -p $pid_servant | wc -l) -gt 100 ]]; then
         lsof -p $pid_servant
         printf "${RED}ERROR: Seems the servant [${pid_servant}] has too many FDs open, did the masters FD leak?${RST}\n"
-        exit -2
+
+        _killall ${app_name}
+        exit -1
     fi
 done

Lines changed: 36 additions & 17 deletions
@@ -37,32 +37,51 @@ _killall ${app_name}

 swift build # synchronously ensure built

-swift run ${app_name} &
+declare -r log_file="/tmp/${app_name}.log"
+rm -f ${log_file}
+swift run ${app_name} > ${log_file} &
+
+declare -r supervision_replace_grep_txt='supervision: REPLACE'
+declare -r supervision_stop_grep_txt='supervision: STOP'
+
+# we want to wait until 2 STOPs are found in the logs; then we can check if the other conditions are as we expect
+echo "Waiting for servants to REPLACE and STOP..."
+spin=1 # spin counter
+max_spins=20
+while [[ $(cat ${log_file} | grep "${supervision_stop_grep_txt}" | wc -l) -ne 2 ]]; do
+    sleep 1
+    spin=$((spin+1))
+    if [[ ${spin} -eq ${max_spins} ]]; then
+        echoerr "Never saw enough '${supervision_stop_grep_txt}' in logs."
+        cat ${log_file}
+        exit -1
+    fi
+done

-await_n_processes "$app_name" 3
-
-pid_master=$(ps aux | grep ${app_name} | grep -v grep | grep -v servant | awk '{ print $2 }')
-pid_servant=$(ps aux | grep ${app_name} | grep -v grep | grep servant | head -n1 | awk '{ print $2 }')
-
-echo "> PID Master: ${pid_master}"
-echo "> PID Servant: ${pid_servant}"
+echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
+cat ${log_file} | grep "${supervision_replace_grep_txt}"
+echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'

-echo '~~~~~~~~~~~~BEFORE KILL~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
-ps aux | grep ${app_name}
 echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
+cat ${log_file} | grep "${supervision_stop_grep_txt}"
+echo '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
+
+if [[ $(cat ${log_file} | grep "${supervision_replace_grep_txt}" | wc -l) -ne 2 ]]; then
+    echoerr "ERROR: We expected 2 servants to only restart once, yet more restarts were detected!"
+    cat ${log_file}

-sleep 3 # TODO rather, sleep until another proc replaces the servant automatically
+    _killall ${app_name}
+    exit -1
+fi

-# the 1 servant should die, but be restarted so we'll be back at two processes
-await_n_processes "$app_name" 3
+if [[ $(cat ${log_file} | grep "${supervision_stop_grep_txt}" | wc -l) -ne 2 ]]; then
+    echoerr "ERROR: Expected the servants to STOP after they are replaced once!"
+    cat ${log_file}

-if [[ $(ps aux | awk '{print $2}' | grep ${pid_servant} | grep -v 'grep' | wc -l) -ne 0 ]]; then
-    echo "ERROR: Seems the servant was not killed!!!"
+    _killall ${app_name}
     exit -2
 fi

-await_n_processes "$app_name" 2
-
 # === cleanup ----------------------------------------------------------------------------------------------------------

 _killall ${app_name}
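
The checks above grep the master's log for the `supervision: REPLACE` and `supervision: STOP` lines. The decision being verified can be sketched as follows; this is a hypothetical illustration (the `ReplaceBudget` type and its members are made-up names, not library API):

    // Hypothetical sketch of the replace-or-stop decision the test exercises.
    struct ReplaceBudget {
        let atMost: Int       // e.g. 1, as configured via .replace(atMost: 1, within: nil)
        var replacements = 0

        mutating func onServantCrash() -> String {
            if replacements < atMost {
                replacements += 1
                return "supervision: REPLACE" // the log line the test greps for
            } else {
                return "supervision: STOP"    // budget spent: stop, do not respawn
            }
        }
    }

With `atMost: 1` and repeatedly crashing servants, each servant yields exactly one REPLACE and then one STOP, hence the two occurrences of each that the script expects.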

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 0 additions & 24 deletions
@@ -678,27 +678,3 @@ extension ClusterShell {
         }
     }
 }
-
-// ==== ----------------------------------------------------------------------------------------------------------------
-// MARK: ActorSystem extensions
-
-extension ActorSystem {
-    internal var clusterShell: ActorRef<ClusterShell.Message> {
-        return self._cluster?.ref ?? self.deadLetters.adapt(from: ClusterShell.Message.self)
-    }
-
-    // TODO: not sure how to best expose, but for now this is better than having to make all internal messages public.
-    public func join(node: Node) {
-        self.clusterShell.tell(.command(.join(node)))
-    }
-
-    // TODO: not sure how to best expose, but for now this is better than having to make all internal messages public.
-    public func _dumpAssociations() {
-        let ref: ActorRef<Set<UniqueNode>> = try! self.spawn(.anonymous, .receive { context, nodes in
-            let stringlyNodes = nodes.map { String(reflecting: $0) }.joined(separator: "\n ")
-            context.log.info("~~~~ ASSOCIATED NODES ~~~~~\n \(stringlyNodes)")
-            return .stop
-        })
-        self.clusterShell.tell(.query(.associatedNodes(ref)))
-    }
-}

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 1 addition & 2 deletions
@@ -135,13 +135,12 @@ enum NodeDeathWatcherShell {
     let instance = NodeDeathWatcherInstance(selfNode: context.system.settings.cluster.uniqueBindNode)

     context.system.cluster.events.subscribe(context.subReceive(ClusterEvent.self) { event in
-        context.log.info("EVENT::::: \(event)")
         switch event {
         case .membership(.memberDown(let member)):
             let change = MembershipChange(node: member.node, fromStatus: .none, toStatus: .down)
             instance.handleAddressDown(change)
         default:
-            () // ignore for now...
+            () // ignore other changes, we only need to react on nodes becoming DOWN
         }
     })

Sources/DistributedActors/Cluster/SWIM/SWIMSettings.swift

Lines changed: 4 additions & 4 deletions
@@ -36,11 +36,11 @@ public struct SWIMSettings {
     /// These logs will contain SWIM.Instance metadata, as offered by `SWIM.Instance.metadata`.
     /// All logs will be prefixed using `[tracelog:SWIM]`, for easier grepping and inspecting only logs related to the SWIM instance.
     // TODO: how to make this nicely dynamically changeable during runtime
-    // #if SACT_TRACELOG_SWIM
+    #if SACT_TRACELOG_SWIM
     var traceLogLevel: Logger.Level? = .warning
-    // #else
-    // var traceLogLevel: Logger.Level?
-    // #endif
+    #else
+    var traceLogLevel: Logger.Level?
+    #endif
 }

 extension SWIM {
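
This re-enables the compile-time gate instead of leaving it commented out: `traceLogLevel` defaults to `.warning` only when the `SACT_TRACELOG_SWIM` compilation condition is defined, and stays `nil` (tracelog off) otherwise. A minimal use-site sketch under that assumption; the helper name is illustrative, only the `[tracelog:SWIM]` prefix is taken from the doc comment above:

    import Logging

    // Illustrative helper, not the library's actual API: emit a SWIM trace
    // log only when a level is configured, i.e. only in builds where the
    // SACT_TRACELOG_SWIM compilation condition was set.
    func traceLogSWIM(_ log: Logger, level: Logger.Level?, _ message: String) {
        guard let level = level else { return } // tracelog disabled: no-op
        log.log(level: level, "[tracelog:SWIM] \(message)")
    }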
