Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
ad2498a
=cluster fix double-handshakes and double-cluster events; Fixes test_…
ktoso Jun 4, 2020
2d876ee
test_up_ensureAllSubscribersGetMovingUpEvents OK
ktoso Jun 5, 2020
ed6cc65
fix the possibility of when incoming handshake offer is being accepted
ktoso Jun 5, 2020
334e17a
remove comments/println
ktoso Jun 5, 2020
7b749c2
=cluster,handshake #141 #389 #450 #604 handshakes retry (backoff) and…
ktoso Jun 11, 2020
9c8b709
=test better logging for CRDT ActorOwned tests
ktoso Jun 11, 2020
81ebc1d
=cluster,gossip Drop ConvergentGossip and use Gossip<> for membership
ktoso Jun 12, 2020
8d347ae
=gossip implement onClusterMember peer discovery and use in Membership
ktoso Jun 12, 2020
758692d
=test fix test_handshake_shouldNotifyOnRejection
ktoso Jun 12, 2020
bd60cc6
=test better logs in DowningClusteredTests
ktoso Jun 12, 2020
43be8e7
=test silence some test logs, by using NoopLogger
ktoso Jun 12, 2020
5f5cbbf
=serialization avoid top-level encoded String, since JSONEncoder disl…
ktoso Jun 12, 2020
6f7bdc8
=test #157 we've not seen it fail a long time, removed extra logging
ktoso Jun 15, 2020
f7708ca
=test adjust test that now all gossips use the same mechanism
ktoso Jun 15, 2020
3908125
=log fix metadata handling in LogCapture as we now rely on it for act…
ktoso Jun 15, 2020
1d1dd36
=test #616 fix too aggressive assertions in test_autoUpdatedListing_u…
ktoso Jun 15, 2020
dcc880d
+cluster #670 leaving/down node MUST reject aggresively new handshakes
ktoso Jun 15, 2020
e05b22d
Update Docs/internals.adoc
ktoso Jun 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions Docs/internals.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,12 @@ R -[#red]-x L: HandshakeReject
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
```

==== Handshake phases

=== In memory core concepts
Note that handshakes may "race" from both nodes to each other concurrently, thus the system has to "pick one".

==== Association

Established between two nodes.

Only "associated nodes" may talk to one another.

Rules:

- don't talk to strangers
* nodes MUST handshake before exchanging any messages

=== *RemoteControl

Control objects allow performing actual actions onto the network layer.

These are exposed to actual refs and used to e.g. send messages.
There can be many remote controls, but always one association for a pair of nodes.

==== Watch and clustering

Expand Down
30 changes: 22 additions & 8 deletions Sources/DistributedActors/ActorLogging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ internal final class LoggingContext {
/// such as its path or node on which it resides.
///
/// The preferred way of obtaining a logger for an actor or system is `context.log` or `system.log`, rather than creating new ones.
public struct ActorLogger {
extension Logger {
public static func make<T>(context: ActorContext<T>) -> Logger {
var log = context.system.log
log[metadataKey: "actor/path"] = Logger.MetadataValue.stringConvertible(context.path)
Logger.make(context.log, path: context.path)
}

internal static func make(_ base: Logger, path: ActorPath) -> Logger {
var log = base
log[metadataKey: "actor/path"] = Logger.MetadataValue.stringConvertible(path)
return log
}
}
Expand Down Expand Up @@ -274,22 +278,32 @@ public struct LogMessage {
let line: UInt
}

// MARK: Extend logging metadata storage capabilities

extension Logger.Metadata {
extension Logger.MetadataValue {
public static func pretty<T>(_ value: T) -> Logger.Metadata.Value where T: CustomPrettyStringConvertible {
.string(value.prettyDescription)
Logger.MetadataValue.stringConvertible(CustomPrettyStringConvertibleMetadataValue(value))
}

public static func pretty<T>(_ value: T) -> Logger.Metadata.Value {
if let pretty = value as? CustomPrettyStringConvertible {
return .string(pretty.prettyDescription)
return Logger.MetadataValue.stringConvertible(CustomPrettyStringConvertibleMetadataValue(pretty))
} else {
return .string("\(value)")
}
}
}

/// Wraps a `CustomPrettyStringConvertible` so it can be stored as a
/// `Logger.MetadataValue.stringConvertible(...)` value.
///
/// The pretty string is only built when `description` is requested, rather than
/// eagerly at the point where the metadata value is created.
struct CustomPrettyStringConvertibleMetadataValue: CustomStringConvertible {
    /// The wrapped value whose pretty representation is rendered on demand.
    let value: CustomPrettyStringConvertible

    /// - Parameter value: the value to render via its `prettyDescription`.
    init(_ value: CustomPrettyStringConvertible) {
        self.value = value
    }

    /// The *pretty* rendering of the wrapped value.
    ///
    /// This must use `prettyDescription` (not plain string interpolation, which would
    /// yield the regular `description`), otherwise `Logger.MetadataValue.pretty(_:)`
    /// would silently stop producing pretty output.
    var description: String {
        self.value.prettyDescription
    }
}

extension Optional where Wrapped == Logger.MetadataValue {
public static func lazyStringConvertible(_ makeValue: @escaping () -> CustomStringConvertible) -> Logger.Metadata.Value {
.stringConvertible(LazyMetadataBox { makeValue() })
Expand Down
3 changes: 2 additions & 1 deletion Sources/DistributedActors/ActorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
self.behavior = behavior
self._address = address
self._props = props
self._log = .make(system.log, path: address.path)

self.supervisor = Supervision.supervisorFor(system, initialBehavior: behavior, props: props.supervision)
self._deathWatch = DeathWatch(nodeDeathWatcher: system._nodeDeathWatcher ?? system.deadLetters.adapted())
Expand Down Expand Up @@ -239,7 +240,7 @@ public final class ActorShell<Message: ActorMessage>: ActorContext<Message>, Abs
}

// access only from within actor
private lazy var _log = ActorLogger.make(context: self)
private var _log: Logger
public override var log: Logger {
get {
self._log
Expand Down
28 changes: 25 additions & 3 deletions Sources/DistributedActors/Backoff.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,22 @@ public enum Backoff {
/// MUST be `>= initialInterval`.
/// - randomFactor: A random factor of `0.5` results in backoffs between 50% below and 50% above the base interval.
/// MUST be between: `<0; 1>` (inclusive)
/// - maxAttempts: An optional maximum number of times backoffs shall be attempted.
/// MUST be `> 0` if set (or `nil`).
public static func exponential(
initialInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.initialInterval,
multiplier: Double = ExponentialBackoffStrategy.Defaults.multiplier,
capInterval: TimeAmount = ExponentialBackoffStrategy.Defaults.capInterval,
randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor
randomFactor: Double = ExponentialBackoffStrategy.Defaults.randomFactor,
maxAttempts: Int? = ExponentialBackoffStrategy.Defaults.maxAttempts
) -> ExponentialBackoffStrategy {
.init(initialInterval: initialInterval, multiplier: multiplier, capInterval: capInterval, randomFactor: randomFactor)
.init(
initialInterval: initialInterval,
multiplier: multiplier,
capInterval: capInterval,
randomFactor: randomFactor,
maxAttempts: maxAttempts
)
}
}

Expand Down Expand Up @@ -148,30 +157,43 @@ public struct ExponentialBackoffStrategy: BackoffStrategy {

// TODO: We could also accept a Clock, and use it to check whether a total elapsed-time limit has been exceeded
// public static let maxElapsedTime: TimeAmount = .minutes(30)

public static let maxAttempts: Int? = nil
}

let initialInterval: TimeAmount
let multiplier: Double
let capInterval: TimeAmount
let randomFactor: Double

var limitedRemainingAttempts: Int?

// interval that will be used in the `next()` call, does NOT include the random noise component
private var currentBaseInterval: TimeAmount

internal init(initialInterval: TimeAmount, multiplier: Double, capInterval: TimeAmount, randomFactor: Double) {
internal init(initialInterval: TimeAmount, multiplier: Double, capInterval: TimeAmount, randomFactor: Double, maxAttempts: Int?) {
precondition(initialInterval.nanoseconds > 0, "initialInterval MUST be > 0ns, was: [\(initialInterval.prettyDescription)]")
precondition(multiplier >= 1.0, "multiplier MUST be >= 1.0, was: [\(multiplier)]")
precondition(initialInterval <= capInterval, "capInterval MUST be >= initialInterval, was: [\(capInterval)]")
precondition(randomFactor >= 0.0 && randomFactor <= 1.0, "randomFactor MUST be within between 0 and 1, was: [\(randomFactor)]")
if let n = maxAttempts {
precondition(n > 0, "maxAttempts MUST be nil or > 0, was: [\(n)]")
}

self.initialInterval = initialInterval
self.currentBaseInterval = initialInterval
self.multiplier = multiplier
self.capInterval = capInterval
self.randomFactor = randomFactor
self.limitedRemainingAttempts = maxAttempts
}

public mutating func next() -> TimeAmount? {
defer { self.limitedRemainingAttempts? -= 1 }
if let remainingAttempts = self.limitedRemainingAttempts, remainingAttempts <= 0 {
return nil
} // else, still attempts remaining, or no limit set

let baseInterval = self.currentBaseInterval
let randomizeMultiplier = Double.random(in: (1 - self.randomFactor) ... (1 + self.randomFactor))

Expand Down
6 changes: 6 additions & 0 deletions Sources/DistributedActors/CRDT/ActorOwned+CRDT.swift
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ extension CRDT {
}
}

extension CRDT.ActorOwned: CustomStringConvertible, CustomPrettyStringConvertible {
    /// Single-line summary listing this instance's `id`, `data` and `status`.
    public var description: String {
        let id = self.id
        let data = self.data
        let status = self.status
        return "CRDT.ActorOwned(id: \(id), data: \(data), status: \(status))"
    }
}

extension CRDT.ActorOwned {
/// Register callback for owning actor to be notified when the CRDT instance has been updated.
///
Expand Down
7 changes: 5 additions & 2 deletions Sources/DistributedActors/CRDT/CRDT+Gossip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ extension CRDT.Identity: GossipIdentifier {

extension CRDT {
/// The gossip to be spread about a specific CRDT (identity).
struct Gossip: GossipEnvelopeProtocol, CustomPrettyStringConvertible {
struct Gossip: GossipEnvelopeProtocol, CustomStringConvertible, CustomPrettyStringConvertible {
struct Metadata: Codable {}

typealias Payload = StateBasedCRDT

var metadata: Metadata
Expand All @@ -197,6 +196,10 @@ extension CRDT {
mutating func tryMerge(other: StateBasedCRDT) -> CRDT.MergeError? {
self.payload._tryMerge(other: other)
}

/// Single-line summary of this gossip's `metadata` and `payload`.
var description: String {
    "CRDT.Gossip(metadata: \(self.metadata), payload: \(self.payload))"
}
}
}

Expand Down
8 changes: 3 additions & 5 deletions Sources/DistributedActors/CRDT/CRDT+ReplicatorShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ extension CRDT.Replicator {
}
)

self.gossipReplication = try GossipShell.start(
self.gossipReplication = try Gossiper.start(
context,
name: "gossip",
settings: GossipShell.Settings(
settings: Gossiper.Settings(
gossipInterval: self.settings.gossipInterval,
gossipIntervalRandomFactor: self.settings.gossipIntervalRandomFactor,
peerDiscovery: .fromReceptionistListing(id: "crdt-gossip-replicator")
Expand Down Expand Up @@ -130,10 +130,8 @@ extension CRDT.Replicator {
self.receiveClusterEvent(context, event: .membershipChange(change))
}

case .membershipChange:
context.log.trace("Ignoring cluster event \(event), only interested in >= .up events", metadata: self.metadata(context))
default:
return // ignore other events
return // ignore other events (including membership changes for events lesser than .up)
}
}

Expand Down
16 changes: 13 additions & 3 deletions Sources/DistributedActors/Cluster/Association.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import DistributedActorsConcurrencyHelpers
import struct Foundation.Date
import Logging
import NIO

Expand Down Expand Up @@ -65,7 +66,7 @@ final class Association: CustomStringConvertible {

/// Complete the association and drain any pending message sends onto the channel.
// TODO: This style can only ever work since we lock around the entirety of enqueueing messages and this setting; make it such that we don't need the lock eventually
func completeAssociation(handshake: HandshakeStateMachine.CompletedState, over channel: Channel) {
func completeAssociation(handshake: HandshakeStateMachine.CompletedState, over channel: Channel) throws {
assert(
self.remoteNode == handshake.remoteNode,
"""
Expand All @@ -75,7 +76,7 @@ final class Association: CustomStringConvertible {
"""
)

self.lock.withLockVoid {
try self.lock.withLockVoid {
switch self.state {
case .associating(let sendQueue):
// 1) we need to flush all the queued up messages
Expand All @@ -93,10 +94,14 @@ final class Association: CustomStringConvertible {
self.state = .associated(channel: channel)

case .associated:
_ = channel.close() // TODO: throw instead of accepting a "double complete"?
let desc = "\(channel)"
_ = channel.close()
throw AssociationError.attemptToCompleteAlreadyCompletedAssociation(self, offendingChannelDescription: desc)

case .tombstone:
let desc = "\(channel)"
_ = channel.close()
throw AssociationError.attemptToCompleteTombstonedAssociation(self, offendingChannelDescription: desc)
}
}
}
Expand Down Expand Up @@ -218,6 +223,11 @@ extension Association {
}
}

/// Errors thrown by `Association.completeAssociation(handshake:over:)` when the association
/// is not in a state that can be completed.
///
/// In both cases the offending channel is closed before the error is thrown, and its textual
/// description (`"\(channel)"`, captured before closing) is carried in the error.
enum AssociationError: Error {
/// The association was already in the `.associated` state when completion was attempted.
case attemptToCompleteAlreadyCompletedAssociation(Association, offendingChannelDescription: String)
/// The association was already in the `.tombstone` state when completion was attempted.
case attemptToCompleteTombstonedAssociation(Association, offendingChannelDescription: String)
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Association Tombstone

Expand Down
16 changes: 13 additions & 3 deletions Sources/DistributedActors/Cluster/Cluster+Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ extension Cluster {
self.fromStatus = replaced.status
self.toStatus = newMember.status
}

/// Hashes only `member`.
///
/// `==` compares additional fields, which remains consistent with `Hashable`:
/// changes that compare equal necessarily share the same `member`, and therefore the same hash.
public func hash(into hasher: inout Hasher) {
    hasher.combine(self.member)
}

/// Two changes are equal when `member`, `replaced`, `fromStatus` and `toStatus` all match.
public static func == (lhs: MembershipChange, rhs: MembershipChange) -> Bool {
    // Compared in the same order as the fields are listed above, bailing out on the first mismatch.
    guard lhs.member == rhs.member else { return false }
    guard lhs.replaced == rhs.replaced else { return false }
    guard lhs.fromStatus == rhs.fromStatus else { return false }
    return lhs.toStatus == rhs.toStatus
}
}
}

Expand All @@ -129,9 +140,8 @@ extension Cluster.MembershipChange {
self.toStatus.isDown
}

/// Matches when a change is to: `.down`, `.leaving` or `.removed`.
public var isAtLeastDown: Bool {
self.toStatus >= .down
public func isAtLeast(_ status: Cluster.MemberStatus) -> Bool {
self.toStatus >= status
}

public var isLeaving: Bool {
Expand Down
24 changes: 20 additions & 4 deletions Sources/DistributedActors/Cluster/Cluster+Gossip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ extension Cluster {
case .some(let locallyKnownMember) where locallyKnownMember.status.isDown:
// we have NOT removed it yet, but it is down, so we ignore it
return .init(causalRelation: causalRelation, effectiveChanges: [])
case .none where incomingOwnerMember.status.isAtLeastDown:
case .none where incomingOwnerMember.status.isAtLeast(.down):
// we have likely removed it, and it is down anyway, so we ignore it completely
return .init(causalRelation: causalRelation, effectiveChanges: [])
default:
Expand Down Expand Up @@ -169,6 +169,21 @@ extension Cluster {
}
}

/// Lets a `Cluster.Gossip` be used directly as a gossip envelope:
/// its seen table serves as the metadata and the gossip value itself is the payload.
extension Cluster.Gossip: GossipEnvelopeProtocol {
    typealias Metadata = SeenTable
    typealias Payload = Self

    /// The envelope metadata, i.e. this gossip's `seen` table.
    var metadata: Metadata {
        return self.seen
    }

    /// The envelope payload, i.e. this gossip itself.
    var payload: Payload {
        return self
    }
}

// Empty conformance: no members are declared here.
// NOTE(review): presumably relies on default implementations provided by `CustomPrettyStringConvertible` — confirm.
extension Cluster.Gossip: CustomPrettyStringConvertible {}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster.Gossip.SeenTable

Expand Down Expand Up @@ -290,10 +305,11 @@ extension Cluster.Gossip.SeenTable: CustomStringConvertible, CustomPrettyStringC
"Cluster.Gossip.SeenTable(\(self.underlying))"
}

public var prettyDescription: String {
public func prettyDescription(depth: Int) -> String {
var s = "Cluster.Gossip.SeenTable(\n"
let entryHeadingPadding = String(repeating: " ", count: 4)
let entryPadding = String(repeating: " ", count: 4 * 2)
let entryHeadingPadding = String(repeating: " ", count: 4 * depth)
let entryPadding = String(repeating: " ", count: 4 * (depth + 1))

underlying.sorted(by: { $0.key < $1.key }).forEach { node, vv in
let entryHeader = "\(entryHeadingPadding)\(node) observed versions:\n"

Expand Down
5 changes: 2 additions & 3 deletions Sources/DistributedActors/Cluster/Cluster+Member.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,8 @@ extension Cluster.MemberStatus {
self == .down
}

/// Convenience function to check if a status is `.removed` or `.removed`
public var isAtLeastDown: Bool {
self >= .down
public func isAtLeast(_ status: Cluster.MemberStatus) -> Bool {
self >= status
}

/// Convenience function to check if a status is `.removed`
Expand Down
Loading