diff --git a/Sources/DistributedActors/Cluster/Association.swift b/Sources/DistributedActors/Cluster/Association.swift
index abb5c7408..924f24e15 100644
--- a/Sources/DistributedActors/Cluster/Association.swift
+++ b/Sources/DistributedActors/Cluster/Association.swift
@@ -244,7 +244,7 @@ extension Association {
 
         /// Determines when the Tombstone should be removed from kept tombstones in the ClusterShell.
         /// End of life of the tombstone is calculated as `now + settings.associationTombstoneTTL`.
-        let removalDeadline: Deadline // TODO: cluster should have timer to try to remove those periodically
+        let removalDeadline: Deadline
 
         init(_ node: UniqueNode, settings: ClusterSystemSettings) {
             // TODO: if we made system carry system.time we could always count from that point in time with a TimeAmount; require Clock and settings then
diff --git a/Sources/DistributedActors/Cluster/ClusterShell.swift b/Sources/DistributedActors/Cluster/ClusterShell.swift
index b3c26be4c..ddc580ae2 100644
--- a/Sources/DistributedActors/Cluster/ClusterShell.swift
+++ b/Sources/DistributedActors/Cluster/ClusterShell.swift
@@ -209,6 +209,14 @@ internal class ClusterShell {
         }
     }
 
+    /// For testing only.
+    /// Safe to concurrently access by privileged internals.
+    internal var _testingOnly_associationTombstones: [Association.Tombstone] {
+        self._associationsLock.withLock {
+            [Association.Tombstone](self._associationTombstones.values)
+        }
+    }
+
     /// For testing only.
     internal func _associatedNodes() -> Set<UniqueNode> {
         self._associationsLock.withLock {
@@ -333,6 +341,7 @@
         /// Used to signal a "down was issued" either by the user, or another part of the system.
         case downCommandMember(Cluster.Member)
         case shutdown(BlockingReceptacle<Void>) // TODO: could be NIO future
+        case cleanUpAssociationTombstones
     }
 
     enum QueryMessage: NonTransportableActorMessage {
@@ -461,6 +470,18 @@
         }
     }
 
+    /// Called periodically to remove association tombstones after the configured TTL.
+    private func cleanUpAssociationTombstones() -> _Behavior<Message> {
+        self._associationsLock.withLockVoid {
+            for (id, tombstone) in self._associationTombstones {
+                if tombstone.removalDeadline.isOverdue() {
+                    self._associationTombstones.removeValue(forKey: id)
+                }
+            }
+        }
+        return .same
+    }
+
     /// Ready and interpreting commands and incoming messages.
     ///
     /// Serves as main "driver" for handshake and association state machines.
@@ -498,6 +519,8 @@
                 }
             case .downCommandMember(let member):
                 return self.ready(state: self.onDownCommand(context, state: state, member: member))
+            case .cleanUpAssociationTombstones:
+                return self.cleanUpAssociationTombstones()
             }
         }
 
diff --git a/Sources/DistributedActors/ClusterSystem.swift b/Sources/DistributedActors/ClusterSystem.swift
index 00839a495..8e5d7ead2 100644
--- a/Sources/DistributedActors/ClusterSystem.swift
+++ b/Sources/DistributedActors/ClusterSystem.swift
@@ -57,6 +57,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
     internal let lifecycleWatchLock = Lock()
     internal var _lifecycleWatches: [ActorAddress: LifecycleWatchContainer] = [:]
 
+    private var _associationTombstoneCleanupTask: RepeatedTask?
+
     private let dispatcher: InternalMessageDispatcher
 
     // Access MUST be protected with `namingLock`.
@@ -300,6 +302,13 @@
         let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
         _ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents)))
 
+        self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask(
+            initialDelay: settings.associationTombstoneCleanupInterval.toNIO,
+            delay: settings.associationTombstoneCleanupInterval.toNIO
+        ) { _ in
+            clusterRef.tell(.command(.cleanUpAssociationTombstones))
+        }
+
         // node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered
         // Node watcher MUST be started AFTER cluster and clusterEvents
         let lazyNodeDeathWatcher = try! self._prepareSystemActor(
@@ -458,6 +467,8 @@
             // self._serialization = nil // FIXME: need to release serialization
         }
         */
+        self._associationTombstoneCleanupTask?.cancel()
+        self._associationTombstoneCleanupTask = nil
         _ = self._clusterStore.storeIfNilThenLoad(Box(nil))
 
         self.shutdownReceptacle.offerOnce(nil)
diff --git a/Sources/DistributedActors/ClusterSystemSettings.swift b/Sources/DistributedActors/ClusterSystemSettings.swift
index af715a371..a04c05652 100644
--- a/Sources/DistributedActors/ClusterSystemSettings.swift
+++ b/Sources/DistributedActors/ClusterSystemSettings.swift
@@ -115,6 +115,9 @@ public struct ClusterSystemSettings {
     /// communicated with again. Tombstones are used to ensure this, even if the downed ("zombie") node, attempts to reconnect.
     public var associationTombstoneTTL: TimeAmount = .hours(24) * 1
 
+    /// Defines the interval with which the list of association tombstones is freed from expired tombstones.
+    public var associationTombstoneCleanupInterval: TimeAmount = .minutes(10)
+
     // ==== ------------------------------------------------------------------------------------------------------------
     // MARK: Cluster protocol versioning
 
diff --git a/Tests/DistributedActorsTests/ClusterSystemTests.swift b/Tests/DistributedActorsTests/ClusterSystemTests.swift
index 3a0acd5e1..238bca60a 100644
--- a/Tests/DistributedActorsTests/ClusterSystemTests.swift
+++ b/Tests/DistributedActorsTests/ClusterSystemTests.swift
@@ -172,6 +172,36 @@
             throw testKit.fail("Expected timeout to be 1 second but was \(timeoutError.timeout)")
         }
     }
+
+    func test_cleanUpAssociationTombstones() async throws {
+        let local = await setUpNode("local") {
+            $0.associationTombstoneTTL = .seconds(0)
+        }
+        let remote = await setUpNode("remote")
+        local.cluster.join(node: remote.cluster.uniqueNode)
+
+        let remoteAssociationControlState0 = local._cluster!.getEnsureAssociation(with: remote.cluster.uniqueNode)
+        guard case ClusterShell.StoredAssociationState.association(let remoteControl0) = remoteAssociationControlState0 else {
+            throw Boom("Expected the association to exist for \(remote.cluster.uniqueNode)")
+        }
+
+        ClusterShell.shootTheOtherNodeAndCloseConnection(system: local, targetNodeAssociation: remoteControl0)
+
+        // Node should eventually appear in tombstones
+        try self.testKit(local).eventually(within: .seconds(3)) {
+            guard local._cluster?._testingOnly_associationTombstones.isEmpty == false else {
+                throw Boom("Expected tombstone for downed node")
+            }
+        }
+
+        local._cluster?.ref.tell(.command(.cleanUpAssociationTombstones))
+
+        try self.testKit(local).eventually(within: .seconds(3)) {
+            guard local._cluster?._testingOnly_associationTombstones.isEmpty == true else {
+                throw Boom("Expected tombstones to get cleared")
+            }
+        }
+    }
 }
 
 private distributed actor DelayedGreeter {