Merged
Changes from all commits (18 commits):
d82c40e
=remote #482 #382 #383 Race free association / remote sends
ktoso Apr 27, 2020
0195ee8
=swim log also when suspicion comes in from incoming gossip
ktoso Apr 24, 2020
0c66985
=handshake no reason to crash, simply reject incoming unexpected hand…
ktoso Apr 24, 2020
24c22a0
=cluster cleanups, swim logging and timeouts in tests
ktoso Apr 25, 2020
74e8190
=int-tests we're using 127.0.0.1 by default now
ktoso Apr 27, 2020
6c1a39f
=test unlock test_singletonByClusterLeadership_withLeaderChange
ktoso Apr 27, 2020
ab2a941
=remote protect association from being completed multiple times
ktoso Apr 28, 2020
d268d15
fix the killing of the "good" connection as well
ktoso Apr 29, 2020
674b1f2
=test,fix default address uses 127.0.0.1 fix test expectation
ktoso Apr 29, 2020
8e1454e
=test better naming in handshake messages
ktoso Apr 29, 2020
f3e5f72
=test adjust that we use 127.0.0.1 in tests now
ktoso Apr 30, 2020
639cc2a
=test,downing increase timeouts as current default detection gives qu…
ktoso Apr 30, 2020
805fec7
=test increase default expectation timeout; not much penalty for qui…
ktoso Apr 30, 2020
78b3d4c
wip fixing associations more and more
ktoso May 1, 2020
a26a8a6
INTENSE simplification, drop the whenCompleted future -- it's causing
ktoso May 1, 2020
9e74a11
=tests need to use 127.0.0.1 rather than localhost for consistency
ktoso May 1, 2020
423f60b
=test cleanups
ktoso May 1, 2020
32a5516
include time information in pprints so we can easier debug timing iss…
ktoso May 1, 2020
@@ -24,7 +24,7 @@ try! _file.append("service starting...\n")
let system = ActorSystem("it_XPCActorable_echo_service") { settings in
settings.transports += .xpcService

- settings.cluster.swim.failureDetector.pingTimeout = .seconds(3)
@ktoso (Member, Author) commented on Apr 30, 2020:
this really is an acknowledgement of how we now use SWIM; it IS our failure detector.

It used to be both the only membership we had and the failure detector. Now we're more true to how we use it in the settings.

+ settings.cluster.swim.pingTimeout = .seconds(3)

// settings.serialization.register(GeneratedActor.Messages.XPCEchoServiceProtocol.self, underId: 10001)
// settings.serialization.register(XPCEchoService.Message.self, underId: 10002)
@@ -31,10 +31,10 @@ let system = ActorSystem("System") { settings in
settings.cluster.enabled = true
settings.cluster.bindPort = Int(args[0])!

- settings.cluster.swim.failureDetector.pingTimeout = .milliseconds(100)
- settings.cluster.swim.failureDetector.probeInterval = .milliseconds(300)
+ settings.cluster.swim.probeInterval = .milliseconds(300)
+ settings.cluster.swim.pingTimeout = .milliseconds(100)
+ settings.cluster.swim.lifeguard.suspicionTimeoutMin = .seconds(1)
+ settings.cluster.swim.lifeguard.suspicionTimeoutMax = .seconds(1)

settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2)
settings.cluster.downingStrategy = .none
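For context on how these SWIM knobs fit together, here is a minimal sketch using only the setting names visible in this diff; the aggressive values are test-oriented, not production defaults:

import DistributedActors

let fastDetectionSystem = ActorSystem("FastDetection") { settings in
    settings.cluster.enabled = true

    // Probe a random peer every 300ms; each direct ping waits at most
    // 100ms for an ack before SWIM falls back to indirect probing.
    settings.cluster.swim.probeInterval = .milliseconds(300)
    settings.cluster.swim.pingTimeout = .milliseconds(100)

    // Lifeguard suspicion window: pinning min == max disables the adaptive
    // range, so a suspect is declared unreachable after roughly 1 second.
    settings.cluster.swim.lifeguard.suspicionTimeoutMin = .seconds(1)
    settings.cluster.swim.lifeguard.suspicionTimeoutMax = .seconds(1)
}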
@@ -32,27 +32,27 @@ stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 7337 > ${first_logs} 2>&1 &
declare -r first_pid=$(echo $!)
wait_log_exists ${first_logs} 'Binding to: ' 200 # since it might be compiling again...

- stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 8228 localhost 7337 > ${second_logs} 2>&1 &
+ stdbuf -i0 -o0 -e0 swift run it_Clustered_swim_suspension_reachability 8228 127.0.0.1 7337 > ${second_logs} 2>&1 &
@ktoso (Member, Author) commented:
changed the defaults everywhere, as the mix was getting confusing; now it's 127.0.0.1 everywhere by default.

declare -r second_pid=$(echo $!)
wait_log_exists ${second_logs} 'Binding to: ' 200 # since it might be compiling again...

echo "Waiting nodes to become .up..."
- wait_log_exists ${first_logs} 'membershipChange(sact://System@localhost:8228 :: \[joining\] -> \[ up\])' 40
+ wait_log_exists ${first_logs} 'membershipChange(sact://System@127.0.0.1:8228 :: \[joining\] -> \[ up\])' 50
echo 'Second member seen .up, good...'

# suspend the second process, causing unreachability
kill -SIGSTOP ${second_pid}
jobs

- wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: unreachable' 40
+ wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*127.0.0.1:8228, status: up, reachability: unreachable' 50
echo 'Second member seen .unreachable, good...'

# resume it in the background
kill -SIGCONT ${second_pid}

# it should become reachable again
declare -r expected_second_member_unreachable=
- wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*localhost:8228, status: up, reachability: reachable' 40
+ wait_log_exists ${first_logs} 'reachabilityChange(DistributedActors.Cluster.ReachabilityChange.*127.0.0.1:8228, status: up, reachability: reachable' 50
echo 'Second member seen .unreachable, good...'


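The log lines this script greps for come from the sample binary subscribing to cluster events. A rough sketch of that subscription follows; the Cluster.Event cases match the log patterns above, but the exact spawn/subscribe signatures are assumptions, so treat the names as illustrative:

import DistributedActors

let system = ActorSystem("System") { settings in
    settings.cluster.enabled = true
}

// Log membership and reachability changes, producing lines like the
// ones wait_log_exists greps for above.
let listener = try system.spawn("cluster-listener", of: Cluster.Event.self, .receive { context, event in
    switch event {
    case .membershipChange(let change):
        context.log.info("membershipChange(\(change))")
    case .reachabilityChange(let change):
        context.log.info("reachabilityChange(\(change))")
    default:
        () // ignore other events
    }
    return .same
})

system.cluster.events.subscribe(listener)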
16 changes: 7 additions & 9 deletions Protos/WireProtocol.proto
@@ -25,8 +25,8 @@ import "Serialization/Serialization.proto";

message HandshakeOffer {
ProtocolVersion version = 1;
- UniqueNode from = 2;
- Node to = 3;
+ UniqueNode originNode = 2;
+ Node targetNode = 3;
@ktoso (Member, Author) commented:
one of the first things / types I ever did for this project 😉
The naming from/to seemed good at the time, but since then we have settled on the more common origin/target (local/remote) wording, so we're keeping those words all the way through.

A member asked:
Just curious: how come targetNode is Node instead of UniqueNode?

@ktoso (Member, Author) replied:
That's on purpose: if, say, a user writes join(Node(127.0.0.1, 8888)), we don't know that node's NodeID, and thus it cannot be a UniqueNode; only once the node replies do we know its unique id.

(If we discovered the node via gossip, or via an actor ref pointing at an actor on the remote node, we would indeed know the NID; but the handshake is extended to "some node on that address".)

If we end up associating and that NID differs from the one in the UniqueNode some actor ref was made with, that other unique node is likely "wrong", in the sense that it probably belonged to a previous actor system instance on the same host:port which has since been replaced by a new instance/process. Messages to it would end up being deadlettered.

@ktoso (Member, Author) commented on May 1, 2020:
I'll add a specific test for this: Test: Hold "old ref" while new associated node on same host:port is associated #603
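To make the Node vs UniqueNode distinction concrete, a simplified sketch; these are illustrative types, not the library's exact definitions:

// A plain Node is just an address a user can write down up front.
struct Node: Hashable {
    let systemName: String
    let host: String
    let port: Int
}

// A NodeID is random per actor-system instance (process).
struct NodeID: Hashable {
    let value: UInt32
}

// A UniqueNode additionally pins the concrete instance; the nid is
// only learned from the handshake reply.
struct UniqueNode: Hashable {
    let node: Node
    let nid: NodeID
}

// When initiating a join, only the plain address is knowable:
let target = Node(systemName: "System", host: "127.0.0.1", port: 8888)

// After the handshake we know which instance lives there. If a process
// restarts on the same host:port it gets a fresh NodeID, so refs that
// captured the old UniqueNode are detected as stale and deadlettered.
let associated = UniqueNode(node: target, nid: NodeID(value: 0xCAFE))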

// In the future we may want to add additional information
// about certain capabilities here. E.g. when a node supports
// faster transport like InfiniBand and the likes, so we can
@@ -43,16 +43,14 @@ message HandshakeResponse {

message HandshakeAccept {
ProtocolVersion version = 1;
- UniqueNode origin = 2;
- UniqueNode from = 3;
+ UniqueNode originNode = 2;
+ UniqueNode targetNode = 3;
}

message HandshakeReject {
- ProtocolVersion version = 1;
- UniqueNode origin = 2;
- // In the reject case this is an `Node` instead of a `UniqueNode`,
- // to explicitly prevent this from forming an association.
- Node from = 3;
+ ProtocolVersion version = 1;
+ UniqueNode originNode = 2;
+ UniqueNode targetNode = 3;
string reason = 4;
}

2 changes: 1 addition & 1 deletion Samples/Sources/SampleCluster/main.swift
@@ -106,4 +106,4 @@ if system.cluster.node.port == 7337 { // <2>

// end::cluster-sample-actors-discover-and-chat[]

- system.park(atMost: .seconds(60))
+ system.park(atMost: .seconds(6000))
@ktoso (Member, Author) commented:
basically keeps them around "forever" when testing and looking at logs ;)

19 changes: 0 additions & 19 deletions Sources/DistributedActors/ActorShell.swift
@@ -635,16 +635,6 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
return try self._spawn(naming, props: props, behavior)
}

- // public override func spawn<M>(
- //     _ naming: ActorNaming, of type: M.Type = M.self, props: Props = Props(),
- //     file: String = #file, line: UInt = #line,
- //     _ behavior: Behavior<M>
- // ) throws -> ActorRef<M>
- //     where M: ActorMessage {
- //     try self.system.serialization._ensureCodableSerializer(type, file: file, line: line)
- //     return try self._spawn(naming, props: props, behavior)
- // }

public override func spawnWatch<Message>(
_ naming: ActorNaming, of type: Message.Type = Message.self, props: Props,
file: String = #file, line: UInt = #line,
@@ -654,15 +644,6 @@
self.watch(try self.spawn(naming, props: props, behavior))
}

- // public override func spawnWatch<Message>(
- //     _ naming: ActorNaming, of type: Message.Type = Message.self, props: Props,
- //     file: String = #file, line: UInt = #line,
- //     _ behavior: Behavior<Message>
- // ) throws -> ActorRef<Message>
- //     where Message: ActorMessage {
- //     self.watch(try self.spawn(naming, props: props, behavior))
- // }

@ktoso (Member, Author) commented:
cleanup

public override func stop<Message: ActorMessage>(child ref: ActorRef<Message>) throws {
try self._stop(child: ref)
}
3 changes: 1 addition & 2 deletions Sources/DistributedActors/ActorSystem.swift
@@ -221,8 +221,7 @@ public final class ActorSystem {
var effectiveSystemProvider: _ActorRefProvider = localSystemProvider

if settings.cluster.enabled {
- // FIXME: make SerializationPoolSettings configurable
- let cluster = ClusterShell()
+ let cluster = ClusterShell(selfNode: settings.cluster.uniqueBindNode)
initializationLock.withWriterLockVoid {
self._cluster = cluster
}
7 changes: 6 additions & 1 deletion Sources/DistributedActors/Behaviors.swift
@@ -483,7 +483,12 @@ public extension Behavior {
case .suspend:
fatalError("Illegal to attempt to interpret message with .suspend behavior! Behavior should have been canonicalized. This is a bug, please open a ticket.", file: file, line: line)
case .suspended:
fatalError("No message should ever be delivered to a .suspended behavior! This is a bug, please open a ticket.", file: file, line: line)
fatalError("""
No message should ever be delivered to a .suspended behavior!
Message: \(message)
Actor: \(context)
This is a bug, please open a ticket.
""", file: file, line: line)
@ktoso (Member, Author) commented:
More details in case this happens again; the previous crash message would not be easy to follow up on.

}
}

14 changes: 7 additions & 7 deletions Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift
@@ -84,14 +84,14 @@ extension CRDT.Replicator {
switch event {
case .membershipChange(let change) where change.toStatus == .up:
let member = change.member
- if member.node != context.system.cluster.node { // exclude this (local) node
-     self.tracelog(context, .addMember, message: member)
-     let remoteReplicatorRef = makeReplicatorRef(member.node)
-     self.remoteReplicators.insert(remoteReplicatorRef)
- } else {
-     context.log.trace("Skip adding member \(member) to replicator because it is the same as local node", metadata: self.metadata(context))
+ guard member.node != context.system.cluster.node else {
+     return // Skip adding member to replicator because it is the same as local node
  }
+
+ self.tracelog(context, .addMember, message: member)
+ let remoteReplicatorRef = makeReplicatorRef(member.node)
+ self.remoteReplicators.insert(remoteReplicatorRef)

case .membershipChange(let change) where change.toStatus >= .down:
let member = change.member
self.tracelog(context, .removeMember, message: member)
Expand All @@ -106,7 +106,7 @@ extension CRDT.Replicator {
case .membershipChange:
context.log.trace("Ignoring cluster event \(event), only interested in >= .up events", metadata: self.metadata(context))
default:
- () // ignore other events
+ return // ignore other events
}
}
