2 changes: 1 addition & 1 deletion Sources/DistributedActors/Cluster/Association.swift
@@ -244,7 +244,7 @@ extension Association {

/// Determines when the Tombstone should be removed from kept tombstones in the ClusterShell.
/// End of life of the tombstone is calculated as `now + settings.associationTombstoneTTL`.
- let removalDeadline: Deadline // TODO: cluster should have timer to try to remove those periodically
+ let removalDeadline: Deadline

init(_ node: UniqueNode, settings: ClusterSystemSettings) {
// TODO: if we made system carry system.time we could always count from that point in time with a TimeAmount; require Clock and settings then
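Below is a minimal, self-contained model of the "now + TTL" computation the doc comment above describes, using Foundation's `Date` and `TimeInterval` in place of the library's `Deadline` and `TimeAmount`; every name in it is illustrative and not taken from the codebase.

```swift
import Foundation

// Illustrative model only: the removal deadline is fixed at creation time as now + TTL,
// and a tombstone becomes eligible for cleanup once that deadline has passed.
struct TombstoneModel {
    let removalDeadline: Date

    init(ttl: TimeInterval, now: Date = Date()) {
        self.removalDeadline = now.addingTimeInterval(ttl)
    }

    func isOverdue(now: Date = Date()) -> Bool {
        now >= self.removalDeadline
    }
}

let tombstone = TombstoneModel(ttl: 24 * 60 * 60) // mirrors the default 24h TTL
print(tombstone.isOverdue()) // false until the TTL elapses
```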
23 changes: 23 additions & 0 deletions Sources/DistributedActors/Cluster/ClusterShell.swift
@@ -209,6 +209,14 @@ internal class ClusterShell {
}
}

/// For testing only.
/// Safe to concurrently access by privileged internals.
internal var _testingOnly_associationTombstones: [Association.Tombstone] {
self._associationsLock.withLock {
[Association.Tombstone](self._associationTombstones.values)
}
}

/// For testing only.
internal func _associatedNodes() -> Set<UniqueNode> {
self._associationsLock.withLock {
@@ -333,6 +341,7 @@ internal class ClusterShell {
/// Used to signal a "down was issued" either by the user, or another part of the system.
case downCommandMember(Cluster.Member)
case shutdown(BlockingReceptacle<Void>) // TODO: could be NIO future
case cleanUpAssociationTombstones
}

enum QueryMessage: NonTransportableActorMessage {
@@ -461,6 +470,18 @@ extension ClusterShell {
}
}

/// Called periodically to remove association tombstones after the configured TTL.
private func cleanUpAssociationTombstones() -> _Behavior<Message> {
self._associationsLock.withLockVoid {
for (id, tombstone) in self._associationTombstones {
if tombstone.removalDeadline.isOverdue() {
self._associationTombstones.removeValue(forKey: id)
}
}
}
return .same
}

/// Ready and interpreting commands and incoming messages.
///
/// Serves as main "driver" for handshake and association state machines.
@@ -498,6 +519,8 @@
}
case .downCommandMember(let member):
return self.ready(state: self.onDownCommand(context, state: state, member: member))
case .cleanUpAssociationTombstones:
return self.cleanUpAssociationTombstones()
}
}

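One detail worth noting about the cleanup loop above: Swift dictionaries have value semantics, so the `for (id, tombstone) in self._associationTombstones` loop iterates a snapshot while `removeValue(forKey:)` mutates the stored dictionary, which makes removing entries during iteration well defined. A tiny standalone sketch of that pruning pattern (illustrative names only):

```swift
// Prune expired entries while iterating: the loop walks a value-semantics snapshot,
// so mutating the dictionary inside the loop body is safe.
var secondsUntilExpiry: [String: Int] = ["a": -5, "b": 600, "c": -1]
for (key, remaining) in secondsUntilExpiry where remaining <= 0 {
    secondsUntilExpiry.removeValue(forKey: key)
}
print(secondsUntilExpiry) // ["b": 600]
```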
11 changes: 11 additions & 0 deletions Sources/DistributedActors/ClusterSystem.swift
@@ -57,6 +57,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
internal let lifecycleWatchLock = Lock()
internal var _lifecycleWatches: [ActorAddress: LifecycleWatchContainer] = [:]

private var _associationTombstoneCleanupTask: RepeatedTask?

private let dispatcher: InternalMessageDispatcher

// Access MUST be protected with `namingLock`.
@@ -300,6 +302,13 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents)))

self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask(
initialDelay: settings.associationTombstoneCleanupInterval.toNIO,
delay: settings.associationTombstoneCleanupInterval.toNIO
) { _ in
clusterRef.tell(.command(.cleanUpAssociationTombstones))
}

// node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered
// Node watcher MUST be started AFTER cluster and clusterEvents
let lazyNodeDeathWatcher = try! self._prepareSystemActor(
@@ -458,6 +467,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
// self._serialization = nil // FIXME: need to release serialization
}
*/
self._associationTombstoneCleanupTask?.cancel()
self._associationTombstoneCleanupTask = nil
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))

self.shutdownReceptacle.offerOnce(nil)
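The scheduling added above relies on SwiftNIO's `EventLoop.scheduleRepeatedTask(initialDelay:delay:_:)`, which returns a `RepeatedTask` handle that must be cancelled during shutdown, as the second hunk does. A minimal standalone sketch of that pattern with plain NIO types (the printed tick stands in for sending the `.cleanUpAssociationTombstones` command):

```swift
import Foundation
import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try! group.syncShutdownGracefully() }

// Schedule a repeating tick; in the PR this is where the cluster shell is told
// to clean up association tombstones.
let task = group.next().scheduleRepeatedTask(
    initialDelay: .milliseconds(100),
    delay: .milliseconds(100)
) { _ in
    print("cleanup tick")
}

Thread.sleep(forTimeInterval: 0.5) // let a few ticks fire (demo only)
task.cancel()                      // mirrors the cancel() call on shutdown
```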
3 changes: 3 additions & 0 deletions Sources/DistributedActors/ClusterSystemSettings.swift
@@ -115,6 +115,9 @@ public struct ClusterSystemSettings {
/// communicated with again. Tombstones are used to ensure this, even if the downed ("zombie") node, attempts to reconnect.
public var associationTombstoneTTL: TimeAmount = .hours(24) * 1

/// Defines the interval at which the list of association tombstones is purged of expired tombstones.
public var associationTombstoneCleanupInterval: TimeAmount = .minutes(10)
Review comment (Member): 10 minutes sounds good 👍


// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Cluster protocol versioning

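Taken together, the two settings mean a downed node's tombstone is kept for at least `associationTombstoneTTL` and is actually dropped on the next cleanup tick, so worst-case retention is roughly the TTL plus one cleanup interval. A hedged configuration sketch, modeled on the closure-style configuration used in the tests below; the `ClusterSystem` bootstrap call itself is an assumption and not part of this diff:

```swift
import DistributedActors

// Hypothetical tuning example; only the two setting names come from this PR.
let system = await ClusterSystem("tuned") { settings in
    settings.associationTombstoneTTL = .hours(1)                // forget downed nodes after 1 hour
    settings.associationTombstoneCleanupInterval = .minutes(5)  // sweep expired tombstones every 5 minutes
}
```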
30 changes: 30 additions & 0 deletions Tests/DistributedActorsTests/ClusterSystemTests.swift
@@ -172,6 +172,36 @@ final class ClusterSystemTests: ActorSystemXCTestCase {
throw testKit.fail("Expected timeout to be 1 second but was \(timeoutError.timeout)")
}
}

func test_cleanUpAssociationTombstones() async throws {
let local = await setUpNode("local") {
$0.associationTombstoneTTL = .seconds(0)
}
let remote = await setUpNode("remote")
local.cluster.join(node: remote.cluster.uniqueNode)

let remoteAssociationControlState0 = local._cluster!.getEnsureAssociation(with: remote.cluster.uniqueNode)
guard case ClusterShell.StoredAssociationState.association(let remoteControl0) = remoteAssociationControlState0 else {
throw Boom("Expected the association to exist for \(remote.cluster.uniqueNode)")
}

ClusterShell.shootTheOtherNodeAndCloseConnection(system: local, targetNodeAssociation: remoteControl0)

// Node should eventually appear in tombstones
try self.testKit(local).eventually(within: .seconds(3)) {
guard local._cluster?._testingOnly_associationTombstones.isEmpty == false else {
throw Boom("Expected tombstone for downed node")
}
}

local._cluster?.ref.tell(.command(.cleanUpAssociationTombstones))

try self.testKit(local).eventually(within: .seconds(3)) {
guard local._cluster?._testingOnly_associationTombstones.isEmpty == true else {
throw Boom("Expected tombstones to get cleared")
}
}
}
}

private distributed actor DelayedGreeter {