1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@
SwiftDistributedActors.xcworkspace
.xcode
.idea
IntegrationTests/tests_04_cluster/.build

# rendered docs output dirs
/reference/
4 changes: 3 additions & 1 deletion IntegrationTests/run-tests.sh
@@ -88,7 +88,9 @@ while getopts "f:vid" opt; do
done

function run_test() {
if $verbose; then
if $no_io_redirect; then
"$@"
elif $verbose; then
"$@" 2>&1 | tee -a "$out"
# we need to return the return value of the first command
return ${PIPESTATUS[0]}
@@ -0,0 +1,57 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActors

print("Getting args")

var args = CommandLine.arguments
args.removeFirst()

print("got args: \(args)")

guard args.count >= 1 else {
fatalError("no port given")
}

let system = ActorSystem("System") { settings in
settings.defaultLogLevel = .info

settings.cluster.enabled = true
settings.cluster.bindPort = Int(args[0])!

settings.cluster.swim.failureDetector.suspicionTimeoutPeriodsMax = 3
settings.cluster.swim.failureDetector.pingTimeout = .milliseconds(100)
settings.cluster.swim.failureDetector.probeInterval = .milliseconds(300)

settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
settings.cluster.downingStrategy = .none
}

let ref = try system.spawn("streamWatcher", of: Cluster.Event.self, .receive { context, event in
context.log.info("Event: \(event)")
return .same
})
system.cluster.events.subscribe(ref)

if args.count >= 3 {
print("getting host")
let host = args[1]
print("parsing port")
let port = Int(args[2])!
print("Joining")
system.cluster.join(node: Node(systemName: "System", host: host, port: port))
}

Thread.sleep(.seconds(120))
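
The `streamWatcher` actor above is what makes this process observable from the outside: it logs every `Cluster.Event` at `.info`, and the test scripts below grep the log for the resulting `membershipChange(...)` and `reachabilityChange(...)` lines. As a minimal sketch (reusing the `system` defined above, and assuming the `membershipChange`/`reachabilityChange` cases each carry a single change value, as the logged output suggests), a subscriber can also react to specific event cases instead of logging everything:

let filteredWatcher = try system.spawn("filteredWatcher", of: Cluster.Event.self, .receive { context, event in
    switch event {
    case .membershipChange(let change):
        context.log.info("membership change: \(change)")
    case .reachabilityChange(let change):
        context.log.warning("reachability change: \(change)")
    default:
        break // ignore snapshots, leadership changes, etc.
    }
    return .same
})
system.cluster.events.subscribe(filteredWatcher)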
55 changes: 55 additions & 0 deletions IntegrationTests/tests_04_cluster/shared.sh
@@ -0,0 +1,55 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the Swift Distributed Actors open source project
##
## Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

RED='\033[0;31m'
RST='\033[0m'

function echoerr() {
echo "${RED}$@${RST}" 1>&2;
}

function _killall() {
set +e
local killall_app_name="$1"
echo "> KILLALL: $killall_app_name"
ps aux | grep ${killall_app_name} | awk '{ print $2 }' | xargs kill -9
set -e
}

function wait_log_exists() {
_log_file="$1"
_expected_line="$2"
if [[ "$#" -eq 3 ]]; then
_max_spins="$3"
max_spins=$(expr ${_max_spins} + 0)
else
max_spins=20
fi
spin=1 # spin counter
while [[ $(cat ${_log_file} | grep "${_expected_line}" | wc -l) -ne 1 ]]; do
echo "---------------------------------------------------------------------------------------------------------"
cat ${_log_file}
echo "========================================================================================================="

sleep 1
spin=$((spin+1))
if [[ ${spin} -eq ${max_spins} ]]; then
echoerr "Never saw enough '${_expected_line}' in logs."
cat ${_log_file}
exit -1
fi
done

}
@@ -0,0 +1,64 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the Swift Distributed Actors open source project
##
## Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

set -e
#set -x # verbose

declare -r my_path="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
declare -r root_path="$my_path/.."

declare -r app_name='it_Clustered_swim_suspension_reachability'

source ${my_path}/shared.sh

declare -r first_logs=/tmp/sact_first.out
declare -r second_logs=/tmp/sact_second.out
rm -f ${first_logs}
rm -f ${second_logs}

stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 7337 > ${first_logs} 2>&1 &
declare -r first_pid=$(echo $!)
wait_log_exists ${first_logs} 'Binding to: ' 200 # since it might be compiling again...

stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 8228 localhost 7337 > ${second_logs} 2>&1 &
declare -r second_pid=$(echo $!)
wait_log_exists ${second_logs} 'Binding to: ' 200 # since it might be compiling again...

echo "Waiting nodes to become .up..."
wait_log_exists ${first_logs} 'membershipChange(sact://System@localhost:8228 :: \[joining\] -> \[ up\])' 40
echo 'Second member seen .up, good...'

# suspend the second process, causing unreachability
kill -SIGSTOP ${second_pid}
jobs

wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: unreachable' 40
echo 'Second member seen .unreachable, good...'

# resume it in the background
kill -SIGCONT ${second_pid}

# it should become reachable again
wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: reachable' 40
echo 'Second member seen .reachable, good...'


# === cleanup ----------------------------------------------------------------------------------------------------------

kill -9 ${first_pid}
kill -9 ${second_pid}

_killall ${app_name}
25 changes: 21 additions & 4 deletions Package.swift
@@ -8,9 +8,15 @@ import class Foundation.ProcessInfo
// and ONE of our dependencies currently produces one warning, we have to use this workaround to enable it in _our_
// targets when the flag is set. We should remove the dependencies and then enable the flag globally though just by passing it.
// TODO: Follow up to https://github.com/apple/swift-distributed-actors/issues/23 by removing Files and Stencil, then we can remove this workaround
let globalSwiftSettings: [SwiftSetting] = ProcessInfo.processInfo.environment["SACT_WARNINGS_AS_ERRORS"] == nil ? [
SwiftSetting.unsafeFlags(["-warnings-as-errors"])
] : []
let globalSwiftSettings: [SwiftSetting]
if ProcessInfo.processInfo.environment["SACT_WARNINGS_AS_ERRORS"] != nil {
print("SACT_WARNINGS_AS_ERRORS enabled, passing `-warnings-as-errors`")
globalSwiftSettings = [
SwiftSetting.unsafeFlags(["-warnings-as-errors"])
]
} else {
globalSwiftSettings = []
}

var targets: [PackageDescription.Target] = [
// ==== ------------------------------------------------------------------------------------------------------------
@@ -162,6 +168,13 @@ var targets: [PackageDescription.Target] = [
],
path: "IntegrationTests/tests_02_process_isolated/it_ProcessIsolated_backoffRespawn"
),
.target(
name: "it_Clustered_swim_suspension_reachability",
dependencies: [
"DistributedActors",
],
path: "IntegrationTests/tests_04_cluster/it_Clustered_swim_suspension_reachability"
),

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Performance / Benchmarks
@@ -307,7 +320,11 @@ var package = Package(
dependencies: dependencies,

targets: targets.map { target in
target.swiftSettings?.append(contentsOf: globalSwiftSettings)
var swiftSettings = target.swiftSettings ?? []
swiftSettings.append(contentsOf: globalSwiftSettings)
if !swiftSettings.isEmpty {
target.swiftSettings = swiftSettings
}
return target
},

8 changes: 7 additions & 1 deletion Samples/Sources/SampleCluster/main.swift
@@ -31,10 +31,16 @@ guard args.count >= 1 else {
let system = ActorSystem("System") { settings in
settings.cluster.enabled = true
settings.cluster.bindPort = Int(args[0])!
settings.cluster.downingStrategy = .timeout(.default)
settings.cluster.downingStrategy = .none
settings.defaultLogLevel = .debug
}

let ref = try system.spawn("hello", of: Cluster.Event.self, .receive { context, event in
context.log.info("event = \(event)")
return .same
})
system.cluster.events.subscribe(ref)

if args.count >= 3 {
print("getting host")
let host = args[1]
4 changes: 2 additions & 2 deletions Sources/DistributedActors/Cluster/ClusterReceptionist.swift
@@ -113,7 +113,7 @@ internal enum ClusterReceptionist {
}

private static func onFullStateRequest(context: ActorContext<Receptionist.Message>, request: ClusterReceptionist.FullStateRequest, storage: Receptionist.Storage) {
context.log.debug("Received full state request from [\(request.replyTo)]") // TODO: tracelog style
context.log.trace("Received full state request from [\(request.replyTo)]") // TODO: tracelog style
var registrations: [AnyRegistrationKey: [ActorAddress]] = [:]
registrations.reserveCapacity(storage._registrations.count)
for (key, values) in storage._registrations {
@@ -141,7 +141,7 @@
}

private static func onFullState(context: ActorContext<Receptionist.Message>, fullState: ClusterReceptionist.FullState, storage: Receptionist.Storage) throws {
context.log.debug("Received full state \(fullState)") // TODO: tracelog style
context.log.trace("Received full state \(fullState)") // TODO: tracelog style
for (key, paths) in fullState.registrations {
var anyAdded = false
for path in paths {
25 changes: 14 additions & 11 deletions Sources/DistributedActors/Cluster/ClusterShell.swift
@@ -46,12 +46,6 @@ internal class ClusterShell {
private var _associationTombstones: Set<Association.TombstoneState>

private var _swimRef: SWIM.Ref?
private var swimRef: SWIM.Ref {
guard let ref = _swimRef else {
return fatalErrorBacktrace("Illegal early access to ClusterShell._swimRef detected! This ref is initialized during bind(), and must not be accessed earlier than that.")
}
return ref
}

private var clusterEvents: EventStream<Cluster.Event>!

@@ -130,7 +124,7 @@ internal class ClusterShell {
// notify the failure detector, that we shall assume this node as dead from here on.
// its gossip will also propagate the information through the cluster
traceLog_Remote(system.cluster.node, "Finish terminate association [\(remoteNode)]: Notifying SWIM, .confirmDead")
self.swimRef.tell(.local(.confirmDead(remoteNode)))
self._swimRef?.tell(.local(.confirmDead(remoteNode)))

// Ensure to remove (completely) the member from the Membership, it is not even .leaving anymore.
if state.membership.mark(remoteNode, as: .down) == nil {
@@ -336,8 +330,17 @@ extension ClusterShell {
let uniqueBindAddress = clusterSettings.uniqueBindNode

// SWIM failure detector and gossiping
let swimBehavior = SWIMShell(settings: clusterSettings.swim, clusterRef: context.myself).behavior
self._swimRef = try context._downcastUnsafe._spawn(SWIMShell.naming, props: ._wellKnown, swimBehavior)
if !clusterSettings.swim.disabled {
let swimBehavior = SWIMShell(settings: clusterSettings.swim, clusterRef: context.myself).behavior
self._swimRef = try context._downcastUnsafe._spawn(SWIMShell.naming, props: ._wellKnown, swimBehavior)
} else {
context.log.warning("""
SWIM Failure Detector has been [disabled]! \
Reachability events will NOT be emitted, meaning that most downing strategies will not be able to perform \
their duties. Please ensure that an external mechanism for detecting failed cluster nodes is used.
""")
self._swimRef = nil
}

// automatic leader election, so it may move members: .joining -> .up (and other `LeaderAction`s)
if let leaderElection = context.system.settings.cluster.autoLeaderElection.make(context.system.cluster.settings) {
@@ -930,7 +933,7 @@ extension ClusterShell {
switch res {
case .success(.success(let uniqueNode)):
context.log.debug("Associated \(uniqueNode), informing SWIM to monitor this node.")
self.swimRef.tell(.local(.monitor(uniqueNode)))
self._swimRef?.tell(.local(.monitor(uniqueNode)))
return .same // .same, since state was modified since inside the handshakeWith (!)
case .success(.failure(let error)):
context.log.debug("Handshake with \(reflecting: node) failed: \(error)")
@@ -981,7 +984,7 @@ extension ClusterShell {
"cluster/membership": "\(state.membership)", // TODO: introduce state.metadata pattern?
])

self.swimRef.tell(.local(.confirmDead(memberToDown.node)))
self._swimRef?.tell(.local(.confirmDead(memberToDown.node)))

do {
let onDownAction = context.system.settings.cluster.onDownAction.make()
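
The warning added above spells out the trade-off of disabling SWIM: no reachability events are emitted, so downing strategies have nothing to react to and failed nodes must be detected and downed by something outside the cluster. A rough sketch of that external path, assuming the settings flag is writable and that `ClusterControl` exposes a manual `down(node:)` counterpart to the `join(node:)` used elsewhere in this diff (verify both against the actual API):

import DistributedActors

let system = ActorSystem("ManuallyManaged") { settings in
    settings.cluster.enabled = true
    settings.cluster.bindPort = 7337
    settings.cluster.swim.disabled = true     // assumption: public counterpart of the `clusterSettings.swim.disabled` flag checked above
    settings.cluster.downingStrategy = .none  // downing decisions come from outside the process
}

// Some external health check (orchestrator, load balancer, operator) decides a peer is gone:
let failedPeer = Node(systemName: "System", host: "10.0.0.7", port: 8228)
system.cluster.down(node: failedPeer) // assumption: manual downing entry point on ClusterControl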
11 changes: 8 additions & 3 deletions Sources/DistributedActors/Cluster/SWIM/SWIM.swift
@@ -39,7 +39,7 @@ public enum SWIM {
internal enum Message {
case remote(RemoteMessage)
case local(LocalMessage)
case testing(TestingMessage) // TODO: hopefully no need for this soon once cluster events land
case _testing(TestingMessage)
}

internal enum RemoteMessage {
@@ -48,7 +48,7 @@
/// "Ping Request" requests a SWIM probe.
case pingReq(target: ActorRef<Message>, lastKnownStatus: Status, replyTo: ActorRef<Ack>, payload: Payload)

/// Extension: Lifeguard, Local Health Aware Probe
// TODO: Implement Extension: Lifeguard, Local Health Aware Probe
/// LHAProbe adds a `nack` message to the fault detector protocol,
/// which is sent in the case of failed indirect probes. This gives the member that
/// initiates the indirect probe a way to check if it is receiving timely responses
@@ -66,6 +66,11 @@
let pinged: ActorRef<Message>
let incarnation: Incarnation
let payload: Payload

/// Represents the pinged member as alive, since it has replied to our ping and therefore must be alive.
func pingedAliveMember(protocolPeriod: Int) -> SWIM.Member {
.init(ref: self.pinged, status: .alive(incarnation: self.incarnation), protocolPeriod: protocolPeriod)
}
}

internal struct MembershipState {
@@ -231,7 +236,7 @@ extension SWIM.Status {
switch self {
case .dead:
return true
case .alive, .unreachable, .suspect:
case .alive, .suspect, .unreachable:
return false
}
}