Skip to content

Commit 1ae223b

Browse files
committed
Periodically prune association tombstones
1 parent b00028d commit 1ae223b

File tree

4 files changed

+29
-1
lines changed

4 files changed

+29
-1
lines changed

Sources/DistributedActors/Cluster/Association.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ extension Association {
244244

245245
/// Determines when the Tombstone should be removed from kept tombstones in the ClusterShell.
246246
/// End of life of the tombstone is calculated as `now + settings.associationTombstoneTTL`.
247-
let removalDeadline: Deadline // TODO: cluster should have timer to try to remove those periodically
247+
let removalDeadline: Deadline
248248

249249
init(_ node: UniqueNode, settings: ClusterSystemSettings) {
250250
// TODO: if we made system carry system.time we could always count from that point in time with a TimeAmount; require Clock and settings then

Sources/DistributedActors/Cluster/ClusterShell.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ internal class ClusterShell {
333333
/// Used to signal a "down was issued" either by the user, or another part of the system.
334334
case downCommandMember(Cluster.Member)
335335
case shutdown(BlockingReceptacle<Void>) // TODO: could be NIO future
336+
case cleanUpAssociationTombstones
336337
}
337338

338339
enum QueryMessage: NonTransportableActorMessage {
@@ -461,6 +462,17 @@ extension ClusterShell {
461462
}
462463
}
463464

465+
private func cleanUpAssociationTombstones() -> _Behavior<Message> {
466+
self._associationsLock.withLockVoid {
467+
for (id, tombstone) in self._associationTombstones {
468+
if tombstone.removalDeadline.isOverdue() {
469+
self._associationTombstones.removeValue(forKey: id)
470+
}
471+
}
472+
}
473+
return .same
474+
}
475+
464476
/// Ready and interpreting commands and incoming messages.
465477
///
466478
/// Serves as main "driver" for handshake and association state machines.
@@ -498,6 +510,8 @@ extension ClusterShell {
498510
}
499511
case .downCommandMember(let member):
500512
return self.ready(state: self.onDownCommand(context, state: state, member: member))
513+
case .cleanUpAssociationTombstones:
514+
return self.cleanUpAssociationTombstones()
501515
}
502516
}
503517

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
5757
internal let lifecycleWatchLock = Lock()
5858
internal var _lifecycleWatches: [ActorAddress: LifecycleWatchContainer] = [:]
5959

60+
private var _associationTombstoneCleanupTask: RepeatedTask?
61+
6062
private let dispatcher: InternalMessageDispatcher
6163

6264
// Access MUST be protected with `namingLock`.
@@ -300,6 +302,13 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
300302
let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
301303
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents)))
302304

305+
self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask(
306+
initialDelay: settings.associationTombstoneCleanupInterval.toNIO,
307+
delay: settings.associationTombstoneCleanupInterval.toNIO
308+
) { _ in
309+
clusterRef.tell(.command(.cleanUpAssociationTombstones))
310+
}
311+
303312
// node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered
304313
// Node watcher MUST be started AFTER cluster and clusterEvents
305314
let lazyNodeDeathWatcher = try! self._prepareSystemActor(
@@ -458,6 +467,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
458467
// self._serialization = nil // FIXME: need to release serialization
459468
}
460469
*/
470+
self._associationTombstoneCleanupTask?.cancel()
471+
self._associationTombstoneCleanupTask = nil
461472
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
462473

463474
self.shutdownReceptacle.offerOnce(nil)

Sources/DistributedActors/ClusterSystemSettings.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ public struct ClusterSystemSettings {
115115
/// communicated with again. Tombstones are used to ensure this, even if the downed ("zombie") node, attempts to reconnect.
116116
public var associationTombstoneTTL: TimeAmount = .hours(24) * 1
117117

118+
/// Defines the interval with which the list of associated tombstones is freed from expired tombstones.
119+
public var associationTombstoneCleanupInterval: TimeAmount = .minutes(10)
120+
118121
// ==== ------------------------------------------------------------------------------------------------------------
119122
// MARK: Cluster protocol versioning
120123

0 commit comments

Comments
 (0)