diff --git a/Docs/internals.adoc b/Docs/internals.adoc index 2442a0ddf..238c82680 100644 --- a/Docs/internals.adoc +++ b/Docs/internals.adoc @@ -151,26 +151,12 @@ R -[#red]-x L: HandshakeReject +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ``` +==== Handshake phases -=== In memory core concepts +Note that handshakes may "race" from both nodes to each other concurrently, thus the system has to "pick one". -==== Association -Established between two nodes. -Only "associated nodes" may talk to one another. - -Rules: - - - don't talk to strangers - * nodes MUST handshake before exchanging any messages - -=== *RemoteControl - -Control objects allow performing actual actions onto the network layer. - -These are exposed to actual refs and used to e.g. send messages. -There can be many remote controls, but always one association for a pair of nodes. ==== Watch and clustering diff --git a/Sources/DistributedActors/ActorLogging.swift b/Sources/DistributedActors/ActorLogging.swift index f2d5c30e9..9539ba6ad 100644 --- a/Sources/DistributedActors/ActorLogging.swift +++ b/Sources/DistributedActors/ActorLogging.swift @@ -72,10 +72,14 @@ internal final class LoggingContext { /// such as it's path or node on which it resides. /// /// The preferred way of obtaining a logger for an actor or system is `context.log` or `system.log`, rather than creating new ones. -public struct ActorLogger { +extension Logger { public static func make(context: ActorContext) -> Logger { - var log = context.system.log - log[metadataKey: "actor/path"] = Logger.MetadataValue.stringConvertible(context.path) + Logger.make(context.log, path: context.path) + } + + internal static func make(_ base: Logger, path: ActorPath) -> Logger { + var log = base + log[metadataKey: "actor/path"] = Logger.MetadataValue.stringConvertible(path) return log } } @@ -274,22 +278,32 @@ public struct LogMessage { let line: UInt } -// MARK: Extend logging metadata storage capabilities - -extension Logger.Metadata { +extension Logger.MetadataValue { public static func pretty(_ value: T) -> Logger.Metadata.Value where T: CustomPrettyStringConvertible { - .string(value.prettyDescription) + Logger.MetadataValue.stringConvertible(CustomPrettyStringConvertibleMetadataValue(value)) } public static func pretty(_ value: T) -> Logger.Metadata.Value { if let pretty = value as? 
CustomPrettyStringConvertible { - return .string(pretty.prettyDescription) + return Logger.MetadataValue.stringConvertible(CustomPrettyStringConvertibleMetadataValue(pretty)) } else { return .string("\(value)") } } } +struct CustomPrettyStringConvertibleMetadataValue: CustomStringConvertible { + let value: CustomPrettyStringConvertible + + init(_ value: CustomPrettyStringConvertible) { + self.value = value + } + + var description: String { + "\(self.value)" + } +} + extension Optional where Wrapped == Logger.MetadataValue { public static func lazyStringConvertible(_ makeValue: @escaping () -> CustomStringConvertible) -> Logger.Metadata.Value { .stringConvertible(LazyMetadataBox { makeValue() }) diff --git a/Sources/DistributedActors/ActorShell.swift b/Sources/DistributedActors/ActorShell.swift index a68c48905..0bd654d04 100644 --- a/Sources/DistributedActors/ActorShell.swift +++ b/Sources/DistributedActors/ActorShell.swift @@ -147,6 +147,7 @@ public final class ActorShell: ActorContext, Abs self.behavior = behavior self._address = address self._props = props + self._log = .make(system.log, path: address.path) self.supervisor = Supervision.supervisorFor(system, initialBehavior: behavior, props: props.supervision) self._deathWatch = DeathWatch(nodeDeathWatcher: system._nodeDeathWatcher ?? system.deadLetters.adapted()) @@ -239,7 +240,7 @@ public final class ActorShell: ActorContext, Abs } // access only from within actor - private lazy var _log = ActorLogger.make(context: self) + private var _log: Logger public override var log: Logger { get { self._log diff --git a/Sources/DistributedActors/Backoff.swift b/Sources/DistributedActors/Backoff.swift index 5bf1d20b1..3ce3d6e5f 100644 --- a/Sources/DistributedActors/Backoff.swift +++ b/Sources/DistributedActors/Backoff.swift @@ -65,13 +65,22 @@ public enum Backoff { /// MUST be `>= initialInterval`. /// - randomFactor: A random factor of `0.5` results in backoffs between 50% below and 50% above the base interval. /// MUST be between: `<0; 1>` (inclusive) + /// - maxAttempts: An optional maximum number of times backoffs shall be attempted. + /// MUST be `> 0` if set (or `nil`). public static func exponential( initialInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.initialInterval, multiplier: Double = ExponentialBackoffStrategy.Defaults.multiplier, capInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.capInterval, - randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor + randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor, + maxAttempts: Int? = ExponentialBackoffStrategy.Defaults.maxAttempts ) -> ExponentialBackoffStrategy { - .init(initialInterval: initialInterval, multiplier: multiplier, capInterval: capInterval, randomFactor: randomFactor) + .init( + initialInterval: initialInterval, + multiplier: multiplier, + capInterval: capInterval, + randomFactor: randomFactor, + maxAttempts: maxAttempts + ) } } @@ -148,6 +157,8 @@ public struct ExponentialBackoffStrategy: BackoffStrategy { // TODO: We could also implement taking a Clock, and using it see if there's a total limit exceeded // public static let maxElapsedTime: TimeAmount = .minutes(30) + + public static let maxAttempts: Int? = nil } let initialInterval: TimeAmount @@ -155,23 +166,34 @@ public struct ExponentialBackoffStrategy: BackoffStrategy { let capInterval: TimeAmount let randomFactor: Double + var limitedRemainingAttempts: Int? 
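The Backoff change above adds an optional `maxAttempts` next to the jittered, capped exponential interval. A minimal standalone sketch of the intended `next()` semantics (illustrative names, not the library's `ExponentialBackoffStrategy`):

```swift
import Foundation

// Minimal standalone sketch (illustrative names, not the library's ExponentialBackoffStrategy):
// a jittered, capped exponential backoff that gives up after an optional number of attempts.
struct SketchExponentialBackoff {
    var currentBase: TimeInterval // seconds; grows by `multiplier` per attempt, up to `cap`
    let multiplier: Double        // MUST be >= 1.0
    let cap: TimeInterval         // upper bound for the base interval
    let randomFactor: Double      // MUST be within 0.0 ... 1.0
    var remainingAttempts: Int?   // nil means "no limit"

    mutating func next() -> TimeInterval? {
        // consume one attempt regardless of the outcome
        defer { remainingAttempts? -= 1 }
        if let remaining = remainingAttempts, remaining <= 0 {
            return nil // attempts exhausted, caller should give up
        }
        let jitter = Double.random(in: (1 - randomFactor) ... (1 + randomFactor))
        let interval = currentBase * jitter
        currentBase = min(currentBase * multiplier, cap) // grow the base for the next attempt
        return interval
    }
}

var backoff = SketchExponentialBackoff(
    currentBase: 0.3, multiplier: 1.5, cap: 3.0, randomFactor: 0.25, remainingAttempts: 5
)
while let delay = backoff.next() {
    print("retrying in \(delay)s")
}
```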
+ // interval that will be used in the `next()` call, does NOT include the random noise component private var currentBaseInterval: TimeAmount - internal init(initialInterval: TimeAmount, multiplier: Double, capInterval: TimeAmount, randomFactor: Double) { + internal init(initialInterval: TimeAmount, multiplier: Double, capInterval: TimeAmount, randomFactor: Double, maxAttempts: Int?) { precondition(initialInterval.nanoseconds > 0, "initialInterval MUST be > 0ns, was: [\(initialInterval.prettyDescription)]") precondition(multiplier >= 1.0, "multiplier MUST be >= 1.0, was: [\(multiplier)]") precondition(initialInterval <= capInterval, "capInterval MUST be >= initialInterval, was: [\(capInterval)]") precondition(randomFactor >= 0.0 && randomFactor <= 1.0, "randomFactor MUST be within between 0 and 1, was: [\(randomFactor)]") + if let n = maxAttempts { + precondition(n > 0, "maxAttempts MUST be nil or > 0, was: [\(n)]") + } self.initialInterval = initialInterval self.currentBaseInterval = initialInterval self.multiplier = multiplier self.capInterval = capInterval self.randomFactor = randomFactor + self.limitedRemainingAttempts = maxAttempts } public mutating func next() -> TimeAmount? { + defer { self.limitedRemainingAttempts? -= 1 } + if let remainingAttempts = self.limitedRemainingAttempts, remainingAttempts <= 0 { + return nil + } // else, still attempts remaining, or no limit set + let baseInterval = self.currentBaseInterval let randomizeMultiplier = Double.random(in: (1 - self.randomFactor) ... (1 + self.randomFactor)) diff --git a/Sources/DistributedActors/CRDT/ActorOwned+CRDT.swift b/Sources/DistributedActors/CRDT/ActorOwned+CRDT.swift index 01f46ee97..9d0d4b765 100644 --- a/Sources/DistributedActors/CRDT/ActorOwned+CRDT.swift +++ b/Sources/DistributedActors/CRDT/ActorOwned+CRDT.swift @@ -313,6 +313,12 @@ extension CRDT { } } +extension CRDT.ActorOwned: CustomStringConvertible, CustomPrettyStringConvertible { + public var description: String { + "CRDT.ActorOwned(id: \(self.id), data: \(self.data), status: \(self.status))" + } +} + extension CRDT.ActorOwned { /// Register callback for owning actor to be notified when the CRDT instance has been updated. /// diff --git a/Sources/DistributedActors/CRDT/CRDT+Gossip.swift b/Sources/DistributedActors/CRDT/CRDT+Gossip.swift index 4c2192632..27658f1b0 100644 --- a/Sources/DistributedActors/CRDT/CRDT+Gossip.swift +++ b/Sources/DistributedActors/CRDT/CRDT+Gossip.swift @@ -177,9 +177,8 @@ extension CRDT.Identity: GossipIdentifier { extension CRDT { /// The gossip to be spread about a specific CRDT (identity). - struct Gossip: GossipEnvelopeProtocol, CustomPrettyStringConvertible { + struct Gossip: GossipEnvelopeProtocol, CustomStringConvertible, CustomPrettyStringConvertible { struct Metadata: Codable {} - typealias Payload = StateBasedCRDT var metadata: Metadata @@ -197,6 +196,10 @@ extension CRDT { mutating func tryMerge(other: StateBasedCRDT) -> CRDT.MergeError? 
{ self.payload._tryMerge(other: other) } + + var description: String { + "CRDT.Gossip(metadata: \(metadata), payload: \(payload))" + } } } diff --git a/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift b/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift index b7f083b3d..90c7ea2aa 100644 --- a/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift +++ b/Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift @@ -71,10 +71,10 @@ extension CRDT.Replicator { } ) - self.gossipReplication = try GossipShell.start( + self.gossipReplication = try Gossiper.start( context, name: "gossip", - settings: GossipShell.Settings( + settings: Gossiper.Settings( gossipInterval: self.settings.gossipInterval, gossipIntervalRandomFactor: self.settings.gossipIntervalRandomFactor, peerDiscovery: .fromReceptionistListing(id: "crdt-gossip-replicator") @@ -130,10 +130,8 @@ extension CRDT.Replicator { self.receiveClusterEvent(context, event: .membershipChange(change)) } - case .membershipChange: - context.log.trace("Ignoring cluster event \(event), only interested in >= .up events", metadata: self.metadata(context)) default: - return // ignore other events + return // ignore other events (including membership changes for events lesser than .up) } } diff --git a/Sources/DistributedActors/Cluster/Association.swift b/Sources/DistributedActors/Cluster/Association.swift index f7b4ac7b5..89c5c5222 100644 --- a/Sources/DistributedActors/Cluster/Association.swift +++ b/Sources/DistributedActors/Cluster/Association.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import DistributedActorsConcurrencyHelpers +import struct Foundation.Date import Logging import NIO @@ -65,7 +66,7 @@ final class Association: CustomStringConvertible { /// Complete the association and drain any pending message sends onto the channel. // TODO: This style can only ever work since we lock around the entirety of enqueueing messages and this setting; make it such that we don't need the lock eventually - func completeAssociation(handshake: HandshakeStateMachine.CompletedState, over channel: Channel) { + func completeAssociation(handshake: HandshakeStateMachine.CompletedState, over channel: Channel) throws { assert( self.remoteNode == handshake.remoteNode, """ @@ -75,7 +76,7 @@ final class Association: CustomStringConvertible { """ ) - self.lock.withLockVoid { + try self.lock.withLockVoid { switch self.state { case .associating(let sendQueue): // 1) we need to flush all the queued up messages @@ -93,10 +94,14 @@ final class Association: CustomStringConvertible { self.state = .associated(channel: channel) case .associated: - _ = channel.close() // TODO: throw instead of accepting a "double complete"? 
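The `Association.completeAssociation` change above turns a "double complete" into a thrown error instead of silently closing the channel. A self-contained sketch of that state transition, using simplified stand-ins rather than the real `Association`/`Channel` types:

```swift
// Standalone sketch (illustrative names, not the real Association API):
// an association buffers sends while "associating", flushes them once a channel
// is available, and refuses to be completed twice.
enum SketchAssociationError: Error {
    case alreadyCompleted
    case tombstoned
}

final class SketchAssociation {
    enum State {
        case associating(pendingSends: [String])
        case associated(channel: String) // stand-in for a NIO Channel
        case tombstone
    }

    private var state: State = .associating(pendingSends: [])

    func enqueue(_ message: String) {
        if case .associating(var pending) = state {
            pending.append(message)
            state = .associating(pendingSends: pending)
        }
    }

    func complete(over channel: String) throws {
        switch state {
        case .associating(let pending):
            pending.forEach { print("flushing onto \(channel): \($0)") }
            state = .associated(channel: channel)
        case .associated:
            throw SketchAssociationError.alreadyCompleted // was: silently close the channel
        case .tombstone:
            throw SketchAssociationError.tombstoned
        }
    }
}

let association = SketchAssociation()
association.enqueue("hello") // buffered while still associating
do {
    try association.complete(over: "channel-1") // flushes "hello" onto the channel
    try association.complete(over: "channel-1") // second completion now throws instead of silently closing
} catch {
    print("association error: \(error)")
}
```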
+ let desc = "\(channel)" + _ = channel.close() + throw AssociationError.attemptToCompleteAlreadyCompletedAssociation(self, offendingChannelDescription: desc) case .tombstone: + let desc = "\(channel)" _ = channel.close() + throw AssociationError.attemptToCompleteTombstonedAssociation(self, offendingChannelDescription: desc) } } } @@ -218,6 +223,11 @@ extension Association { } } +enum AssociationError: Error { + case attemptToCompleteAlreadyCompletedAssociation(Association, offendingChannelDescription: String) + case attemptToCompleteTombstonedAssociation(Association, offendingChannelDescription: String) +} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Association Tombstone diff --git a/Sources/DistributedActors/Cluster/Cluster+Event.swift b/Sources/DistributedActors/Cluster/Cluster+Event.swift index 77596f192..2383b7d72 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Event.swift +++ b/Sources/DistributedActors/Cluster/Cluster+Event.swift @@ -106,6 +106,17 @@ extension Cluster { self.fromStatus = replaced.status self.toStatus = newMember.status } + + public func hash(into hasher: inout Hasher) { + self.member.hash(into: &hasher) + } + + public static func == (lhs: MembershipChange, rhs: MembershipChange) -> Bool { + lhs.member == rhs.member && + lhs.replaced == rhs.replaced && + lhs.fromStatus == rhs.fromStatus && + lhs.toStatus == rhs.toStatus + } } } @@ -129,9 +140,8 @@ extension Cluster.MembershipChange { self.toStatus.isDown } - /// Matches when a change is to: `.down`, `.leaving` or `.removed`. - public var isAtLeastDown: Bool { - self.toStatus >= .down + public func isAtLeast(_ status: Cluster.MemberStatus) -> Bool { + self.toStatus >= status } public var isLeaving: Bool { diff --git a/Sources/DistributedActors/Cluster/Cluster+Gossip.swift b/Sources/DistributedActors/Cluster/Cluster+Gossip.swift index 7fa821781..f5ca34e59 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Gossip.swift +++ b/Sources/DistributedActors/Cluster/Cluster+Gossip.swift @@ -83,7 +83,7 @@ extension Cluster { case .some(let locallyKnownMember) where locallyKnownMember.status.isDown: // we have NOT removed it yet, but it is down, so we ignore it return .init(causalRelation: causalRelation, effectiveChanges: []) - case .none where incomingOwnerMember.status.isAtLeastDown: + case .none where incomingOwnerMember.status.isAtLeast(.down): // we have likely removed it, and it is down anyway, so we ignore it completely return .init(causalRelation: causalRelation, effectiveChanges: []) default: @@ -169,6 +169,21 @@ extension Cluster { } } +extension Cluster.Gossip: GossipEnvelopeProtocol { + typealias Metadata = SeenTable + typealias Payload = Self + + var metadata: Metadata { + self.seen + } + + var payload: Payload { + self + } +} + +extension Cluster.Gossip: CustomPrettyStringConvertible {} + // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Cluster.Gossip.SeenTable @@ -290,10 +305,11 @@ extension Cluster.Gossip.SeenTable: CustomStringConvertible, CustomPrettyStringC "Cluster.Gossip.SeenTable(\(self.underlying))" } - public var prettyDescription: String { + public func prettyDescription(depth: Int) -> String { var s = "Cluster.Gossip.SeenTable(\n" - let entryHeadingPadding = String(repeating: " ", count: 4) - let entryPadding = String(repeating: " ", count: 4 * 2) + let entryHeadingPadding = String(repeating: " ", count: 4 * depth) + let 
entryPadding = String(repeating: " ", count: 4 * (depth + 1)) + underlying.sorted(by: { $0.key < $1.key }).forEach { node, vv in let entryHeader = "\(entryHeadingPadding)\(node) observed versions:\n" diff --git a/Sources/DistributedActors/Cluster/Cluster+Member.swift b/Sources/DistributedActors/Cluster/Cluster+Member.swift index bfa1b1d98..f333ab25a 100644 --- a/Sources/DistributedActors/Cluster/Cluster+Member.swift +++ b/Sources/DistributedActors/Cluster/Cluster+Member.swift @@ -234,9 +234,8 @@ extension Cluster.MemberStatus { self == .down } - /// Convenience function to check if a status is `.removed` or `.removed` - public var isAtLeastDown: Bool { - self >= .down + public func isAtLeast(_ status: Cluster.MemberStatus) -> Bool { + self >= status } /// Convenience function to check if a status is `.removed` diff --git a/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift b/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift new file mode 100644 index 000000000..6af79fdc6 --- /dev/null +++ b/Sources/DistributedActors/Cluster/Cluster+MembershipGossipLogic.swift @@ -0,0 +1,148 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Membership Gossip Logic + +final class MembershipGossipLogic: GossipLogic { + typealias Envelope = Cluster.Gossip + + private let context: Context + private lazy var localNode: UniqueNode = self.context.system.cluster.node + + private var latestGossip: Cluster.Gossip + private let notifyOnGossipRef: ActorRef + + init(_ context: Context, notifyOnGossipRef: ActorRef) { + self.context = context + self.notifyOnGossipRef = notifyOnGossipRef + self.latestGossip = .init(ownerNode: context.system.cluster.node) + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Spreading gossip + + // TODO: implement better, only peers which are "behind" + func selectPeers(peers: [AddressableActorRef]) -> [AddressableActorRef] { + // how many peers we select in each gossip round, + // we could for example be dynamic and notice if we have 10+ nodes, we pick 2 members to speed up the dissemination etc. + let n = 1 + + var selectedPeers: [AddressableActorRef] = [] + selectedPeers.reserveCapacity(n) + + for peer in peers.shuffled() + where selectedPeers.count < n && self.shouldGossipWith(peer) { + selectedPeers.append(peer) + } + + return selectedPeers + } + + func makePayload(target: AddressableActorRef) -> Cluster.Gossip? { + // today we don't trim payloads at all + self.latestGossip + } + + func receivePayloadACK(target: AddressableActorRef, confirmedDeliveryOf envelope: Cluster.Gossip) { + // nothing to do + } + + /// True if the peers is "behind" in terms of information it has "seen" (as determined by comparing our and its seen tables). 
+ private func shouldGossipWith(_ peer: AddressableActorRef) -> Bool { + guard let remoteNode = peer.address.node else { + // targets should always be remote peers; one not having a node should not happen, let's ignore it as a gossip target + return false + } + +// guard let remoteSeenVersion = self.latestGossip.seen.version(at: remoteNode) else { + guard self.latestGossip.seen.version(at: remoteNode) != nil else { + // this peer has never seen any information from us, so we definitely want to push a gossip + return true + } + + // FIXME: this is longer than may be necessary, optimize some more + return true + + // TODO: optimize some more; but today we need to keep gossiping until all VVs are the same, because convergence depends on this +// switch self.latestGossip.seen.compareVersion(observedOn: self.localNode, to: remoteSeenVersion) { +// case .happenedBefore, .same: +// // we have strictly less information than the peer, no need to gossip to it +// return false +// case .concurrent, .happenedAfter: +// // we have strictly concurrent or more information the peer, gossip with it +// return true +// } + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Receiving gossip + + func receiveGossip(origin: AddressableActorRef, payload: Cluster.Gossip) { + self.mergeInbound(payload) + self.notifyOnGossipRef.tell(self.latestGossip) + } + + func localGossipUpdate(payload: Cluster.Gossip) { + self.mergeInbound(payload) + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Side-channel + + enum SideChannelMessage { + case localUpdate(Envelope) + } + + func receiveSideChannelMessage(message: Any) throws { + guard let sideChannelMessage = message as? SideChannelMessage else { + self.context.system.deadLetters.tell(DeadLetter(message, recipient: self.context.gossiperAddress)) + return + } + + switch sideChannelMessage { + case .localUpdate(let payload): + self.mergeInbound(payload) + } + } + + // ==== ------------------------------------------------------------------------------------------------------------ + // MARK: Utilities + + private func mergeInbound(_ incoming: Cluster.Gossip) { + _ = self.latestGossip.mergeForward(incoming: incoming) + // effects are signalled via the ClusterShell, not here (it will also perform a merge) // TODO: a bit duplicated, could we maintain it here? 
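The new `MembershipGossipLogic` above selects a small number of random peers per round and merges any received gossip into its latest state. A simplified, standalone sketch of that flow (assumed names, no actor infrastructure):

```swift
// Minimal sketch (assumed names, not the actual GossipLogic API): pick `n` random
// peers per gossip round and merge any gossip we receive into our latest state.
struct SketchMembershipGossip {
    var version: Int
    var members: Set<String>

    // "merge forward": keep the union of members and the newest version we have seen
    mutating func mergeForward(incoming: SketchMembershipGossip) {
        members.formUnion(incoming.members)
        version = max(version, incoming.version)
    }
}

final class SketchGossipLogic {
    private(set) var latestGossip = SketchMembershipGossip(version: 0, members: [])

    // select up to `n` peers per round; a real implementation could prefer peers
    // that are known to be "behind" according to their seen tables
    func selectPeers(_ peers: [String], n: Int = 1) -> [String] {
        Array(peers.shuffled().prefix(n))
    }

    func receiveGossip(_ payload: SketchMembershipGossip) {
        latestGossip.mergeForward(incoming: payload)
    }
}

let logic = SketchGossipLogic()
logic.receiveGossip(SketchMembershipGossip(version: 3, members: ["nodeA", "nodeB"]))
print(logic.selectPeers(["nodeA", "nodeB", "nodeC"]))
```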
+ } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Membership Gossip Logic Control + +let MembershipGossipIdentifier: StringGossipIdentifier = "membership" + +extension GossipControl where GossipEnvelope == Cluster.Gossip { + func update(payload: GossipEnvelope) { + self.update(MembershipGossipIdentifier, payload: payload) + } + + func remove() { + self.remove(MembershipGossipIdentifier) + } + + func sideChannelTell(message: Any) { + self.sideChannelTell(MembershipGossipIdentifier, message: message) + } +} diff --git a/Sources/DistributedActors/Cluster/ClusterSettings.swift b/Sources/DistributedActors/Cluster/ClusterSettings.swift index 4df7994e0..fa9ebb2cc 100644 --- a/Sources/DistributedActors/Cluster/ClusterSettings.swift +++ b/Sources/DistributedActors/Cluster/ClusterSettings.swift @@ -81,9 +81,12 @@ public struct ClusterSettings { public var connectTimeout: TimeAmount = .milliseconds(500) /// Backoff to be applied when attempting a new connection and handshake with a remote system. - public var associationHandshakeBackoff: BackoffStrategy = Backoff.exponential(initialInterval: .milliseconds(100)) - - // public var associationHandshakeMaxAttempts: Int TODO: configure number of retries when connecting + public var handshakeReconnectBackoff: BackoffStrategy = Backoff.exponential( + initialInterval: .milliseconds(300), + multiplier: 1.5, + capInterval: .seconds(3), + maxAttempts: 32 + ) /// Defines the Time-To-Live of an association, i.e. when it shall be completely dropped from the tombstones list. /// An association ("unique connection identifier between two nodes") is kept as tombstone when severing a connection between nodes, @@ -99,13 +102,15 @@ public struct ClusterSettings { self._protocolVersion } - // Exposed for testing handshake negotiation while joining nodes of different versions + /// FOR TESTING ONLY: Exposed for testing handshake negotiation while joining nodes of different versions internal var _protocolVersion: DistributedActors.Version = DistributedActorsProtocolVersion // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Cluster.Membership Gossip - public var membershipGossipInterval: TimeAmount = .milliseconds(500) + public var membershipGossipInterval: TimeAmount = .seconds(1) + + public var membershipGossipIntervalRandomFactor: Double = 0.2 // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Leader Election diff --git a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift index d5e6917ab..60d26a966 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift +++ b/Sources/DistributedActors/Cluster/ClusterShell+LeaderActions.swift @@ -48,8 +48,8 @@ extension ClusterShellState { } var leadershipActions: [LeaderAction] = [] - leadershipActions.append(contentsOf: collectMemberUpMoves()) - leadershipActions.append(contentsOf: collectDownMemberRemovals()) + leadershipActions += collectMemberUpMoves() + leadershipActions += collectDownMemberRemovals() return leadershipActions } diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift index d5b58b2ce..86e8736d0 100644 --- a/Sources/DistributedActors/Cluster/ClusterShell.swift +++ 
b/Sources/DistributedActors/Cluster/ClusterShell.swift @@ -55,21 +55,13 @@ internal class ClusterShell { self._associationsLock.withLock { // TODO: a bit terrible; perhaps key should be Node and then confirm by UniqueNode? // this used to be separated in the State keeping them by Node and here we kept by unique though that caused other challenges - self._associations.first { - key, _ in key.node == node - }?.value - } - } - - internal func getSpecificExistingAssociation(with node: UniqueNode) -> Association? { - self._associationsLock.withLock { - self._associations[node] + self._associations.first { $0.key.node == node }?.value } } /// Get an existing association or ensure that a new one shall be stored and joining kicked off if the target node was not known yet. /// Safe to concurrently access by privileged internals directly - internal func getEnsureAssociation(with node: UniqueNode) -> StoredAssociationState { + internal func getEnsureAssociation(with node: UniqueNode, file: String = #file, line: UInt = #line) -> StoredAssociationState { self._associationsLock.withLock { if let tombstone = self._associationTombstones[node] { return .tombstone(tombstone) @@ -81,13 +73,44 @@ internal class ClusterShell { /// We're trying to send to `node` yet it has no association (not even in progress), /// thus we need to kick it off. Once it completes it will .completeAssociation() on the stored one (here in the field in Shell). - self.ref.tell(.command(.handshakeWith(node.node))) + self.ref.tell(.command(.handshakeWithSpecific(node))) return .association(association) } } } +// /// As a retry we strongly assume that the association already exists, if not, it has to be a tombstone +// /// +// /// We also increment the retry counter. +// /// - Returns: `nil` is already associated, so no reason to retry, otherwise the retry statistics +// internal func retryAssociation(with node: Node) -> Association.Retries? { +// self._associationsLock.withLock { +// // TODO: a bit terrible; perhaps key should be Node and then confirm by UniqueNode? +// // this used to be separated in the State keeping them by Node and here we kept by unique though that caused other challenges +// pprint("self._associations = \(self._associations)") +// let maybeAssociation = self._associations.first { $0.key.node == node }?.value +// +// guard let association = maybeAssociation else { +// return nil // weird, we always should have one since were RE-trying, but ok, let's simply give up. +// } +// +// if let retries = association.retryAssociating() { +// // TODO: sanity check locks and that we do count retries +// return retries +// } else { +// // no need to retry, seems it completed already! +// return nil +// } +// } +// } + + internal func getSpecificExistingAssociation(with node: UniqueNode) -> Association? { + self._associationsLock.withLock { + self._associations[node] + } + } + enum StoredAssociationState { /// An existing (ready or being associated association) which can be used to send (or buffer buffer until associated/terminated) case association(Association) @@ -97,15 +120,16 @@ internal class ClusterShell { /// To be invoked by cluster shell whenever handshake is accepted, creating a completed association. /// Causes messages to be flushed onto the new associated channel. 
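`getEnsureAssociation` above either returns a stored association (or tombstone) or creates a placeholder and kicks off a handshake, all under the associations lock. A hypothetical, simplified sketch of that lookup-or-create flow:

```swift
import Foundation

// Hypothetical, simplified sketch (not ClusterShell's actual fields): return the stored
// association or tombstone for a node, or create an "associating" placeholder and kick
// off a handshake, all under a single lock.
final class SketchAssociationRegistry {
    enum Stored {
        case association(id: String)
        case tombstone
    }

    private let lock = NSLock()
    private var associations: [String: Stored] = [:] // keyed by unique node identifier
    private var tombstones: Set<String> = []

    func ensureAssociation(with node: String, startHandshake: (String) -> Void) -> Stored {
        lock.lock()
        defer { lock.unlock() }

        if tombstones.contains(node) {
            return .tombstone
        }
        if let existing = associations[node] {
            return existing
        }
        // not known yet: remember it and kick off the handshake exactly once
        let fresh = Stored.association(id: node)
        associations[node] = fresh
        startHandshake(node)
        return fresh
    }
}

let registry = SketchAssociationRegistry()
_ = registry.ensureAssociation(with: "node-A:7337") { node in
    print("kicking off handshake with \(node)")
}
```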
- private func completeAssociation(_ associated: ClusterShellState.AssociatedDirective) { + private func completeAssociation(_ associated: ClusterShellState.AssociatedDirective, file: String = #file, line: UInt = #line) throws { // 1) Complete and store the association - self._associationsLock.withLockVoid { - let association = self._associations[associated.handshake.remoteNode] ?? - Association(selfNode: associated.handshake.localNode, remoteNode: associated.handshake.remoteNode) + try self._associationsLock.withLockVoid { + let node: UniqueNode = associated.handshake.remoteNode + let association = self._associations[node] ?? + Association(selfNode: associated.handshake.localNode, remoteNode: node) - association.completeAssociation(handshake: associated.handshake, over: associated.channel) + try association.completeAssociation(handshake: associated.handshake, over: associated.channel) - self._associations[associated.handshake.remoteNode] = association + self._associations[node] = association } // 2) Ensure the failure detector knows about this node @@ -300,6 +324,7 @@ internal class ClusterShell { /// /// If one is present, the underlying failure detector will be asked to monitor this node as well. case handshakeWith(Node) + case handshakeWithSpecific(UniqueNode) case retryHandshake(HandshakeStateMachine.InitiatedState) case failureDetectorReachabilityChanged(UniqueNode, Cluster.MemberReachability) @@ -333,12 +358,7 @@ internal class ClusterShell { // TODO: reformulate as Wire.accept / reject? internal enum HandshakeResult: Equatable, NonTransportableActorMessage { case success(UniqueNode) - case failure(HandshakeConnectionError) - } - - struct HandshakeConnectionError: Error, Equatable { - let node: Node // TODO: allow carrying UniqueNode - let message: String + case failure(HandshakeStateMachine.HandshakeConnectionError) } private var behavior: Behavior { @@ -402,12 +422,27 @@ extension ClusterShell { return context.awaitResultThrowing(of: chanElf, timeout: clusterSettings.bindTimeout) { (chan: Channel) in context.log.info("Bound to \(chan.localAddress.map { $0.description } ?? "")") - let gossipControl = try ConvergentGossip.start( - context, name: "\(ActorAddress._clusterGossip.name)", of: Cluster.Gossip.self, - notifyOnGossipRef: context.messageAdapter(from: Cluster.Gossip.self) { - Optional.some(Message.gossipFromGossiper($0)) - }, - props: ._wellKnown + // TODO: Membership.Gossip? 
+ let gossipControl: GossipControl = try Gossiper.start( + context, + name: "\(ActorPath._clusterGossip.name)", + props: ._wellKnown, + settings: .init( + gossipInterval: clusterSettings.membershipGossipInterval, + gossipIntervalRandomFactor: clusterSettings.membershipGossipIntervalRandomFactor, + peerDiscovery: .onClusterMember(atLeast: .joining, resolve: { member in + let resolveContext = ResolveContext.Message>(address: ._clusterGossip(on: member.node), system: context.system) + return context.system._resolve(context: resolveContext).asAddressable() + }) + ), + makeLogic: { + MembershipGossipLogic( + $0, + notifyOnGossipRef: context.messageAdapter(from: Cluster.Gossip.self) { + Optional.some(Message.gossipFromGossiper($0)) + } + ) + } ) let state = ClusterShellState( @@ -421,8 +456,10 @@ extension ClusterShell { // loop through "self" cluster shell, which in result causes notifying all subscribers about cluster membership change var firstGossip = Cluster.Gossip(ownerNode: state.localNode) _ = firstGossip.membership.join(state.localNode) // change will be put into effect by receiving the "self gossip" - context.system.cluster.updateMembershipSnapshot(state.membership) firstGossip.incrementOwnerVersion() + context.system.cluster.updateMembershipSnapshot(state.membership) + + gossipControl.update(payload: firstGossip) // ???? context.myself.tell(.gossipFromGossiper(firstGossip)) // TODO: are we ok if we received another gossip first, not our own initial? should be just fine IMHO @@ -441,6 +478,8 @@ extension ClusterShell { switch command { case .handshakeWith(let node): return self.beginHandshake(context, state, with: node) + case .handshakeWithSpecific(let uniqueNode): + return self.beginHandshake(context, state, with: uniqueNode.node) case .retryHandshake(let initiated): return self.retryHandshake(context, state, initiated: initiated) @@ -547,9 +586,9 @@ extension ClusterShell { "membership/changes": Logger.MetadataValue.array(mergeDirective.effectiveChanges.map { Logger.MetadataValue.stringConvertible($0) }), - "gossip/incoming": "\(gossip)", - "gossip/before": "\(beforeGossipMerge)", - "gossip/now": "\(state.latestGossip)", + "gossip/incoming": "\(pretty: gossip)", + "gossip/before": "\(pretty: beforeGossipMerge)", + "gossip/now": "\(pretty: state.latestGossip)", ] ) @@ -587,7 +626,7 @@ extension ClusterShell { } } - func tryIntroduceGossipPeer(_ context: ActorContext, _ state: ClusterShellState, change: Cluster.MembershipChange, file: String = #file, line: UInt = #line) { + func tryIntroduceGossipPeer(_ context: ActorContext, _ state: ClusterShellState, change: Cluster.MembershipChange) { guard change.toStatus < .down else { return } @@ -597,13 +636,12 @@ extension ClusterShell { // TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm // TODO: consider receptionist instead of this; we're "early" but receptionist could already be spreading its info to this node, since we associated. - let gossipPeer: ConvergentGossip.Ref = context.system._resolve( + let gossipPeer: GossipShell.Ref = context.system._resolve( context: .init(address: ._clusterGossip(on: change.member.node), system: context.system) ) // FIXME: make sure that if the peer terminated, we don't add it again in here, receptionist would be better then to power this... 
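The gossiper settings above pair a base interval with a random factor (and ClusterSettings now defaults membership gossip to 1 second with a 0.2 random factor). A small sketch of how such a jittered delay could be computed (assumed function name, not the library's API):

```swift
import Foundation

// Sketch (assumed parameter names): the effective delay between gossip rounds is the
// base interval, jittered by a random factor, e.g. 1s +/- 20%.
func nextGossipDelay(interval: TimeInterval, randomFactor: Double) -> TimeInterval {
    precondition(randomFactor >= 0 && randomFactor <= 1, "randomFactor must be within 0...1")
    let multiplier = Double.random(in: (1 - randomFactor) ... (1 + randomFactor))
    return interval * multiplier
}

// With the defaults used above this yields delays between 0.8s and 1.2s:
let delay = nextGossipDelay(interval: 1.0, randomFactor: 0.2)
print("next gossip round in \(delay)s")
```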
// today it can happen that a node goes down but we dont know yet so we add it again :O state.gossipControl.introduce(peer: gossipPeer) - // state.gossipControl.introduce(ClusterShell.gossipID, peer: gossipPeer) } } @@ -628,22 +666,13 @@ extension ClusterShell { state.log.debug("Association already allocated for remote: \(reflecting: remoteNode), existing association: [\(existingAssociation)]") switch existingAssociation.state { case .associating: + // continue, we may be the first beginHandshake (as associations may be ensured outside of actor context) () - // continue, we may be the first beginHandshake (as associations may be ensured outside of actor context) - //// existingAssociation.enqueueCompletionTask { -// replyTo?.tell(.success(state.localNode)) -// } case .associated: - // return , we've been successful already -// replyTo?.tell(.success(existingAssociation.remoteNode)) + // nothing to do, we already associated return .same case .tombstone: -// replyTo?.tell(.failure( -// HandshakeConnectionError( -// node: existingAssociation.remoteNode.node, -// message: "Existing association for \(existingAssociation.remoteNode) is already a tombstone! Must not complete association." -// ) -// )) + // TODO: sanity check if this isn't about handshaking with a replacement, then we should continue; return .same } } @@ -654,7 +683,9 @@ extension ClusterShell { switch handshakeState { case .initiated(let initiated): - state.log.debug("Initiated handshake: \(initiated)") + state.log.debug("Initiated handshake: \(initiated)", metadata: [ + "cluster/associatedNodes": "\(self._associatedNodes())", + ]) return self.connectSendHandshakeOffer(context, state, initiated: initiated) case .wasOfferedHandshake, .inFlight, .completed: @@ -667,15 +698,18 @@ extension ClusterShell { internal func retryHandshake(_ context: ActorContext, _ state: ClusterShellState, initiated: HandshakeStateMachine.InitiatedState) -> Behavior { state.log.debug("Retry handshake with: \(initiated.remoteNode)") - - // TODO: update retry counter, perhaps give up +// +// // FIXME: this needs more work... +// let assoc = self.getRetryAssociation(with: initiated.remoteNode) return self.connectSendHandshakeOffer(context, state, initiated: initiated) } func connectSendHandshakeOffer(_ context: ActorContext, _ state: ClusterShellState, initiated: HandshakeStateMachine.InitiatedState) -> Behavior { var state = state - state.log.debug("Extending handshake offer to \(initiated.remoteNode))") // TODO: log retry stats? + state.log.debug("Extending handshake offer", metadata: [ + "handshake/remoteNode": "\(initiated.remoteNode)", + ]) let offer: Wire.HandshakeOffer = initiated.makeOffer() self.tracelog(context, .send(to: initiated.remoteNode), message: offer) @@ -689,8 +723,7 @@ extension ClusterShell { serializationPool: self.serializationPool ) - // the timeout is being handled by the `connectTimeout` socket option - // in NIO, so it is safe to use an infinite timeout here + // the timeout is being handled by the `connectTimeout` socket option in NIO, so it is safe to use an infinite timeout here return context.awaitResult(of: outboundChanElf, timeout: .effectivelyInfinite) { result in switch result { case .success(let chan): @@ -707,36 +740,90 @@ extension ClusterShell { // MARK: Incoming Handshake extension ClusterShell { + func rejectIfNodeAlreadyLeaving( + _ context: ActorContext, + _ state: ClusterShellState, + _ offer: Wire.HandshakeOffer + ) -> Wire.HandshakeReject? { + guard let member = state.localMember else { + // no local member? 
this is bad + state.log.warning( + """ + Received handshake while no local Cluster.Member available, this may indicate that we were removed from the cluster. + Rejecting handshake + """) + return .init( + version: state.settings.protocolVersion, + targetNode: state.localNode, + originNode: offer.originNode, + reason: "Node cannot be part of cluster, no member available.", + whenHandshakeReplySent: nil + ) + } + + if member.status.isAtLeast(.leaving) { + state.log.notice("Received handshake while already [\(member.status)]") + + return .init( + version: state.settings.protocolVersion, + targetNode: state.localNode, + originNode: offer.originNode, + reason: "Node already leaving cluster.", + whenHandshakeReplySent: nil + ) + } + + // no reason to reject, let's proceed with the handshake + return nil + } + /// Initial entry point for accepting a new connection; Potentially allocates new handshake state machine. /// - parameter inboundChannel: the inbound connection channel that the other node has opened and is offering its handshake on, /// (as opposed to the channel which we may have opened when we first extended a handshake to that node which would be stored in `state`) internal func onHandshakeOffer( _ context: ActorContext, _ state: ClusterShellState, _ offer: Wire.HandshakeOffer, inboundChannel: Channel, - replyInto promise: EventLoopPromise + replyInto handshakePromise: EventLoopPromise ) -> Behavior { var state = state - switch state.onIncomingHandshakeOffer(offer: offer, incomingChannel: inboundChannel) { + // TODO: guard that the target node is actually "us"? i.e. if we're exposed over various protocols and/or ports etc? + if let rejection = self.rejectIfNodeAlreadyLeaving(context, state, offer) { + handshakePromise.succeed(.reject(rejection)) + return .same + } + + // if there already is an existing association, we'll bail out and abort this "new" connection; there must only ever be one association + let maybeExistingAssociation: Association? = self.getSpecificExistingAssociation(with: offer.originNode) + + switch state.onIncomingHandshakeOffer(offer: offer, existingAssociation: maybeExistingAssociation, incomingChannel: inboundChannel) { case .negotiateIncoming(let hsm): - // handshake is allowed to proceed + // 0) ensure the association, since it seems we're indeed going to negotiate it; + // otherwise another actor or something else could kick off the negotiation and we'd become the initiating side (offering the handshake), + // needlessly causing the "both nodes racing the handshake offer" situation, which will be resolved, but there's no need for that race here, + // we'll simply accept (or not) the incoming offer.
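`rejectIfNodeAlreadyLeaving` above refuses handshakes when there is no local member, or when the local member is already at least `.leaving`. A standalone sketch of that guard (simplified status and reject types, not the real `Wire.HandshakeReject`):

```swift
// Standalone sketch (simplified status and reject types, not the real Wire.HandshakeReject):
// refuse new handshakes when there is no local member, or when it is already on its way
// out of the cluster.
enum SketchMemberStatus: Int, Comparable {
    case joining, up, leaving, down, removed

    static func < (lhs: SketchMemberStatus, rhs: SketchMemberStatus) -> Bool {
        lhs.rawValue < rhs.rawValue
    }
}

struct SketchHandshakeReject {
    let reason: String
}

func rejectIfAlreadyLeaving(localStatus: SketchMemberStatus?) -> SketchHandshakeReject? {
    guard let status = localStatus else {
        return SketchHandshakeReject(reason: "Node cannot be part of cluster, no member available.")
    }
    if status >= .leaving {
        return SketchHandshakeReject(reason: "Node already leaving cluster.")
    }
    return nil // no reason to reject, proceed with the handshake negotiation
}

print(rejectIfAlreadyLeaving(localStatus: .leaving)?.reason ?? "accepted")
```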
+ _ = self.getEnsureAssociation(with: offer.originNode) + + // 1) handshake is allowed to proceed switch hsm.negotiate() { case .acceptAndAssociate(let handshakeCompleted): state.log.trace("Accept handshake with \(reflecting: offer.originNode)!", metadata: [ "handshake/channel": "\(inboundChannel)", ]) - // accept handshake and store completed association - let directive = state.completeHandshakeAssociate(self, handshakeCompleted, channel: inboundChannel) + // 1.1) we're accepting; prepare accept + let accept = handshakeCompleted.makeAccept(whenHandshakeReplySent: nil) - // prepare accept - let accept = handshakeCompleted.makeAccept(whenHandshakeReplySent: { () in - self.completeAssociation(directive) - state.log.trace("Associated with: \(reflecting: handshakeCompleted.remoteNode)", metadata: [ - "membership/change": "\(directive.membershipChange)", - "membership": "\(state.membership)", - ]) - }) + // 2) Only now we can succeed the accept promise (as the old one has been terminated and cleared) + self.tracelog(context, .send(to: offer.originNode.node), message: accept) + handshakePromise.succeed(.accept(accept)) + + // 3) Complete and store the association, we are now ready to flush writes onto the network + // + // it is VERY important that we do so BEFORE we emit any cluster events, since then actors are free to + // talk to other actors on the (now associated node) and if there is no `.associating` association yet + // their communication attempts could kick off a handshake attempt; there is no need for this, since we're already accepting here. + let directive = state.completeHandshakeAssociate(self, handshakeCompleted, channel: inboundChannel) // This association may mean that we've "replaced" a previously known node of the same host:port, // In case of such replacement we must down and terminate the association of the previous node. @@ -744,15 +831,27 @@ extension ClusterShell { // This MUST be called before we complete the new association as it may need to terminate the old one. self.handlePotentialAssociatedMemberReplacement(directive: directive, accept: accept, context: context, state: &state) - // Only now we can succeed the accept promise (as the old one has been terminated and cleared) - self.tracelog(context, .send(to: offer.originNode.node), message: accept) - promise.succeed(.accept(accept)) + do { + try self.completeAssociation(directive) + state.log.trace("Associated with: \(reflecting: handshakeCompleted.remoteNode)", metadata: [ + "membership/change": "\(optional: directive.membershipChange)", + "membership": "\(state.membership)", + ]) + } catch { + state.log.warning("Error while trying to complete association with: \(reflecting: handshakeCompleted.remoteNode), error: \(error)", metadata: [ + "membership/change": "\(optional: directive.membershipChange)", + "membership": "\(state.membership)", + "association/error": "\(error)", + ]) + } + // 4) Emit cluster events (i.e. .join the new member) // publish any cluster events this association caused. // As the new association is stored, any reactions to these events will use the right underlying connection - state.events.publish(.membershipChange(directive.membershipChange)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice? 
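Since `completeHandshakeAssociate` may now yield no membership change (the remote node may already be known), events are only published for effective changes. A tiny sketch of that "publish only if something actually changed" pattern (illustrative types, not the real `Cluster.Membership`):

```swift
// Tiny sketch (illustrative types): applying a join may produce no change when the member
// is already known, and only effective changes are published as cluster events.
struct SketchMembership {
    private(set) var members: Set<String> = []

    // returns a change description, or nil if the membership did not actually change
    mutating func join(_ node: String) -> String? {
        guard !members.contains(node) else { return nil }
        members.insert(node)
        return "\(node) is now .joining"
    }
}

var membership = SketchMembership()
var publishedEvents: [String] = []

for node in ["node-A:7337", "node-A:7337", "node-B:8228"] {
    if let change = membership.join(node) {
        publishedEvents.append(change) // only effective changes become cluster events
    }
}

print(publishedEvents) // ["node-A:7337 is now .joining", "node-B:8228 is now .joining"]
```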
- - self.tryIntroduceGossipPeer(context, state, change: directive.membershipChange) + if let change = directive.membershipChange { + state.events.publish(.membershipChange(change)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice? + self.tryIntroduceGossipPeer(context, state, change: change) + } /// a new node joined, thus if we are the leader, we should perform leader tasks to potentially move it to .up let actions = state.collectLeaderActions() @@ -772,22 +871,14 @@ extension ClusterShell { self.terminateAssociation(context.system, state: &state, rejectedHandshake.remoteNode) }) self.tracelog(context, .send(to: offer.originNode.node), message: reject) - promise.succeed(.reject(reject)) + handshakePromise.succeed(.reject(reject)) return self.ready(state: state) } - case .abortIncomingDueToConcurrentHandshake: - // concurrent handshake and we should abort - let error = HandshakeConnectionError( - node: offer.originNode.node, - message: """ - Terminating this connection, as there is a concurrently established connection with same host [\(offer.originNode)] \ - which will be used to complete the handshake. - """ - ) - promise.fail(error) - + case .abortIncomingHandshake(let error): + state.log.warning("Aborting incoming handshake: \(error)") // TODO: remove + handshakePromise.fail(error) state.closeHandshakeChannel(offer: offer, channel: inboundChannel) return .same } @@ -800,35 +891,60 @@ extension ClusterShell { extension ClusterShell { func onOutboundConnectionError(_ context: ActorContext, _ state: ClusterShellState, with remoteNode: Node, error: Error) -> Behavior { var state = state - state.log.warning("Failed await for outbound channel to \(remoteNode); Error was: \(error)") + state.log.debug("Failed to establish outbound channel to \(remoteNode), error: \(error)", metadata: [ + "handshake/remoteNode": "\(remoteNode)", + "handshake/error": "\(error)", + ]) guard let handshakeState = state.handshakeInProgress(with: remoteNode) else { - state.log.warning("Connection error for handshake which is not in progress, this should not happen, but is harmless.") // TODO: meh or fail hard + state.log.warning("Connection error for handshake which is not in progress, this should not happen, but is harmless.", metadata: [ + "handshake/remoteNode": "\(remoteNode)", + "handshake/error": "\(error)", + ]) return .same } switch handshakeState { case .initiated(var initiated): - switch initiated.onHandshakeError(error) { - case .scheduleRetryHandshake(let delay): - state.log.debug("Schedule handshake retry to: [\(initiated.remoteNode)] delay: [\(delay)]") + guard initiated.channel == nil else { + fatalError("Seems we DO have a channel already! 
\(initiated)\n \(state)") + } + + switch initiated.onConnectionError(error) { + case .scheduleRetryHandshake(let retryDelay): + state.log.debug("Schedule handshake retry", metadata: [ + "handshake/remoteNote": "\(initiated.remoteNode)", + "handshake/retryDelay": "\(retryDelay)", + ]) context.timers.startSingle( key: TimerKey("handshake-timer-\(remoteNode)"), message: .command(.retryHandshake(initiated)), - delay: delay + delay: retryDelay ) + + // ensure we store the updated state; since retry attempts modify the backoff state + state._handshakes[remoteNode] = .initiated(initiated) + case .giveUpOnHandshake: if let hsmState = state.closeOutboundHandshakeChannel(with: remoteNode) { - state.log.warning("Giving up on handshake: \(hsmState)") + state.log.warning("Giving up on handshake with node [\(remoteNode)]", metadata: [ + "handshake/error": "\(error)", + "handshake/state": "\(hsmState)", + ]) } } case .wasOfferedHandshake(let state): preconditionFailure("Outbound connection error should never happen on receiving end. State was: [\(state)], error was: \(error)") - case .completed(let state): - preconditionFailure("Outbound connection error on already completed state handshake. This should not happen. State was: [\(state)], error was: \(error)") + case .completed(let completedState): + // this could mean that another (perhaps inbound, rather than the outbound handshake we're attempting here) actually succeeded + state.log.notice("Stored handshake state is .completed, while outbound connection establishment failed. Assuming existing completed association is correct.", metadata: [ + "handshake/error": "\(error)", + "handshake/state": "\(state)", + "handshake/completed": "\(completedState)", + ]) case .inFlight: - preconditionFailure("An in-flight marker state should never be stored, yet was encountered in \(#function)") + preconditionFailure("An in-flight marker state should never be stored, yet was encountered in \(#function). State was: [\(state)], error was: \(error)") } return self.ready(state: state) @@ -863,21 +979,32 @@ extension ClusterShell { // 1.1) This association may mean that we've "replaced" a previously known node of the same host:port, // In case of such replacement we must down and terminate the association of the previous node. - // // This MUST be called before we complete the new association as it may need to terminate the old one. + // + // This MUST be called before we complete the new association as it may need to terminate the old one. 
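The retry path above schedules a single timer per remote node and stores the mutated handshake state, since each attempt consumes the backoff. A rough sketch of such per-node, replace-on-reschedule timers using Dispatch (assumed names, not the actor runtime's timer API):

```swift
import Dispatch
import Foundation

// Rough sketch (assumed names, not thread-safe): one pending retry per remote node,
// where scheduling again replaces the previous timer, similar to a "single" timer
// keyed per node.
final class SketchHandshakeRetryScheduler {
    private var scheduled: [String: DispatchWorkItem] = [:] // keyed per remote node
    private let queue = DispatchQueue(label: "sketch.handshake.retries")

    func scheduleRetry(node: String, delay: TimeInterval, retry: @escaping () -> Void) {
        let key = "handshake-timer-\(node)"
        scheduled[key]?.cancel() // replace any previously scheduled retry for this node
        let work = DispatchWorkItem(block: retry)
        scheduled[key] = work
        queue.asyncAfter(deadline: .now() + delay, execute: work)
    }
}

let scheduler = SketchHandshakeRetryScheduler()
scheduler.scheduleRetry(node: "node-B:7337", delay: 0.3) {
    print("retrying handshake with node-B:7337")
}
```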
+ // This MAY emit a .down event if there is a node being replaced; this is ok but MUST happen before we issue the new .joining change for the replacement self.handlePotentialAssociatedMemberReplacement(directive: directive, accept: inboundAccept, context: context, state: &state) // 2) Store the (now completed) association first, as it may be immediately used by remote ActorRefs attempting to send to the remoteNode - self.completeAssociation(directive) - state.log.debug("Associated with: \(reflecting: handshakeCompleted.remoteNode)", metadata: [ - "membership/change": "\(directive.membershipChange)", - "membership": "\(state.membership)", - ]) + do { + try self.completeAssociation(directive) + state.log.trace("Associated with: \(reflecting: handshakeCompleted.remoteNode)", metadata: [ + "membership/change": "\(optional: directive.membershipChange)", + "membership": "\(state.membership)", + ]) + } catch { + state.log.warning("Error while trying to complete association with: \(reflecting: handshakeCompleted.remoteNode), error: \(error)", metadata: [ + "membership/change": "\(optional: directive.membershipChange)", + "membership": "\(state.membership)", + "association/error": "\(error)", + ]) + } // 3) publish any cluster events this association caused. // As the new association is stored, any reactions to these events will use the right underlying connection - state.events.publish(.membershipChange(directive.membershipChange)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice? - - self.tryIntroduceGossipPeer(context, state, change: directive.membershipChange) + if let change = directive.membershipChange { + state.events.publish(.membershipChange(change)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice? + self.tryIntroduceGossipPeer(context, state, change: change) + } // 4) Since a new node joined, if we are the leader, we should perform leader tasks to potentially move it to .up let actions = state.collectLeaderActions() @@ -896,7 +1023,7 @@ extension ClusterShell { context: ActorContext, state: inout ClusterShellState ) { - if let replacedMember = directive.membershipChange.replaced { + if let replacedMember = directive.membershipChange?.replaced { // the change was a replacement and thus we need to down the old member (same host:port as the new one), // and terminate its association. @@ -1084,7 +1211,10 @@ extension ClusterShell { self.clusterEvents.publish(.membershipChange(change)) if let logChangeLevel = state.settings.logMembershipChanges { - context.log.log(level: logChangeLevel, "Cluster membership change: \(reflecting: change), membership: \(state.membership)") + context.log.log(level: logChangeLevel, "Cluster membership change: \(reflecting: change)", metadata: [ + "cluster/membership/change": "\(change)", + "cluster/membership": "\(state.membership)", + ]) } } @@ -1093,8 +1223,8 @@ extension ClusterShell { // Down(self node); ensuring SWIM knows about this and should likely initiate graceful shutdown context.log.warning( "Self node was marked [.down]!", - metadata: [ // TODO: carry reason why -- was it gossip, manual or other - "cluster/membership": "\(state.membership)", // TODO: introduce state.metadata pattern? + metadata: [ // TODO: carry reason why -- was it gossip, manual or other? 
+ "cluster/membership": "\(state.membership)", ] ) diff --git a/Sources/DistributedActors/Cluster/ClusterShellState.swift b/Sources/DistributedActors/Cluster/ClusterShellState.swift index 186c6767e..6db8de998 100644 --- a/Sources/DistributedActors/Cluster/ClusterShellState.swift +++ b/Sources/DistributedActors/Cluster/ClusterShellState.swift @@ -29,6 +29,7 @@ internal protocol ReadOnlyClusterState { /// Unique address of the current node. var localNode: UniqueNode { get } + var localMember: Cluster.Member? { get } // TODO: enforce that we always have the localMember var settings: ClusterSettings { get } } @@ -42,21 +43,24 @@ internal struct ClusterShellState: ReadOnlyClusterState { let events: EventStream - let localNode: UniqueNode let channel: Channel + let localNode: UniqueNode + var localMember: Cluster.Member? { + self.membership.uniqueMember(self.localNode) + } + let eventLoopGroup: EventLoopGroup var handshakeBackoff: BackoffStrategy { - self.settings.associationHandshakeBackoff + self.settings.handshakeReconnectBackoff } let allocator: ByteBufferAllocator internal var _handshakes: [Node: HandshakeStateMachine.State] = [:] -// let gossipControl: GossipControl - let gossipControl: ConvergentGossipControl + let gossipControl: GossipControl /// Updating the `latestGossip` causes the gossiper to be informed about it, such that the next time it does a gossip round /// it uses the latest gossip available. @@ -93,7 +97,13 @@ internal struct ClusterShellState: ReadOnlyClusterState { } } - init(settings: ClusterSettings, channel: Channel, events: EventStream, gossipControl: ConvergentGossipControl, log: Logger) { + init( + settings: ClusterSettings, + channel: Channel, + events: EventStream, + gossipControl: GossipControl, + log: Logger + ) { self.log = log self.settings = settings self.allocator = settings.allocator @@ -162,7 +172,7 @@ extension ClusterShellState { #endif var initiated = initiated - initiated.onChannelConnected(channel: channel) + initiated.onConnectionEstablished(channel: channel) self._handshakes[initiated.remoteNode] = .initiated(initiated) return self @@ -234,13 +244,21 @@ extension ClusterShellState { /// Upon tie-break the nodes follow these two roles: /// Winner: Keeps the outgoing connection, negotiates and replies accept/reject on the "incoming" connection from the remote node. /// Loser: Drops the incoming connection and waits for Winner's decision. - mutating func onIncomingHandshakeOffer(offer: Wire.HandshakeOffer, incomingChannel: Channel) -> OnIncomingHandshakeOfferDirective { + mutating func onIncomingHandshakeOffer(offer: Wire.HandshakeOffer, existingAssociation: Association?, incomingChannel: Channel) -> OnIncomingHandshakeOfferDirective { func prepareNegotiation0() -> OnIncomingHandshakeOfferDirective { let fsm = HandshakeStateMachine.HandshakeOfferReceivedState(state: self, offer: offer) self._handshakes[offer.originNode.node] = .wasOfferedHandshake(fsm) return .negotiateIncoming(fsm) } + guard existingAssociation == nil else { + let error = HandshakeStateMachine.HandshakeConnectionError( + node: offer.originNode.node, + message: "Terminating this connection, the node [\(offer.originNode)] is already associated. Possibly a delayed handshake retry message was delivered?" 
+ ) + return .abortIncomingHandshake(error) + } + guard let inProgress = self._handshakes[offer.originNode.node] else { // no other concurrent handshakes in progress; good, this is happy path, so we simply continue our negotiation return prepareNegotiation0() @@ -280,7 +298,15 @@ extension ClusterShellState { } else { // we "lost", the other node will send the accept; when it does, the will complete the future. - return .abortIncomingDueToConcurrentHandshake + // concurrent handshake and we should abort + let error = HandshakeStateMachine.HandshakeConnectionError( + node: offer.originNode.node, + message: """ + Terminating this connection, as there is a concurrently established connection with same host [\(offer.originNode)] \ + which will be used to complete the handshake. + """ + ) + return .abortIncomingHandshake(error) } case .wasOfferedHandshake: // suspicious but but not wrong, so we were offered before, and now are being offered again? @@ -301,7 +327,7 @@ extension ClusterShellState { case negotiateIncoming(HandshakeStateMachine.HandshakeOfferReceivedState) /// An existing handshake with given peer is already in progress, /// do not negotiate but rest assured that the association will be handled properly by the already ongoing process. - case abortIncomingDueToConcurrentHandshake + case abortIncomingHandshake(HandshakeStateMachine.HandshakeConnectionError) } mutating func incomingHandshakeAccept(_ accept: Wire.HandshakeAccept) -> HandshakeStateMachine.CompletedState? { @@ -334,26 +360,17 @@ extension ClusterShellState { /// Stores an `Association` for the newly established association; /// /// Does NOT by itself move the member to joining, but via the directive asks the outer to do this. - mutating func completeHandshakeAssociate(_ clusterShell: ClusterShell, _ handshake: HandshakeStateMachine.CompletedState, channel: Channel) -> AssociatedDirective { + mutating func completeHandshakeAssociate( + _ clusterShell: ClusterShell, _ handshake: HandshakeStateMachine.CompletedState, channel: Channel, + file: String = #file, line: UInt = #line + ) -> AssociatedDirective { guard self._handshakes.removeValue(forKey: handshake.remoteNode.node) != nil else { fatalError("Can not complete a handshake which was not in progress!") // TODO: perhaps we instead just warn and ignore this; since it should be harmless } - // Membership may not know about the remote node yet, i.e. it reached out directly to this node; - // In that case, the join will return a change; Though if the node is already known, e.g. we were told about it - // via gossip from other nodes, though didn't yet complete associating until just now, so we can make a `change` - // based on the stored member - let changeOption: Cluster.MembershipChange? = self.membership.applyMembershipChange(.init(member: .init(node: handshake.remoteNode, status: .joining))) ?? - self.membership.uniqueMember(handshake.remoteNode).map { Cluster.MembershipChange(member: $0) } - - guard let change = changeOption else { - fatalError(""" - Attempt to associate with \(reflecting: handshake.remoteNode) failed; It was neither a new node .joining, \ - nor was it a node that we already know. This should never happen as one of those two cases is always true. \ - Please report a bug. - """) - } + let change: Cluster.MembershipChange? 
= + self.membership.applyMembershipChange(Cluster.MembershipChange(member: .init(node: handshake.remoteNode, status: .joining))) return AssociatedDirective( membershipChange: change, @@ -363,8 +380,8 @@ extension ClusterShellState { } struct AssociatedDirective { - let membershipChange: Cluster.MembershipChange - // let association: Association + let membershipChange: Cluster.MembershipChange? +// let membershipChange: Cluster.MembershipChange let handshake: HandshakeStateMachine.CompletedState let channel: Channel } diff --git a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift index 1e262afdc..131368293 100644 --- a/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift +++ b/Sources/DistributedActors/Cluster/Downing/TimeoutBasedDowningStrategy.swift @@ -52,7 +52,7 @@ public final class TimeoutBasedDowningStrategy: DowningStrategy { return .none } - if change.isAtLeastDown { + if change.isAtLeast(.down) { // it was marked as down by someone, we don't need to track it anymore _ = self._markAsDown.remove(change.member) _ = self._unreachable.remove(change.member) diff --git a/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift b/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift index 0d70bbf1f..ceb24874f 100644 --- a/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift +++ b/Sources/DistributedActors/Cluster/HandshakeStateMachine.swift @@ -61,7 +61,6 @@ internal struct HandshakeStateMachine { // MARK: Handshake Initiated internal struct InitiatedState: Swift.CustomStringConvertible { - var backoff: BackoffStrategy let settings: ClusterSettings var protocolVersion: Version { @@ -71,22 +70,22 @@ internal struct HandshakeStateMachine { let remoteNode: Node let localNode: UniqueNode + var handshakeReconnectBackoff: BackoffStrategy + /// Channel which was established when we initiated the handshake (outgoing connection), /// which may want to be closed when we `abortHandshake` and want to kill the related outgoing connection as we do so. /// /// This is ALWAYS set once the initial clientBootstrap has completed. var channel: Channel? - // TODO: counter for how many times to retry associating (timeouts) - init( settings: ClusterSettings, localNode: UniqueNode, connectTo remoteNode: Node ) { precondition(localNode.node != remoteNode, "MUST NOT attempt connecting to own bind address. Address: \(remoteNode)") self.settings = settings - self.backoff = settings.associationHandshakeBackoff self.localNode = localNode self.remoteNode = remoteNode + self.handshakeReconnectBackoff = settings.handshakeReconnectBackoff // copy since we want to mutate it as the handshakes attempt retries } func makeOffer() -> Wire.HandshakeOffer { @@ -94,23 +93,20 @@ internal struct HandshakeStateMachine { Wire.HandshakeOffer(version: self.protocolVersion, originNode: self.localNode, targetNode: self.remoteNode) } - mutating func onHandshakeTimeout() -> RetryDirective { - if let interval = self.backoff.next() { - return .scheduleRetryHandshake(delay: interval) - } else { - return .giveUpOnHandshake - } + mutating func onConnectionEstablished(channel: Channel) { + self.channel = channel } - mutating func onChannelConnected(channel: Channel) { - self.channel = channel + // TODO: call into an connection error? 
+ // TODO: the remote REJECTING must not trigger backoffs + mutating func onHandshakeTimeout() -> RetryDirective { + self.onConnectionError(HandshakeConnectionError(node: self.remoteNode, message: "Handshake timed out")) // TODO: improve msgs } - mutating func onHandshakeError(_: Error) -> RetryDirective { - switch self.backoff.next() { - case .some(let amount): - return .scheduleRetryHandshake(delay: amount) - case .none: + mutating func onConnectionError(_: Error) -> RetryDirective { + if let nextConnectionAttemptDelay = self.handshakeReconnectBackoff.next() { + return .scheduleRetryHandshake(delay: nextConnectionAttemptDelay) + } else { return .giveUpOnHandshake } } @@ -120,14 +116,18 @@ internal struct HandshakeStateMachine { InitiatedState(\ remoteNode: \(self.remoteNode), \ localNode: \(self.localNode), \ - backoff: \(self.backoff), \ channel: \(optional: self.channel)\ ) """ } } -// // ==== ------------------------------------------------------------------------------------------------------------ + struct HandshakeConnectionError: Error, Equatable { + let node: Node // TODO: allow carrying UniqueNode + let message: String + } + + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Handshake In-Flight, and should attach to existing negotiation internal struct InFlightState { @@ -230,7 +230,7 @@ internal struct HandshakeStateMachine { // self.whenCompleted = received.whenCompleted } - func makeAccept(whenHandshakeReplySent: @escaping () -> Void) -> Wire.HandshakeAccept { + func makeAccept(whenHandshakeReplySent: (() -> Void)?) -> Wire.HandshakeAccept { Wire.HandshakeAccept( version: self.protocolVersion, targetNode: self.localNode, @@ -265,6 +265,7 @@ internal struct HandshakeStateMachine { } internal enum State { + /// Stored the moment a handshake is initiated; may not yet have an underlying connection yet. case initiated(InitiatedState) /// A handshake is already in-flight and was initiated by someone else previously. /// rather than creating another handshake dance, we will be notified along with the already initiated diff --git a/Sources/DistributedActors/Cluster/Leadership.swift b/Sources/DistributedActors/Cluster/Leadership.swift index e75120548..0825533d8 100644 --- a/Sources/DistributedActors/Cluster/Leadership.swift +++ b/Sources/DistributedActors/Cluster/Leadership.swift @@ -111,7 +111,7 @@ extension Leadership { var behavior: Behavior { .setup { context in - context.log.trace("Spawned \(context.path) to run \(self.election)") + context.log.trace("Configured with \(self.election)") context.system.cluster.events.subscribe(context.myself) // FIXME: we have to add "own node" since we're not getting the .snapshot... so we have to manually act as if.. 
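The handshake change above folds the old timeout-specific backoff handling into a single `onConnectionError` path: each failed connection attempt asks the (now attempt-limited) reconnect backoff for the next delay and either schedules a retry or gives up. Below is a minimal, self-contained sketch of that decision, assuming the 1.5x growth and the 500ms/750ms/1125ms progression exercised by the backoff tests later in this patch; `SketchBackoff`, `ConnectionRefused`, and the printed loop are hypothetical stand-ins, not the library's types.

```swift
import Foundation

// Hypothetical stand-in for an attempt-limited exponential backoff (not the library type).
struct SketchBackoff {
    var interval: TimeInterval     // current base delay in seconds
    let multiplier: Double         // growth factor applied after every attempt, e.g. 1.5
    let capInterval: TimeInterval  // upper bound for any single delay
    var remainingAttempts: Int?    // nil means "retry forever"

    mutating func next() -> TimeInterval? {
        if let remaining = remainingAttempts {
            guard remaining > 0 else { return nil } // attempts exhausted -> signal "give up"
            remainingAttempts = remaining - 1
        }
        let delay = min(interval, capInterval)
        interval *= multiplier // grow the delay used by the following attempt
        return delay
    }
}

// Mirrors the directive shape returned by the handshake state machine in the hunk above.
enum RetryDirective {
    case scheduleRetryHandshake(delay: TimeInterval)
    case giveUpOnHandshake
}

struct ConnectionRefused: Error {}

// Decide what to do after a failed connection attempt: retry with the next backoff delay,
// or give up once the backoff has no more attempts to offer.
func onConnectionError(_ error: Error, backoff: inout SketchBackoff) -> RetryDirective {
    if let delay = backoff.next() {
        return .scheduleRetryHandshake(delay: delay)
    } else {
        return .giveUpOnHandshake
    }
}

// Example: 0.5s initial delay, x1.5 growth, at most 3 attempts -> 0.5s, 0.75s, 1.125s, then give up.
var backoff = SketchBackoff(interval: 0.5, multiplier: 1.5, capInterval: 30, remainingAttempts: 3)
for _ in 0 ..< 4 {
    print(onConnectionError(ConnectionRefused(), backoff: &backoff))
}
```

Keeping a single error-driven path means a rejection or timeout can later be treated differently (as the TODO above notes) without duplicating the retry bookkeeping.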
diff --git a/Sources/DistributedActors/Cluster/NodeDeathWatcher.swift b/Sources/DistributedActors/Cluster/NodeDeathWatcher.swift index 5fdc332f4..91d722707 100644 --- a/Sources/DistributedActors/Cluster/NodeDeathWatcher.swift +++ b/Sources/DistributedActors/Cluster/NodeDeathWatcher.swift @@ -135,7 +135,7 @@ enum NodeDeathWatcherShell { context.system.cluster.events.subscribe(context.subReceive(Cluster.Event.self) { event in switch event { - case .membershipChange(let change) where change.isAtLeastDown: + case .membershipChange(let change) where change.isAtLeast(.down): instance.handleAddressDown(change) default: () // ignore other changes, we only need to react on nodes becoming DOWN diff --git a/Sources/DistributedActors/Cluster/Reception/OperationLogClusterReceptionist.swift b/Sources/DistributedActors/Cluster/Reception/OperationLogClusterReceptionist.swift index db1bb6eca..515d6a1b9 100644 --- a/Sources/DistributedActors/Cluster/Reception/OperationLogClusterReceptionist.swift +++ b/Sources/DistributedActors/Cluster/Reception/OperationLogClusterReceptionist.swift @@ -265,7 +265,7 @@ public class OperationLogClusterReceptionist { } return .same }.receiveSpecificSignal(Signals.Terminated.self) { _, terminated in - context.log.warning("Terminated: \(terminated)") + context.log.debug("Remote receptionist terminated: \(terminated)") if let node = terminated.address.node, terminated.address == ActorAddress._receptionist(on: node) { // receptionist terminated, need to prune it @@ -665,7 +665,7 @@ extension OperationLogClusterReceptionist { if effectiveChange.fromStatus == nil { // a new member joined, let's store and contact its receptionist self.onNewClusterMember(context, change: effectiveChange) - } else if effectiveChange.toStatus.isAtLeastDown { + } else if effectiveChange.toStatus.isAtLeast(.down) { // a member was removed, we should prune it from our observations self.pruneClusterMember(context, removedNode: effectiveChange.node) } diff --git a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift index 7935109fc..e770ed14e 100644 --- a/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift +++ b/Sources/DistributedActors/Cluster/SWIM/SWIMInstance.swift @@ -615,7 +615,7 @@ extension SWIM.Instance { case .ignoredDueToOlderStatus(let currentStatus): return .ignored( level: .trace, - message: "Ignoring gossip about member \(reflecting: member.node), incoming: [\(member.status)] does not supersede current: [\(currentStatus)]" + message: "Gossip about member \(reflecting: member.node), incoming: [\(member.status)] does not supersede current: [\(currentStatus)]" ) } } else if let remoteMemberNode = member.ref.address.node { diff --git a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift index 1b0b710de..395ff9b7f 100644 --- a/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift +++ b/Sources/DistributedActors/Cluster/Transport/RemoteClusterActorPersonality.swift @@ -97,7 +97,7 @@ public final class RemoteClusterActorPersonality { } private var association: ClusterShell.StoredAssociationState { - guard let remoteAddress = self.address.node else { + guard let uniqueNode = self.address.node else { fatalError("Attempted to access association remote control yet ref has no address! This should never happen and is a bug. 
The ref was: \(self)") } @@ -105,7 +105,7 @@ public final class RemoteClusterActorPersonality { // if let assoc = self._cachedAssociation.load() { return assoc } // else { get from shell and store here } - return self.clusterShell.getEnsureAssociation(with: remoteAddress) + return self.clusterShell.getEnsureAssociation(with: uniqueNode) } func _unsafeAssumeCast(to: NewMessage.Type) -> RemoteClusterActorPersonality { diff --git a/Sources/DistributedActors/Gossip/Gossip+Settings.swift b/Sources/DistributedActors/Gossip/Gossip+Settings.swift index ad6681b05..5c54939c6 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Settings.swift +++ b/Sources/DistributedActors/Gossip/Gossip+Settings.swift @@ -13,9 +13,9 @@ //===----------------------------------------------------------------------===// // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: ConvergentGossip Settings +// MARK: Gossiper Settings -extension GossipShell { +extension Gossiper { public struct Settings { /// Interval at which gossip rounds should proceed. public var gossipInterval: TimeAmount = .seconds(2) @@ -41,20 +41,20 @@ extension GossipShell { public var peerDiscovery: PeerDiscovery = .manuallyIntroduced public enum PeerDiscovery { - /// Automatically register this gossiper and subscribe for any others identifying under the same - /// `Receptionist.RegistrationKey.Message>(id)`. - case fromReceptionistListing(id: String) - -// /// Automatically discover and add cluster members to the gossip group when they become reachable in `atLeast` status. -// /// -// /// Note that by default `.leaving`, `.down` and `.removed` members are NOT added to the gossip group, -// /// even if they were never contacted by this gossiper before. -// case onClusterMember(atLeast: Cluster.MemberStatus, resolve: (Cluster.Member) -> AddressableActorRef) - /// Peers have to be manually introduced by calling `control.introduce()` on to the gossiper. /// This gives full control about when a peer should join the gossip group, however usually is not necessary /// as one can normally rely on the cluster events (e.g. a member becoming `.up`) to join the group which is case manuallyIntroduced + + /// Automatically register this gossiper and subscribe for any others identifying under the same + /// `Receptionist.RegistrationKey.Message>(id)`. + case fromReceptionistListing(id: String) + + /// Automatically discover and add cluster members to the gossip group when they become reachable in `atLeast` status. + /// + /// Note that by default `.leaving`, `.down` and `.removed` members are NOT added to the gossip group, + /// even if they were never contacted by this gossiper before. 
+ case onClusterMember(atLeast: Cluster.MemberStatus, resolve: (Cluster.Member) -> AddressableActorRef) } } } diff --git a/Sources/DistributedActors/Pattern/ConvergentGossip+Serialization.swift b/Sources/DistributedActors/Gossip/Gossip+Shell+Coding.swift similarity index 54% rename from Sources/DistributedActors/Pattern/ConvergentGossip+Serialization.swift rename to Sources/DistributedActors/Gossip/Gossip+Shell+Coding.swift index eafa80910..9a62219d3 100644 --- a/Sources/DistributedActors/Pattern/ConvergentGossip+Serialization.swift +++ b/Sources/DistributedActors/Gossip/Gossip+Shell+Coding.swift @@ -12,34 +12,18 @@ // //===----------------------------------------------------------------------===// -extension ConvergentGossip.Message { - public enum DiscriminatorKeys: String, Codable { - case gossip - } - +extension StringGossipIdentifier: Codable { public enum CodingKeys: CodingKey { - case _case - - case gossip_envelope + case id } public init(from decoder: Decoder) throws { let container = try decoder.container(keyedBy: CodingKeys.self) - switch try container.decode(DiscriminatorKeys.self, forKey: ._case) { - case .gossip: - self = .gossip(try container.decode(ConvergentGossip.GossipEnvelope.self, forKey: .gossip_envelope)) - } + self.gossipIdentifier = try container.decode(String.self, forKey: .id) } public func encode(to encoder: Encoder) throws { var container = encoder.container(keyedBy: CodingKeys.self) - - switch self { - case .gossip(let envelope): - try container.encode(DiscriminatorKeys.gossip, forKey: ._case) - try container.encode(envelope, forKey: .gossip_envelope) - default: - throw SerializationError.nonTransportableMessage(type: "\(self)") - } + try container.encode(self.gossipIdentifier, forKey: .id) } } diff --git a/Sources/DistributedActors/Gossip/Gossip+Shell.swift b/Sources/DistributedActors/Gossip/Gossip+Shell.swift index 878f6f016..68525dbb1 100644 --- a/Sources/DistributedActors/Gossip/Gossip+Shell.swift +++ b/Sources/DistributedActors/Gossip/Gossip+Shell.swift @@ -14,9 +14,13 @@ import Logging +private let gossipTickKey: TimerKey = "gossip-tick" + /// Convergent gossip is a gossip mechanism which aims to equalize some state across all peers participating. 
internal final class GossipShell { - let settings: Settings + typealias Ref = ActorRef + + let settings: Gossiper.Settings private let makeLogic: (ActorContext, GossipIdentifier) -> AnyGossipLogic @@ -27,7 +31,7 @@ internal final class GossipShell { private var peers: Set fileprivate init( - settings: Settings, + settings: Gossiper.Settings, makeLogic: @escaping (Logic.Context) -> Logic ) where Logic: GossipLogic, Logic.Envelope == Envelope { self.settings = settings @@ -42,8 +46,8 @@ internal final class GossipShell { var behavior: Behavior { .setup { context in - self.ensureNextGossipRound(context: context) - self.initPeerDiscovery(context, settings: self.settings) + self.ensureNextGossipRound(context) + self.initPeerDiscovery(context) return Behavior.receiveMessage { switch $0 { @@ -64,9 +68,6 @@ internal final class GossipShell { case .gossip(let identity, let origin, let payload, let ackRef): self.receiveGossip(context, identifier: identity, origin: origin, payload: payload, ackRef: ackRef) - case ._clusterEvent: - fatalError("automatic peer location is not implemented") // FIXME: implement this https://github.com/apple/swift-distributed-actors/issues/371 - case ._periodicGossipTick: self.runGossipRound(context) } @@ -76,9 +77,10 @@ internal final class GossipShell { self.peers = self.peers.filter { $0.address != terminated.address } - // if self.peers.isEmpty { - // TODO: could pause ticks since we have zero peers now? - // } + if self.peers.isEmpty { + context.log.trace("No peers available, cancelling periodic gossip timer") + context.timers.cancel(for: gossipTickKey) + } return .same } } @@ -91,10 +93,10 @@ internal final class GossipShell { payload: Envelope, ackRef: ActorRef ) { - context.log.trace("Received gossip [\(identifier.gossipIdentifier)]: \(pretty: payload)", metadata: [ + context.log.trace("Received gossip [\(identifier.gossipIdentifier)]", metadata: [ "gossip/identity": "\(identifier.gossipIdentifier)", "gossip/origin": "\(origin.address)", - "gossip/incoming": "\(payload)", + "gossip/incoming": Logger.MetadataValue.pretty(payload), ]) // TODO: we could handle some actions if it issued some @@ -115,9 +117,9 @@ internal final class GossipShell { logic.localGossipUpdate(payload: payload) - context.log.trace("Gossip payload [\(identifier)] updated: \(payload)", metadata: [ - "gossip/identifier": "\(identifier)", - "gossip/payload": "\(payload)", + context.log.trace("Gossip payload [\(identifier.gossipIdentifier)] (locally) updated", metadata: [ + "gossip/identifier": "\(identifier.gossipIdentifier)", + "gossip/payload": "\(pretty: payload)", ]) // TODO: bump local version vector; once it is in the envelope @@ -148,7 +150,7 @@ internal final class GossipShell { private func runGossipRound(_ context: ActorContext) { defer { - self.ensureNextGossipRound(context: context) + self.ensureNextGossipRound(context) } let allPeers: [AddressableActorRef] = Array(self.peers).map { $0.asAddressable() } // TODO: some protocol Addressable so we can avoid this mapping? 
@@ -163,7 +165,7 @@ internal final class GossipShell { context.log.trace("New gossip round, selected [\(selectedPeers.count)] peers, from [\(allPeers.count)] peers", metadata: [ "gossip/id": "\(identifier.gossipIdentifier)", - "gossip/peers/selected": "\(selectedPeers)", + "gossip/peers/selected": Logger.MetadataValue.array(selectedPeers.map { "\($0)" }), ]) for selectedPeer in selectedPeers { @@ -209,11 +211,10 @@ internal final class GossipShell { to target: PeerRef, onAck: @escaping () -> Void ) { - // TODO: if we have seen tables, we can use them to bias the gossip towards the "more behind" nodes context.log.trace("Sending gossip to \(target.address)", metadata: [ "gossip/target": "\(target.address)", "gossip/peers/count": "\(self.peers.count)", - "actor/message": Logger.Metadata.pretty(payload), + "actor/message": Logger.MetadataValue.pretty(payload), ]) let ack = target.ask(for: GossipACK.self, timeout: .seconds(3)) { replyTo in @@ -234,14 +235,14 @@ internal final class GossipShell { } } - private func ensureNextGossipRound(context: ActorContext) { + private func ensureNextGossipRound(_ context: ActorContext) { guard !self.peers.isEmpty else { return // no need to schedule gossip ticks if we have no peers } let delay = self.settings.effectiveGossipInterval context.log.trace("Schedule next gossip round in \(delay.prettyDescription) (\(self.settings.gossipInterval.prettyDescription) ± \(self.settings.gossipIntervalRandomFactor * 100)%)") - context.timers.startSingle(key: "periodic-gossip", message: ._periodicGossipTick, delay: delay) + context.timers.startSingle(key: gossipTickKey, message: ._periodicGossipTick, delay: delay) } } @@ -253,18 +254,60 @@ extension GossipShell { Receptionist.RegistrationKey(id) } - private func initPeerDiscovery(_ context: ActorContext, settings: GossipShell.Settings) { + private func initPeerDiscovery(_ context: ActorContext) { switch self.settings.peerDiscovery { case .manuallyIntroduced: return // nothing to do, peers will be introduced manually + case .onClusterMember(let atLeastStatus, let resolvePeerOn): + func resolveInsertPeer(_ context: ActorContext, member: Cluster.Member) { + guard member.node != context.system.cluster.node else { + return // ignore self node + } + + guard atLeastStatus <= member.status else { + return // too "early" status of the member + } + + let resolved: AddressableActorRef = resolvePeerOn(member) + if let peer = resolved.ref as? 
PeerRef { + if self.peers.insert(peer).inserted { + context.log.debug("Automatically discovered peer", metadata: [ + "gossip/peer": "\(peer)", + "gossip/peerCount": "\(self.peers.count)", + "gossip/peers": "\(self.peers.map { $0.address })", + ]) + } + } else { + context.log.warning("Resolved reference \(resolved.ref) is not \(PeerRef.self), can not use it as peer for gossip.") + } + } + + let onClusterEventRef = context.subReceive(Cluster.Event.self) { event in + switch event { + case .snapshot(let membership): + for member in membership.members(atLeast: .joining) { + resolveInsertPeer(context, member: member) + } + case .membershipChange(let change): + resolveInsertPeer(context, member: change.member) + case .leadershipChange, .reachabilityChange: + () // ignore + } + } + context.system.cluster.events.subscribe(onClusterEventRef) + case .fromReceptionistListing(let id): let key = Receptionist.RegistrationKey(id) context.system.receptionist.register(context.myself, key: key) context.log.debug("Registered with receptionist key: \(key)") context.system.receptionist.subscribe(key: key, subscriber: context.subReceive(Receptionist.Listing.self) { listing in - context.log.trace("Receptionist listing update \(listing)") + context.log.trace("Peer listing update via receptionist", metadata: [ + "peer/listing": Logger.MetadataValue.array( + listing.refs.map { ref in Logger.MetadataValue.stringConvertible(ref) } + ), + ]) for peer in listing.refs { self.onIntroducePeer(context, peer: peer) } @@ -290,7 +333,7 @@ extension GossipShell { // self.sendGossip(context, identifier: key.identifier, logic.payload, to: peer) // } - self.ensureNextGossipRound(context: context) + self.ensureNextGossipRound(context) } } @@ -328,19 +371,17 @@ extension GossipShell { case sideChannelMessage(identifier: GossipIdentifier, Any) // internal messages - case _clusterEvent(Cluster.Event) case _periodicGossipTick } } // ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: GossipControl - -extension GossipShell { - typealias Ref = ActorRef +// MARK: Gossiper +/// A Gossiper +public enum Gossiper { /// Spawns a gossip actor, that will periodically gossip with its peers about the provided payload. - static func start( + static func start( _ context: ActorRefFactory, name naming: ActorNaming, of type: Envelope.Type = Envelope.self, props: Props = .init(), @@ -384,7 +425,7 @@ internal struct GossipControl { } /// Side channel messages which may be piped into specific gossip logics. 
- func sideChannelTell(identifier: GossipIdentifier, message: Any) { + func sideChannelTell(_ identifier: GossipIdentifier, message: Any) { self.ref.tell(.sideChannelMessage(identifier: identifier, message)) } } @@ -435,7 +476,7 @@ public struct AnyGossipIdentifier: Hashable, GossipIdentifier { } } -public struct StringGossipIdentifier: GossipIdentifier, Hashable, ExpressibleByStringLiteral { +public struct StringGossipIdentifier: GossipIdentifier, Hashable, ExpressibleByStringLiteral, CustomStringConvertible { public let gossipIdentifier: String public init(_ gossipIdentifier: StringLiteralType) { @@ -449,4 +490,8 @@ public struct StringGossipIdentifier: GossipIdentifier, Hashable, ExpressibleByS public var asAnyGossipIdentifier: AnyGossipIdentifier { AnyGossipIdentifier(self) } + + public var description: String { + "StringGossipIdentifier(\(self.gossipIdentifier))" + } } diff --git a/Sources/DistributedActors/Pattern/ConvergentGossip.swift b/Sources/DistributedActors/Pattern/ConvergentGossip.swift deleted file mode 100644 index ff8a7714d..000000000 --- a/Sources/DistributedActors/Pattern/ConvergentGossip.swift +++ /dev/null @@ -1,221 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2020 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging - -/// Convergent gossip is a gossip mechanism which aims to equalize some state across all peers participating. -internal final class ConvergentGossip { - typealias GossipPeerRef = ActorRef - - let settings: Settings - - // TODO: store Envelope and inside it the payload - private var payload: Payload? - - private let notifyOnGossipRef: ActorRef - - // TODO: allow finding them via Receptionist, for Membership gossip we manage it directly in ClusterShell tho - private var peers: Set.Message>> - - fileprivate init(notifyOnGossipRef: ActorRef, settings: Settings) { - self.settings = settings - self.payload = nil - self.peers = [] // TODO: passed in configuration to configure how we find peers - // TODO: or the entire envelope? - self.notifyOnGossipRef = notifyOnGossipRef - } - - var behavior: Behavior { - .setup { context in - self.scheduleNextGossipRound(context: context) - - // TODO: those timers depend on what the peer lookup strategy is - // context.system.cluster.autoUpdatedMembership(context) // but we don't offer it for Behaviors, just Actor<>... 
- // context.system.cluster.events.subscribe(context.messageAdapter { event in Message._clusterEvent(event) }) // TODO: implement this - - return Behavior.receiveMessage { - switch $0 { - case .updatePayload(let payload): - self.onLocalPayloadUpdate(context, payload: payload) - case .introducePeer(let peer): - self.onIntroducePeer(context, peer: peer) - - case .gossip(let envelope): - self.receiveGossip(context, envelope: envelope) - - case ._clusterEvent: - fatalError("automatic peer location is not implemented") // FIXME: implement this https://github.com/apple/swift-distributed-actors/issues/371 - - case ._periodicGossipTick: - self.onPeriodicGossipTick(context) - } - return .same - }.receiveSpecificSignal(Signals.Terminated.self) { context, terminated in - context.log.trace("Peer terminated: \(terminated.address), will not gossip to it anymore") - self.peers = self.peers.filter { - $0.address != terminated.address - } - // TODO: could pause ticks since we have zero peers now? - return .same - } - } - } - - private func onIntroducePeer(_ context: ActorContext, peer: GossipPeerRef) { - if self.peers.insert(context.watch(peer)).inserted { - context.log.trace( - "Got introduced to peer [\(peer)], pushing initial gossip immediately", - metadata: [ - "gossip/peerCount": "\(self.peers.count)", - "gossip/peers": "\(self.peers.map { $0.address })", - ] - ) - - // TODO: implement this rather as "high priority peer to gossip to" - self.sendGossip(context, to: peer) - // TODO: consider if we should do a quick gossip to any new peers etc - // TODO: peers are removed when they die, no manual way to do it - } - } - - private func receiveGossip(_ context: ActorContext, envelope: ConvergentGossip.GossipEnvelope) { - context.log.trace( - "Received gossip: \(envelope)", - metadata: [ - "gossip/current": "\(String(reflecting: self.payload))", - "gossip/incoming": "\(envelope)", - ] - ) - - // send to recipient which may then update() the payload we are gossiping - self.notifyOnGossipRef.tell(envelope.payload) // could be good as request/reply here - } - - private func onLocalPayloadUpdate(_ context: ActorContext, payload: Payload) { - context.log.trace( - "Gossip payload updated: \(payload)", - metadata: [ - "actor/message": Logger.Metadata.pretty(payload), - "gossip/previousPayload": "\(self.payload, orElse: "nil")", - ] - ) - self.payload = payload - // TODO: bump local version vector; once it is in the envelope - } - - private func onPeriodicGossipTick(_ context: ActorContext) { - self.scheduleNextGossipRound(context: context) - - if let target = self.peers.shuffled().first { - self.sendGossip(context, to: target) - } - } - - private func sendGossip(_ context: ActorContext, to target: GossipPeerRef) { - guard let payload = self.payload else { - context.log.trace( - "No payload set, skipping gossip round", - metadata: [ - "gossip/target": "\(target)", - ] - ) - return - } - - // TODO: Optimization looking at seen table, decide who is not going to gain info form us anyway, and de-prioritize them that's nicer for small clusters, I guess - let envelope = GossipEnvelope(payload: payload) // TODO: carry all the vector clocks here rather in the payload - - // TODO: if we have seen tables, we can use them to bias the gossip towards the "more behind" nodes - context.log.trace( - "Sending gossip to \(target)", - metadata: [ - "gossip/target": "\(target.address)", - "gossip/peerCount": "\(self.peers.count)", - "gossip/peers": "\(self.peers.map { $0.address })", - "actor/message": Logger.Metadata.pretty(envelope), 
- ] - ) - - target.tell(.gossip(envelope)) - } - - private func scheduleNextGossipRound(context: ActorContext) { - // FIXME: configurable rounds - let delay = TimeAmount.seconds(1) // TODO: configuration - context.log.trace("Schedule next gossip round in \(delay.prettyDescription)") - context.timers.startSingle(key: "periodic-gossip", message: ._periodicGossipTick, delay: delay) - } -} - -extension ConvergentGossip { - enum Message: ActorMessage { - // gossip - case gossip(GossipEnvelope) - - // local messages - case updatePayload(Payload) - case introducePeer(GossipPeerRef) - - // internal messages - case _clusterEvent(Cluster.Event) - case _periodicGossipTick - } - - struct GossipEnvelope: Codable { - // TODO: this is to become the generic version what Cluster.Gossip is - let payload: Payload - // TODO: var converged: Bool {} - } -} - -extension ConvergentGossip { - typealias Ref = ActorRef.Message> - - /// Spawns a gossip actor, that will periodically gossip with its peers about the provided payload. - static func start( - _ context: ActorContext, name naming: ActorNaming, of type: Payload.Type = Payload.self, - notifyOnGossipRef: ActorRef, props: Props = .init(), settings: Settings = .init() - ) throws -> ConvergentGossipControl { - let gossipShell = ConvergentGossip(notifyOnGossipRef: notifyOnGossipRef, settings: settings) - let ref = try context.spawn(naming, props: props, gossipShell.behavior) - return ConvergentGossipControl(ref) - } -} - -internal struct ConvergentGossipControl { - // TODO: rather let's hide it trough methods - private let ref: ConvergentGossip.Ref - - init(_ ref: ConvergentGossip.Ref) { - self.ref = ref - } - - func update(payload: Payload) { - self.ref.tell(.updatePayload(payload)) - } - - /// Introduce a peer to the gossip group - func introduce(peer: ConvergentGossip.Ref) { - self.ref.tell(.introducePeer(peer)) - } -} - -// ==== ---------------------------------------------------------------------------------------------------------------- -// MARK: ConvergentGossip Settings - -extension ConvergentGossip { - struct Settings { - var gossipInterval: TimeAmount = .seconds(1) - } -} diff --git a/Sources/DistributedActors/Receptionist+Listing.swift b/Sources/DistributedActors/Receptionist+Listing.swift index c6b947468..d2270fee4 100644 --- a/Sources/DistributedActors/Receptionist+Listing.swift +++ b/Sources/DistributedActors/Receptionist+Listing.swift @@ -65,7 +65,7 @@ extension Receptionist { let refs: Set> var description: String { - "Listing<\(Message.self)>(\(self.refs.map { $0.address }))" + "Listing<\(reflecting: Message.self)>(\(self.refs.map { $0.address }))" } } } diff --git a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift index 720edbee2..42d596a4c 100644 --- a/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift +++ b/Sources/DistributedActors/Serialization/Serialization+SerializerID.swift @@ -122,8 +122,6 @@ extension Serialization { internal static let CRDTLWWMapDelta: SerializerID = .protobufRepresentable internal static let CRDTLWWRegister: SerializerID = .protobufRepresentable - internal static let ConvergentGossipMembership: SerializerID = .foundationJSON - // op log receptionist internal static let PushOps: SerializerID = .foundationJSON internal static let AckOps: SerializerID = .foundationJSON diff --git a/Sources/DistributedActors/Serialization/Serialization.swift b/Sources/DistributedActors/Serialization/Serialization.swift 
index 89a0a54f0..ec71f7157 100644 --- a/Sources/DistributedActors/Serialization/Serialization.swift +++ b/Sources/DistributedActors/Serialization/Serialization.swift @@ -114,7 +114,9 @@ public class Serialization { // cluster settings.register(ClusterShell.Message.self) settings.register(Cluster.Event.self) - settings.register(ConvergentGossip.Message.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands + settings.register(Cluster.Gossip.self) + settings.register(GossipShell.Message.self) + settings.register(StringGossipIdentifier.self) // receptionist needs some special casing // TODO: document how to deal with `protocol` message accepting actors, those should be very rare. @@ -226,7 +228,6 @@ extension Serialization { private func _ensureAllRegisteredSerializers() throws { // all non-codable types are specialized types; register specific serializer instances for them early for typeKey in self.settings.typeToManifestRegistry.keys { - self.log.trace("Ensure serializer eagerly: \(typeKey)") try typeKey._ensureSerializer(self) } diff --git a/Sources/DistributedActorsTestKit/ActorTestKit.swift b/Sources/DistributedActorsTestKit/ActorTestKit.swift index 339911f12..0eb903966 100644 --- a/Sources/DistributedActorsTestKit/ActorTestKit.swift +++ b/Sources/DistributedActorsTestKit/ActorTestKit.swift @@ -253,10 +253,19 @@ public extension ActorTestKit { polledTimes += 1 try block() usleep(useconds_t(interval.microseconds)) + } catch CallSiteError.error(let errorDetails) { + let message = callSite.detailedMessage(""" + Failed within \(timeAmount.prettyDescription) for block at \(file):\(line). \ + Queried \(polledTimes) times, within \(timeAmount.prettyDescription). \ + Error: \(errorDetails) + """) + XCTFail(message, file: callSite.file, line: callSite.line) + throw AssertionHoldsError(message: message) } catch { let message = callSite.detailedMessage(""" Failed within \(timeAmount.prettyDescription) for block at \(file):\(line). \ - Queried \(polledTimes) times, within \(timeAmount.prettyDescription). + Queried \(polledTimes) times, within \(timeAmount.prettyDescription). \ + Error: \(error) """) XCTFail(message, file: callSite.file, line: callSite.line) throw AssertionHoldsError(message: message) diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift index b2f2562d1..448036f7e 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredNodesTestBase.swift @@ -351,7 +351,7 @@ extension ClusteredNodesTestBase { let events = try eventStreamProbe.fishFor(Cluster.Event.self, within: .seconds(5)) { switch $0 { case .membershipChange(let change) - where change.node == node && change.toStatus.isAtLeastDown: + where change.node == node && change.toStatus.isAtLeast(.down): return .catchComplete($0) default: return .ignore diff --git a/Sources/DistributedActorsTestKit/LogCapture.swift b/Sources/DistributedActorsTestKit/LogCapture.swift index 61fe9a8d8..875d64651 100644 --- a/Sources/DistributedActorsTestKit/LogCapture.swift +++ b/Sources/DistributedActorsTestKit/LogCapture.swift @@ -165,13 +165,13 @@ extension LogCapture { } public struct CapturedLogMessage { - let date: Date - let level: Logger.Level - var message: Logger.Message - var metadata: Logger.Metadata? 
- let file: String - let function: String - let line: UInt + public let date: Date + public let level: Logger.Level + public var message: Logger.Message + public var metadata: Logger.Metadata? + public let file: String + public let function: String + public let line: UInt } // ==== ---------------------------------------------------------------------------------------------------------------- @@ -187,10 +187,12 @@ struct LogCaptureLogHandler: LogHandler { } public func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) { - guard self.capture.settings.filterActorPaths.contains(where: { path in self.label.starts(with: path) }) else { + let actorPath = self.metadata["actor/path"].map { "\($0)" } ?? "" + + guard self.capture.settings.filterActorPaths.contains(where: { path in actorPath.starts(with: path) }) else { return // ignore this actor's logs, it was filtered out } - guard !self.capture.settings.excludeActorPaths.contains(self.label) else { + guard !self.capture.settings.excludeActorPaths.contains(actorPath) else { return // actor was excluded explicitly } guard self.capture.settings.grep.isEmpty || self.capture.settings.grep.contains(where: { "\(message)".contains($0) }) else { @@ -201,29 +203,23 @@ struct LogCaptureLogHandler: LogHandler { } let date = Date() - var _metadata: Logger.Metadata = metadata ?? [:] + var _metadata: Logger.Metadata = self.metadata + _metadata.merge(metadata ?? [:], uniquingKeysWith: { _, r in r }) _metadata["label"] = "\(self.label)" self.capture.append(CapturedLogMessage(date: date, level: level, message: message, metadata: _metadata, file: file, function: function, line: line)) } - public subscript(metadataKey _: String) -> Logger.Metadata.Value? { + public subscript(metadataKey metadataKey: String) -> Logger.Metadata.Value? { get { - nil + self.metadata[metadataKey] } set { - // ignore + self.metadata[metadataKey] = newValue } } - public var metadata: Logging.Logger.Metadata { - get { - [:] - } - set { - // ignore - } - } + public var metadata: Logging.Logger.Metadata = [:] public var logLevel: Logger.Level { get { @@ -325,7 +321,31 @@ extension LogCapture { } } - public func grep(_ string: String) -> [CapturedLogMessage] { - self.logs.filter { "\($0)".contains(string) } + public func grep(_ string: String, metadata metadataQuery: [String: String] = [:]) -> [CapturedLogMessage] { + self.logs.filter { + guard "\($0)".contains(string) else { + // mismatch, exclude it + return false + } + + if metadataQuery.isEmpty { + return true + } + + let metas = $0.metadata ?? 
[:] + for (queryKey, queryValue) in metadataQuery { + if let value = metas[queryKey] { + if queryValue != "\(value)" { + // mismatch, exclude it + return false + } // ok, continue checking other keys + } else { + // key did not exist + return false + } + } + + return true + } } } diff --git a/Tests/DistributedActorsTests/Actorable/ActorContextReceptionistTests.swift b/Tests/DistributedActorsTests/Actorable/ActorContextReceptionistTests.swift index 7003ac2d1..750476157 100644 --- a/Tests/DistributedActorsTests/Actorable/ActorContextReceptionistTests.swift +++ b/Tests/DistributedActorsTests/Actorable/ActorContextReceptionistTests.swift @@ -25,7 +25,10 @@ final class ActorContextReceptionTests: ActorSystemTestBase { let listing: Receptionist.Listing = try self.testKit.eventually(within: .seconds(3)) { let readReply = owner.readLastObservedValue() guard let listing = try readReply.wait() else { - throw self.testKit.error() + throw self.testKit.error("No listing received") + } + guard listing.actors.first != nil else { + throw self.testKit.error("Listing received, but it is empty (\(listing))") } return listing } diff --git a/Tests/DistributedActorsTests/BackoffStrategyTests.swift b/Tests/DistributedActorsTests/BackoffStrategyTests.swift index 00cb5c885..7d00204d4 100644 --- a/Tests/DistributedActorsTests/BackoffStrategyTests.swift +++ b/Tests/DistributedActorsTests/BackoffStrategyTests.swift @@ -79,6 +79,17 @@ class BackoffStrategyTests: XCTestCase { backoff.next()?.shouldBeLessThanOrEqual(max + maxRandomNoise) } + func test_exponentialBackoff_shouldStopAfterMaxAttempts() { + let maxAttempts = 3 + var backoff = Backoff.exponential(initialInterval: .milliseconds(500), randomFactor: 0, maxAttempts: maxAttempts) + backoff.next()!.shouldEqual(.milliseconds(500)) + backoff.next()!.shouldEqual(.milliseconds(750)) + backoff.next()!.shouldEqual(.milliseconds(1125)) + backoff.next().shouldBeNil() + backoff.next().shouldBeNil() + backoff.next().shouldBeNil() + } + func test_exponentialBackoff_withLargeInitial_shouldAdjustCap() { _ = Backoff.exponential(initialInterval: .seconds(60)) // cap used to be hardcoded which would cause this to precondition crash } diff --git a/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift b/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift index c6d50c09d..b44758fbf 100644 --- a/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift +++ b/Tests/DistributedActorsTests/CRDT/CRDTActorOwnedTests.swift @@ -118,11 +118,6 @@ final class CRDTActorOwnedTests: ActorSystemTestBase { let g1 = "gcounter-1" let g2 = "gcounter-2" - // TODO: remove after figuring out why tests are flakey (https://github.com/apple/swift-distributed-actors/issues/157) - defer { - self.logCapture.printLogs() - } - // g1 has two owners let g1Owner1EventP = self.testKit.spawnTestProbe(expecting: OwnerEventProbeMessage.self) let g1Owner1 = try system.spawn("gcounter1-owner1", self.actorOwnedGCounterBehavior(id: g1, oep: g1Owner1EventP.ref)) @@ -214,8 +209,12 @@ final class CRDTActorOwnedTests: ActorSystemTestBase { private func actorOwnedORSetBehavior(id: String, oep ownerEventProbe: ActorRef) -> Behavior { .setup { context in let s = CRDT.ORSet.makeOwned(by: context, id: id) - s.onUpdate { id, ss in - context.log.trace("ORSet \(id) updated with new value: \(ss.elements)") + + s.onUpdate { id, updated in + context.log.trace("ORSet \(id) updated with new value, count: \(updated.elements)", metadata: [ + "set/count": "\(updated.count)", + "set": "\(updated.elements)", + ]) 
ownerEventProbe.tell(.ownerDefinedOnUpdate) } s.onDelete { id in @@ -226,10 +225,22 @@ final class CRDTActorOwnedTests: ActorSystemTestBase { return .receiveMessage { message in switch message { case .add(let element, let consistency, let timeout, let replyTo): + context.log.warning("add [\(element)] ... ", metadata: [ + "before": "\(s)", + "add": "\(element)", + ]) + let before = s.lastObservedValue s.insert(element, writeConsistency: consistency, timeout: timeout)._onComplete { result in switch result { - case .success(let s): - replyTo.tell(s.elements) + case .success(let updated): + context.log.warning("added [\(element)] ... \(updated.count)", metadata: [ + "before": "\(before)", + "before/count": "\(before.count)", + "updated": "\(updated.prettyDescription)", + "updated/count": "\(updated.count)", + "add": "\(element)", + ]) + replyTo.tell(updated.elements) case .failure(let error): fatalError("add error \(error)") } @@ -317,12 +328,12 @@ final class CRDTActorOwnedTests: ActorSystemTestBase { // we issue many writes, and want to see that for i in 1 ... 100 { - owner.tell(.add(i, consistency: .local, timeout: .seconds(1), replyTo: ignore)) + owner.tell(.add(i, consistency: .local, timeout: .seconds(3), replyTo: ignore)) } - owner.tell(.add(1000, consistency: .local, timeout: .seconds(1), replyTo: probe.ref)) + owner.tell(.add(1000, consistency: .local, timeout: .seconds(3), replyTo: probe.ref)) - let msg = try probe.expectMessage() - msg.count.shouldEqual(101) + let set = try probe.expectMessage() + set.count.shouldEqual(101) } // ==== ------------------------------------------------------------------------------------------------------------ diff --git a/Tests/DistributedActorsTests/CRDT/CRDTActorableOwnedTests.swift b/Tests/DistributedActorsTests/CRDT/CRDTActorableOwnedTests.swift index 7ffc06278..9f63d8346 100644 --- a/Tests/DistributedActorsTests/CRDT/CRDTActorableOwnedTests.swift +++ b/Tests/DistributedActorsTests/CRDT/CRDTActorableOwnedTests.swift @@ -23,11 +23,6 @@ final class CRDTActorableOwnedTests: ActorSystemTestBase { let g1 = "gcounter-1" let g2 = "gcounter-2" - // TODO: remove after figuring out why tests are flakey (https://github.com/apple/swift-distributed-actors/issues/157) - defer { - self.logCapture.printLogs() - } - // g1 has two owners let g1Owner1EventP = self.testKit.spawnTestProbe(expecting: OwnerEventProbeMessage.self) let g1Owner1 = try system.spawn("gcounter1-owner1") { context in diff --git a/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift b/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift index 8366a0af7..5d01d8696 100644 --- a/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift +++ b/Tests/DistributedActorsTests/CRDT/CRDTGossipReplicationClusteredTests.swift @@ -190,7 +190,6 @@ final class CRDTGossipReplicationTests: ClusteredNodesTestBase { let testKit: ActorTestKit = self.testKit(first) _ = try p1.fishFor(Int.self, within: .seconds(5)) { counter in - pprint("\(p1) received = \(pretty: counter)") if counter.value == 6 { return .complete } else { @@ -199,8 +198,8 @@ final class CRDTGossipReplicationTests: ClusteredNodesTestBase { } try testKit.assertHolds(for: .seconds(5), interval: .seconds(1)) { - let logs = self.capturedLogs(of: first).grep("Received gossip") - pinfo("LOGS: \(lineByLine: logs)") + let logs: [CapturedLogMessage] = self.capturedLogs(of: first) + .grep("Received gossip", metadata: ["gossip/identity": "counter"]) guard logs.count < 5 else { throw 
testKit.error("Received gossip more times than expected! Logs: \(lineByLine: logs)") @@ -215,8 +214,8 @@ final class CRDTGossipReplicationTests: ClusteredNodesTestBase { _ = try fourth.spawn("reader-4", ownsCounter(p: p4)) try testKit.assertHolds(for: .seconds(5), interval: .seconds(1)) { - let logs = self.capturedLogs(of: fourth).grep("Received gossip") - pinfo("LOGS: \(lineByLine: logs)") + let logs = self.capturedLogs(of: fourth) + .grep("Received gossip", metadata: ["gossip/identity": "counter"]) guard logs.count < 5 else { throw testKit.error("Received gossip more times than expected! Logs: \(lineByLine: logs)") diff --git a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift index e4c412ada..81ca3ba10 100644 --- a/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/AssociationClusteredTests.swift @@ -29,64 +29,64 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { // MARK: Happy path, accept association func test_boundServer_shouldAcceptAssociate() throws { - let (local, remote) = self.setUpPair() + let (first, second) = self.setUpPair() - local.cluster.join(node: remote.cluster.node.node) + first.cluster.join(node: second.cluster.node.node) - try assertAssociated(local, withExactly: remote.cluster.node) - try assertAssociated(remote, withExactly: local.cluster.node) + try assertAssociated(first, withExactly: second.cluster.node) + try assertAssociated(second, withExactly: first.cluster.node) } func test_boundServer_shouldAcceptAssociate_raceFromBothNodes() throws { - let (local, remote) = self.setUpPair() + let (first, second) = self.setUpPair() let n3 = self.setUpNode("node-3") let n4 = self.setUpNode("node-4") let n5 = self.setUpNode("node-5") let n6 = self.setUpNode("node-6") - local.cluster.join(node: remote.cluster.node.node) - remote.cluster.join(node: local.cluster.node.node) + first.cluster.join(node: second.cluster.node.node) + second.cluster.join(node: first.cluster.node.node) - n3.cluster.join(node: local.cluster.node.node) - local.cluster.join(node: n3.cluster.node.node) + n3.cluster.join(node: first.cluster.node.node) + first.cluster.join(node: n3.cluster.node.node) - n4.cluster.join(node: local.cluster.node.node) - local.cluster.join(node: n4.cluster.node.node) + n4.cluster.join(node: first.cluster.node.node) + first.cluster.join(node: n4.cluster.node.node) - n5.cluster.join(node: local.cluster.node.node) - local.cluster.join(node: n5.cluster.node.node) + n5.cluster.join(node: first.cluster.node.node) + first.cluster.join(node: n5.cluster.node.node) - n6.cluster.join(node: local.cluster.node.node) - local.cluster.join(node: n6.cluster.node.node) + n6.cluster.join(node: first.cluster.node.node) + first.cluster.join(node: n6.cluster.node.node) - try assertAssociated(local, withAtLeast: remote.cluster.node) - try assertAssociated(remote, withAtLeast: local.cluster.node) + try assertAssociated(first, withAtLeast: second.cluster.node) + try assertAssociated(second, withAtLeast: first.cluster.node) } func test_handshake_shouldNotifyOnSuccess() throws { try shouldNotThrow { - let (local, remote) = self.setUpPair() + let (first, second) = self.setUpPair() - local.cluster.ref.tell(.command(.handshakeWith(remote.cluster.node.node))) + first.cluster.ref.tell(.command(.handshakeWith(second.cluster.node.node))) - try assertAssociated(local, withExactly: remote.cluster.node) - try assertAssociated(remote, withExactly: 
local.cluster.node) + try assertAssociated(first, withExactly: second.cluster.node) + try assertAssociated(second, withExactly: first.cluster.node) } } func test_handshake_shouldNotifySuccessWhenAlreadyConnected() throws { try shouldNotThrow { - let (local, remote) = self.setUpPair() + let (first, second) = self.setUpPair() - local.cluster.ref.tell(.command(.handshakeWith(remote.cluster.node.node))) + first.cluster.ref.tell(.command(.handshakeWith(second.cluster.node.node))) - try assertAssociated(local, withExactly: remote.cluster.node) - try assertAssociated(remote, withExactly: local.cluster.node) + try assertAssociated(first, withExactly: second.cluster.node) + try assertAssociated(second, withExactly: first.cluster.node) - local.cluster.ref.tell(.command(.handshakeWith(remote.cluster.node.node))) + first.cluster.ref.tell(.command(.handshakeWith(second.cluster.node.node))) - try assertAssociated(local, withExactly: remote.cluster.node) - try assertAssociated(remote, withExactly: local.cluster.node) + try assertAssociated(first, withExactly: second.cluster.node) + try assertAssociated(second, withExactly: first.cluster.node) } } @@ -98,7 +98,7 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { let (first, second) = self.setUpPair() let secondName = second.cluster.node.node.systemName - let remotePort = second.cluster.node.port + let secondPort = second.cluster.node.port let firstEventsProbe = self.testKit(first).spawnTestProbe(expecting: Cluster.Event.self) let secondEventsProbe = self.testKit(second).spawnTestProbe(expecting: Cluster.Event.self) @@ -111,18 +111,18 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { try assertAssociated(second, withExactly: first.cluster.node) let oldSecond = second - let shutdown = oldSecond.shutdown() // kill remote node + let shutdown = oldSecond.shutdown() // kill second node try shutdown.wait(atMost: .seconds(3)) let secondReplacement = self.setUpNode(secondName + "-REPLACEMENT") { settings in - settings.cluster.bindPort = remotePort + settings.cluster.bindPort = secondPort } let secondReplacementEventsProbe = self.testKit(secondReplacement).spawnTestProbe(expecting: Cluster.Event.self) secondReplacement.cluster.events.subscribe(secondReplacementEventsProbe.ref) second.cluster.events.subscribe(secondReplacementEventsProbe.ref) - // the new replacement node is now going to initiate a handshake with 'local' which knew about the previous - // instance (oldRemote) on the same node; It should accept this new handshake, and ban the previous node. + // the new replacement node is now going to initiate a handshake with 'first' which knew about the previous + // instance (oldSecond) on the same node; It should accept this new handshake, and ban the previous node. 
secondReplacement.cluster.join(node: first.cluster.node.node) // verify we are associated ONLY with the appropriate nodes now; @@ -131,32 +131,32 @@ } } - func test_association_shouldAllowSendingToRemoteReference() throws { + func test_association_shouldAllowSendingToSecondReference() throws { try shouldNotThrow { - let (local, remote) = self.setUpPair() + let (first, second) = self.setUpPair() - let probeOnRemote = self.testKit(remote).spawnTestProbe(expecting: String.self) - let refOnRemoteSystem: ActorRef = try remote.spawn( - "remoteAcquaintance", + let probeOnSecond = self.testKit(second).spawnTestProbe(expecting: String.self) + let refOnSecondSystem: ActorRef = try second.spawn( + "secondAcquaintance", .receiveMessage { message in - probeOnRemote.tell("forwarded:\(message)") + probeOnSecond.tell("forwarded:\(message)") return .same } ) - local.cluster.join(node: remote.cluster.node.node) + first.cluster.join(node: second.cluster.node.node) - try assertAssociated(local, withExactly: remote.settings.cluster.uniqueBindNode) + try assertAssociated(first, withExactly: second.settings.cluster.uniqueBindNode) - // first we manually construct the "right remote path"; Don't do this in normal production code - let uniqueRemoteAddress = ActorAddress(node: remote.cluster.node, path: refOnRemoteSystem.path, incarnation: refOnRemoteSystem.address.incarnation) - // to then obtain a remote ref ON the `system`, meaning that the node within uniqueRemoteAddress is a remote one - let resolvedRef = self.resolveRef(local, type: String.self, address: uniqueRemoteAddress, on: remote) - // the resolved ref is a local resource on the `system` and points via the right association to the remote actor - // inside system `remote`. Sending messages to a ref constructed like this will make the messages go over remoting. + // first we manually construct the "right remote path" (here: on the second node); Don't do this in normal production code + let uniqueSecondAddress = ActorAddress(node: second.cluster.node, path: refOnSecondSystem.path, incarnation: refOnSecondSystem.address.incarnation) + // to then obtain a remote ref ON the `system`, meaning that the node within uniqueSecondAddress is a remote one + let resolvedRef = self.resolveRef(first, type: String.self, address: uniqueSecondAddress, on: second) + // the resolved ref is a local resource on the `first` system and points via the right association to the actor + // inside system `second`. Sending messages to a ref constructed like this will make the messages go over remoting.
resolvedRef.tell("HELLO") - try probeOnRemote.expectMessage("forwarded:HELLO") + try probeOnSecond.expectMessage("forwarded:HELLO") } } @@ -187,71 +187,132 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { } // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Retry associations + // MARK: Retry handshakes + + func test_handshake_shouldKeepTryingUntilOtherNodeBindsPort() throws { + let first = setUpNode("first") + + let secondPort = first.cluster.node.node.port + 10 + // second is NOT started, but we already ask first to handshake with the second one (which will fail, though the node should keep trying) + let secondNode = Node(systemName: "second", host: "127.0.0.1", port: secondPort) - func test_association_shouldKeepTryingUntilOtherNodeBindsPort() throws { - let local = setUpNode("local") + first.cluster.join(node: secondNode) + sleep(3) // we give it some time to keep failing to connect, so the second node is not yet started - let remotePort = local.cluster.node.node.port + 10 - // remote is NOT started, but we already ask local to handshake with the remote one (which will fail, though the node should keep trying) - let remoteNode = Node(systemName: "remote", host: "127.0.0.1", port: remotePort) + let second = setUpNode("second") { settings in + settings.cluster.bindPort = secondPort + } - local.cluster.join(node: remoteNode) - sleep(1) // we give it some time to keep failing to connect, so the second node is not yet started + try assertAssociated(first, withExactly: second.cluster.node) + try assertAssociated(second, withExactly: first.cluster.node) + } - let remote = setUpNode("remote") { settings in - settings.cluster.bindPort = remotePort + func test_handshake_shouldStopTryingWhenMaxAttemptsExceeded() throws { + let first = setUpNode("first") { settings in + settings.cluster.handshakeReconnectBackoff = Backoff.exponential( + initialInterval: .milliseconds(100), + maxAttempts: 2 + ) } - try assertAssociated(local, withExactly: remote.cluster.node) - try assertAssociated(remote, withExactly: local.cluster.node) + let secondPort = first.cluster.node.node.port + 10 + // second is NOT started, but we already ask first to handshake with the second one (which will fail, though the node should keep trying) + let secondNode = Node(systemName: "second", host: "127.0.0.1", port: secondPort) + + first.cluster.join(node: secondNode) + sleep(1) // we give it some time to keep failing to connect (and exhaust the retries) + + let logs = self.capturedLogs(of: first) + try logs.awaitLogContaining(self.testKit(first), text: "Giving up on handshake with node [sact://second@127.0.0.1:9011]") } - func test_association_shouldNotAssociateWhenRejected() throws { - let local = self.setUpNode("local") { settings in + func test_handshake_shouldNotAssociateWhenRejected() throws { + let first = self.setUpNode("first") { settings in settings.cluster._protocolVersion.major += 1 // handshake will be rejected on major version difference } - let remote = self.setUpNode("remote") + let second = self.setUpNode("second") - local.cluster.join(node: remote.cluster.node.node) + first.cluster.join(node: second.cluster.node.node) - try assertNotAssociated(system: local, node: remote.cluster.node) - try assertNotAssociated(system: remote, node: local.cluster.node) + try assertNotAssociated(system: first, node: second.cluster.node) + try assertNotAssociated(system: second, node: first.cluster.node) } func 
test_handshake_shouldNotifyOnRejection() throws { - let local = self.setUpNode("local") { settings in + let first = self.setUpNode("first") { settings in settings.cluster._protocolVersion.major += 1 // handshake will be rejected on major version difference } - let remote = self.setUpNode("remote") + let second = self.setUpNode("second") - local.cluster.ref.tell(.command(.handshakeWith(remote.cluster.node.node))) + first.cluster.ref.tell(.command(.handshakeWith(second.cluster.node.node))) - try assertNotAssociated(system: local, node: remote.cluster.node) - try assertNotAssociated(system: remote, node: local.cluster.node) + try assertNotAssociated(system: first, node: second.cluster.node) + try assertNotAssociated(system: second, node: first.cluster.node) - try self.capturedLogs(of: local).awaitLogContaining(self.testKit(local), text: "incompatibleProtocolVersion(local:") + try self.capturedLogs(of: first) + .awaitLogContaining( + self.testKit(first), + text: "Handshake rejected by [sact://second@127.0.0.1:9002], reason: incompatibleProtocolVersion" + ) + } + + // ==== ---------------------------------------------------------------------------------------------------------------- + // MARK: Leaving/down rejecting handshakes + + func test_handshake_shouldRejectIfNodeIsLeavingOrDown() throws { + let first = self.setUpNode("first") { settings in + settings.cluster.onDownAction = .none // don't shutdown this node (keep process alive) + } + let second = self.setUpNode("second") + + first.cluster.down(node: first.cluster.node.node) + + let testKit = self.testKit(first) + try testKit.eventually(within: .seconds(3)) { + let snapshot: Cluster.Membership = first.cluster.membershipSnapshot + if let selfMember = snapshot.uniqueMember(first.cluster.node) { + if selfMember.status == .down { + () // good + } else { + throw testKit.error("Expecting \(first.cluster.node) to become [.down] but was \(selfMember.status). Membership: \(pretty: snapshot)") + } + } else { + throw testKit.error("No self member for \(first.cluster.node)! Membership: \(pretty: snapshot)") + } + } + + // now we try to join the "already down" node; it should reject any such attempts + second.cluster.ref.tell(.command(.handshakeWith(first.cluster.node.node))) + + try assertNotAssociated(system: first, node: second.cluster.node) + try assertNotAssociated(system: second, node: first.cluster.node) + + try self.capturedLogs(of: second) + .awaitLogContaining( + self.testKit(second), + text: "Handshake rejected by [sact://first@127.0.0.1:9001], reason: Node already leaving cluster." 
+ ) } // ==== ------------------------------------------------------------------------------------------------------------ - // MARK: Remote control caching + // MARK: Second control caching - func test_cachedRemoteControlsWithSameNodeID_shouldNotOverwriteEachOther() throws { - let (local, remote) = setUpPair() - remote.cluster.join(node: local.cluster.node.node) + func test_cachedSecondControlsWithSameNodeID_shouldNotOverwriteEachOther() throws { + let (first, second) = setUpPair() + second.cluster.join(node: first.cluster.node.node) - try assertAssociated(local, withExactly: remote.cluster.node) + try assertAssociated(first, withExactly: second.cluster.node) let thirdSystem = self.setUpNode("third") { settings in settings.cluster.enabled = true - settings.cluster.nid = remote.settings.cluster.nid + settings.cluster.nid = second.settings.cluster.nid settings.cluster.node.port = 9119 } - thirdSystem.cluster.join(node: local.cluster.node.node) - try assertAssociated(local, withExactly: [remote.cluster.node, thirdSystem.settings.cluster.uniqueBindNode]) + thirdSystem.cluster.join(node: first.cluster.node.node) + try assertAssociated(first, withExactly: [second.cluster.node, thirdSystem.settings.cluster.uniqueBindNode]) - local._cluster?._testingOnly_associations.count.shouldEqual(2) + first._cluster?._testingOnly_associations.count.shouldEqual(2) } func test_sendingMessageToNotYetAssociatedNode_mustCauseAssociationAttempt() throws { @@ -304,13 +365,13 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { // down myself first.cluster.down(node: first.cluster.node.node) - let localProbe = self.testKit(first).spawnTestProbe(expecting: Cluster.Membership.self) - let remoteProbe = self.testKit(second).spawnTestProbe(expecting: Cluster.Membership.self) + let firstProbe = self.testKit(first).spawnTestProbe(expecting: Cluster.Membership.self) + let secondProbe = self.testKit(second).spawnTestProbe(expecting: Cluster.Membership.self) - // we we down local on local, it should become down there: + // when we down first on first, it should become down there: try self.testKit(first).eventually(within: .seconds(3)) { - first.cluster.ref.tell(.query(.currentMembership(localProbe.ref))) - let firstMembership = try localProbe.expectMessage() + first.cluster.ref.tell(.query(.currentMembership(firstProbe.ref))) + let firstMembership = try firstProbe.expectMessage() guard let selfMember = firstMembership.uniqueMember(first.cluster.node) else { throw self.testKit(second).error("No self member in membership!
Wanted: \(first.cluster.node)", line: #line - 1) @@ -324,19 +385,19 @@ final class ClusterAssociationTests: ClusteredNodesTestBase { // and the second node should also notice try self.testKit(second).eventually(within: .seconds(3)) { - second.cluster.ref.tell(.query(.currentMembership(remoteProbe.ref))) - let secondMembership = try remoteProbe.expectMessage() + second.cluster.ref.tell(.query(.currentMembership(secondProbe.ref))) + let secondMembership = try secondProbe.expectMessage() - // and the local node should also propagate the Down information to the remote node - // although this may be a best effort since the local can just shut down if it wanted to, + // and the first node should also propagate the Down information to the second node + // although this may be a best effort since the first node can just shut down if it wanted to, // this scenario assumes a graceful leave though: - guard let localMemberObservedOnRemote = secondMembership.uniqueMember(first.cluster.node) else { + guard let firstMemberObservedOnSecond = secondMembership.uniqueMember(first.cluster.node) else { throw self.testKit(second).error("\(second) does not know about the \(first.cluster.node) at all...!", line: #line - 1) } - guard localMemberObservedOnRemote.status == .down else { - throw self.testKit(second).error("Wanted to see \(first.cluster.node) as DOWN on \(second), but was still: \(localMemberObservedOnRemote)", line: #line - 1) + guard firstMemberObservedOnSecond.status == .down else { + throw self.testKit(second).error("Wanted to see \(first.cluster.node) as DOWN on \(second), but was still: \(firstMemberObservedOnSecond)", line: #line - 1) } } } diff --git a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift index b5993f30c..c67c6269a 100644 --- a/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/ClusterLeaderActionsClusteredTests.swift @@ -158,12 +158,12 @@ final class ClusterLeaderActionsClusteredTests: ClusteredNodesTestBase { let first = self.setUpNode("first") { settings in settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2) } - let p1 = self.testKit(first).spawnTestProbe(expecting: Cluster.Event.self) - first.cluster.events.subscribe(p1.ref) - let second = self.setUpNode("second") { settings in settings.cluster.autoLeaderElection = .lowestReachable(minNumberOfMembers: 2) } + + let p1 = self.testKit(first).spawnTestProbe(expecting: Cluster.Event.self) + first.cluster.events.subscribe(p1.ref) let p2 = self.testKit(second).spawnTestProbe(expecting: Cluster.Event.self) second.cluster.events.subscribe(p2.ref) @@ -173,28 +173,37 @@ final class ClusterLeaderActionsClusteredTests: ClusteredNodesTestBase { try self.ensureNodes(.up, nodes: first.cluster.node, second.cluster.node) // the following tests confirm that the manually subscribed actors, got all the events they expected + func assertExpectedClusterEvents(events: [Cluster.Event], probe: ActorTestProbe<Cluster.Event>) throws { // the specific type of snapshot we get is slightly racy: it could be .empty or contain already the node itself + guard case .some(Cluster.Event.snapshot) = events.first else { + throw probe.error("First event always expected to be .snapshot, was: \(optional: events.first)") + } + + // both nodes moved up + events.filter { event in + switch event { + case .membershipChange(let change) where change.isUp: + return true + default: + return false +
} + }.count.shouldEqual(2) // both nodes moved to up + + // the leader is the right node + events.shouldContain(.leadershipChange(Cluster.LeadershipChange(oldLeader: nil, newLeader: .init(node: first.cluster.node, status: .joining))!)) // !-safe, since new/old leader known to be different + } // collect all events until we see leadership change; we should already have seen members become up then let eventsOnFirstSub = try collectUntilAllMembers(p1, status: .up) - eventsOnFirstSub.shouldContain(.snapshot(.empty)) - eventsOnFirstSub.shouldContain(.membershipChange(.init(node: first.cluster.node, fromStatus: nil, toStatus: .joining))) - eventsOnFirstSub.shouldContain(.membershipChange(.init(node: second.cluster.node, fromStatus: nil, toStatus: .joining))) - eventsOnFirstSub.shouldContain(.membershipChange(.init(node: first.cluster.node, fromStatus: .joining, toStatus: .up))) - eventsOnFirstSub.shouldContain(.membershipChange(.init(node: second.cluster.node, fromStatus: .joining, toStatus: .up))) - eventsOnFirstSub.shouldContain(.leadershipChange(Cluster.LeadershipChange(oldLeader: nil, newLeader: .init(node: first.cluster.node, status: .joining))!)) // !-safe, since new/old leader known to be different + try assertExpectedClusterEvents(events: eventsOnFirstSub, probe: p1) // on non-leader node let eventsOnSecondSub = try collectUntilAllMembers(p2, status: .up) - eventsOnSecondSub.shouldContain(.snapshot(.empty)) - eventsOnSecondSub.shouldContain(.membershipChange(.init(node: first.cluster.node, fromStatus: nil, toStatus: .joining))) - eventsOnSecondSub.shouldContain(.membershipChange(.init(node: second.cluster.node, fromStatus: nil, toStatus: .joining))) - eventsOnSecondSub.shouldContain(.membershipChange(.init(node: first.cluster.node, fromStatus: .joining, toStatus: .up))) - eventsOnSecondSub.shouldContain(.membershipChange(.init(node: second.cluster.node, fromStatus: .joining, toStatus: .up))) - eventsOnSecondSub.shouldContain(.leadershipChange(Cluster.LeadershipChange(oldLeader: nil, newLeader: .init(node: first.cluster.node, status: .joining))!)) // !-safe, since new/old leader known to be different + try assertExpectedClusterEvents(events: eventsOnSecondSub, probe: p2) } } private func collectUntilAllMembers(_ probe: ActorTestProbe<Cluster.Event>, status: Cluster.MemberStatus) throws -> [Cluster.Event] { + pinfo("Cluster.Events on \(probe)") var events: [Cluster.Event] = [] var membership = Cluster.Membership.empty diff --git a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift index 217ddc934..ba3adb0f6 100644 --- a/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift +++ b/Tests/DistributedActorsTests/Cluster/DowningStrategy/DowningClusteredTests.swift @@ -18,6 +18,15 @@ import XCTest // "Get down!" final class DowningClusteredTests: ClusteredNodesTestBase { + override func configureLogCapture(settings: inout LogCapture.Settings) { + settings.excludeActorPaths = [ + "/system/replicator", + "/system/replicator/gossip", + "/system/receptionist", + "/system/cluster/swim", + ] + } + enum NodeStopMethod { case leaveSelfNode // TODO: eventually this one will be more graceful, ensure others see us leave etc case downSelf @@ -47,7 +56,10 @@ final class DowningClusteredTests: ClusteredNodesTestBase { _ modifySettings: ((inout ActorSystemSettings) -> Void)?
= nil ) throws { try shouldNotThrow { - let (first, second) = self.setUpPair(modifySettings) + let (first, second) = self.setUpPair { settings in + settings.cluster.swim.probeInterval = .milliseconds(500) + modifySettings?(&settings) + } let thirdNeverDownSystem = self.setUpNode("third", modifySettings) try self.joinNodes(node: first, with: second, ensureMembers: .up) @@ -70,8 +82,6 @@ final class DowningClusteredTests: ClusteredNodesTestBase { let eventsProbeOther = self.testKit(otherNotDownPairSystem).spawnEventStreamTestProbe(subscribedTo: otherNotDownPairSystem.cluster.events) let eventsProbeThird = self.testKit(thirdNeverDownSystem).spawnEventStreamTestProbe(subscribedTo: thirdNeverDownSystem.cluster.events) - pinfo("Expecting [\(expectedDownSystem)] to become [.down], method to stop the node [\(stopMethod)]") - // we cause the stop of the target node as expected switch (stopMethod, stopNode) { case (.leaveSelfNode, .firstLeader): first.cluster.leave() @@ -87,20 +97,25 @@ final class DowningClusteredTests: ClusteredNodesTestBase { case (.downFromOtherMember, .secondNonLeader): thirdNeverDownSystem.cluster.down(node: second.cluster.node.node) } - func expectedDownMemberEventsFishing(on: ActorSystem) -> (Cluster.Event) -> ActorTestProbe.FishingDirective { - { event in + func expectedDownMemberEventsFishing( + on: ActorSystem, + file: StaticString = #file, line: UInt = #line + ) -> (Cluster.Event) -> ActorTestProbe.FishingDirective { + pinfo("Expecting [\(expectedDownSystem)] to become [.down] on [\(on.cluster.node.node)], method to stop the node [\(stopMethod)]") + + return { event in switch event { case .membershipChange(let change) where change.node == expectedDownNode && change.isRemoval: - pinfo("MembershipChange on \(on.cluster.node.node): \(change)") + pinfo("\(on.cluster.node.node): \(change)", file: file, line: line) return .catchComplete(change) case .membershipChange(let change) where change.node == expectedDownNode: - pinfo("MembershipChange on \(on.cluster.node.node): \(change)") + pinfo("\(on.cluster.node.node): \(change)", file: file, line: line) return .catchContinue(change) case .reachabilityChange(let change) where change.member.node == expectedDownNode: - pnote("ReachabilityChange on \(otherNotDownPairSystem.cluster.node.node): \(change)") + pnote("\(on.cluster.node.node): \(change)", file: file, line: line) return .ignore default: - // pnote("Event on \(otherNotDownPairSystem.cluster.node.node) = \(event)") + pnote("\(on.cluster.node.node): \(event)", file: file, line: line) return .ignore } } @@ -109,18 +124,17 @@ final class DowningClusteredTests: ClusteredNodesTestBase { // collect all events regarding the expectedDownNode's membership lifecycle // (the timeout is fairly large here to tolerate slow CI and variations how the events get propagated, normally they propagate quite quickly) let eventsOnOther = try eventsProbeOther.fishFor(Cluster.MembershipChange.self, within: .seconds(30), expectedDownMemberEventsFishing(on: otherNotDownPairSystem)) - let eventsOnThird = try eventsProbeThird.fishFor(Cluster.MembershipChange.self, within: .seconds(30), expectedDownMemberEventsFishing(on: thirdNeverDownSystem)) - eventsOnOther.shouldContain(where: { change in change.toStatus.isDown && (change.fromStatus == .joining || change.fromStatus == .up) }) eventsOnOther.shouldContain(Cluster.MembershipChange(node: expectedDownNode, fromStatus: .down, toStatus: .removed)) - eventsOnOther.shouldContain(where: { change in change.toStatus.isDown && (change.fromStatus == .joining || 
change.fromStatus == .up) }) + let eventsOnThird = try eventsProbeThird.fishFor(Cluster.MembershipChange.self, within: .seconds(30), expectedDownMemberEventsFishing(on: thirdNeverDownSystem)) + eventsOnThird.shouldContain(where: { change in change.toStatus.isDown && (change.fromStatus == .joining || change.fromStatus == .up) }) eventsOnThird.shouldContain(Cluster.MembershipChange(node: expectedDownNode, fromStatus: .down, toStatus: .removed)) } } // ==== ---------------------------------------------------------------------------------------------------------------- - // MARK: Stop by: cluster.leave() + // MARK: Stop by: cluster.leave() immediate func test_stopLeader_by_leaveSelfNode_shouldPropagateToOtherNodes() throws { try self.shared_stoppingNode_shouldPropagateToOtherNodesAsDown(stopMethod: .leaveSelfNode, stopNode: .firstLeader) { settings in diff --git a/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift b/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift index 52bb45a4a..553ff4cd1 100644 --- a/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift +++ b/Tests/DistributedActorsTests/Cluster/LeadershipTests.swift @@ -24,7 +24,7 @@ final class LeadershipTests: XCTestCase { let memberC = Cluster.Member(node: UniqueNode(node: Node(systemName: "System", host: "3.3.3.3", port: 9119), nid: .random()), status: .up) let newMember = Cluster.Member(node: UniqueNode(node: Node(systemName: "System", host: "4.4.4.4", port: 1001), nid: .random()), status: .up) - let fakeContext = LeaderElectionContext(log: Logger(label: "mock"), eventLoop: EmbeddedEventLoop()) + let fakeContext = LeaderElectionContext(log: NoopLogger.make(), eventLoop: EmbeddedEventLoop()) lazy var initialMembership: Cluster.Membership = [ memberA, memberB, memberC, diff --git a/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift b/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift index bc483aa98..278024100 100644 --- a/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift +++ b/Tests/DistributedActorsTests/Cluster/RemotingHandshakeStateMachineTests.swift @@ -138,7 +138,8 @@ final class RemoteHandshakeStateMachineTests: XCTestCase { guard case .scheduleRetryHandshake = clientInitiated.onHandshakeTimeout() else { throw shouldNotHappen("Expected retry attempt after handshake timeout") } - guard case .scheduleRetryHandshake = clientInitiated.onHandshakeError(TestError("Boom!")) else { + + guard case .scheduleRetryHandshake = clientInitiated.onConnectionError(TestError("Boom!")) else { throw shouldNotHappen("Expected retry attempt after handshake timeout") } } diff --git a/Tests/DistributedActorsTests/Cluster/TestExtensions.swift b/Tests/DistributedActorsTests/Cluster/TestExtensions.swift index d082d5f17..7ddcdd916 100644 --- a/Tests/DistributedActorsTests/Cluster/TestExtensions.swift +++ b/Tests/DistributedActorsTests/Cluster/TestExtensions.swift @@ -38,7 +38,7 @@ extension ClusterShellState { settings: settings, channel: EmbeddedChannel(), events: EventStream(ref: ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), - gossipControl: ConvergentGossipControl(ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), + gossipControl: GossipControl(ActorRef(.deadLetters(.init(log, address: ._deadLetters, system: nil)))), log: log ) } diff --git a/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift b/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift index e7869c61c..fa76eeb61 100644 --- 
a/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift +++ b/Tests/DistributedActorsTests/NodeDeathWatcherTests.swift @@ -62,9 +62,7 @@ final class NodeDeathWatcherTests: ClusteredNodesTestBase { // should cause termination of all remote actors, observed by the local actors on [first] let termination1: Signals.Terminated = try p.expectMessage() - pinfo("termination 1: \(termination1)") let termination2: Signals.Terminated = try p.expectMessage() - pinfo("termination 2: \(termination2)") let terminations: [Signals.Terminated] = [termination1, termination2] terminations.shouldContain(where: { terminated in (!terminated.existenceConfirmed) && terminated.address.name == "remote-1" diff --git a/Tests/DistributedActorsTests/NoopLogger.swift b/Tests/DistributedActorsTests/NoopLogger.swift new file mode 100644 index 000000000..adf5ac14a --- /dev/null +++ b/Tests/DistributedActorsTests/NoopLogger.swift @@ -0,0 +1,39 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +public struct NoopLogger { + public static func make() -> Logger { + .init(label: "noop", factory: { _ in NoopLogHandler() }) + } +} + +public struct NoopLogHandler: LogHandler { + public func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) { + // ignore + } + + public subscript(metadataKey _: String) -> Logger.MetadataValue? { + get { + nil // ignore + } + set { + // ignore + } + } + + public var metadata: Logger.Metadata = [:] + public var logLevel: Logger.Level = .critical +} diff --git a/Tests/DistributedActorsTests/SerializationPoolTests.swift b/Tests/DistributedActorsTests/SerializationPoolTests.swift index 94d9fdbc7..8c58fe3ff 100644 --- a/Tests/DistributedActorsTests/SerializationPoolTests.swift +++ b/Tests/DistributedActorsTests/SerializationPoolTests.swift @@ -95,6 +95,7 @@ final class SerializationPoolTests: XCTestCase { override func setUp() { self.system = ActorSystem("SerializationTests") { settings in + settings.logging.logger = NoopLogger.make() settings.serialization.register(Test1.self) settings.serialization.register(Test2.self) }