diff --git a/Sources/DistributedCluster/Cluster/Downing/DowningStrategy.swift b/Sources/DistributedCluster/Cluster/Downing/DowningStrategy.swift index 6441a4521..9bf3c3fb2 100644 --- a/Sources/DistributedCluster/Cluster/Downing/DowningStrategy.swift +++ b/Sources/DistributedCluster/Cluster/Downing/DowningStrategy.swift @@ -93,10 +93,10 @@ internal distributed actor DowningStrategyShell { self.strategy = strategy self.actorSystem = system self.eventsListeningTask = Task { [weak self] in - guard let __secretlyKnownToBeLocal = self else { return } // FIXME(local): we really need `local` here - - for await event in system.cluster.events { - try __secretlyKnownToBeLocal.receiveClusterEvent(event) + try await self?.whenLocal { myself in + for await event in system.cluster.events { + try myself.receiveClusterEvent(event) + } } } } @@ -120,16 +120,14 @@ internal distributed actor DowningStrategyShell { case .startTimer(let member, let delay): self.log.trace("Start timer for member: \(member), delay: \(delay)") self.memberTimerTasks[member] = Task { [weak self] in - guard let __secretlyKnownToBeLocal = self else { return } // FIXME(local): we really need `local` here - defer { __secretlyKnownToBeLocal.memberTimerTasks.removeValue(forKey: member) } + try await self?.whenLocal { myself in + defer { myself.memberTimerTasks.removeValue(forKey: member) } - try await Task.sleep(until: .now + delay, clock: .continuous) + try await Task.sleep(until: .now + delay, clock: .continuous) - guard !Task.isCancelled else { - return + guard !Task.isCancelled else { return } + myself.onTimeout(member: member) } - - __secretlyKnownToBeLocal.onTimeout(member: member) } case .cancelTimer(let member): self.log.trace("Cancel timer for member: \(member)") diff --git a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift index 6d64f721d..3263d42c1 100644 --- a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -260,9 +260,10 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, ) // TODO(distributed): remove when we remove paths entirely self.eventsListeningTask = Task { [weak self, system] in - for try await event in system.cluster.events { - guard let __secretlyKnownToBeLocal = self else { return } - __secretlyKnownToBeLocal.onClusterEvent(event: event) + try await self?.whenLocal { __secretlyKnownToBeLocal in + for try await event in system.cluster.events { + __secretlyKnownToBeLocal.onClusterEvent(event: event) + } } } @@ -270,9 +271,10 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist, // periodically gossip to other receptionists with the last seqNr we've seen, // and if it happens to be outdated by then this will cause a push from that node. self.slowACKReplicationTimerTask = Task { [weak self] in - for await _ in AsyncTimerSequence.repeating(every: system.settings.receptionist.ackPullReplicationIntervalSlow, clock: .continuous) { - guard let __secretlyKnownToBeLocal = self else { return } - __secretlyKnownToBeLocal.periodicAckTick() + await self?.whenLocal { myself in + for await _ in AsyncTimerSequence.repeating(every: system.settings.receptionist.ackPullReplicationIntervalSlow, clock: .continuous) { + myself.periodicAckTick() + } } } @@ -293,8 +295,8 @@ extension OpLogDistributedReceptionist: LifecycleWatch { _ guest: Guest, with key: DistributedReception.Key ) async where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem { - await self.whenLocal { __secretlyKnownToBeLocal in - await __secretlyKnownToBeLocal._checkIn(guest, with: key) + await self.whenLocal { myself in + await myself._checkIn(guest, with: key) } } @@ -422,8 +424,8 @@ extension OpLogDistributedReceptionist: LifecycleWatch { public nonisolated func lookup(_ key: DistributedReception.Key) async -> Set where Guest: DistributedActor, Guest.ActorSystem == ClusterSystem { - await self.whenLocal { __secretlyKnownToBeLocal in - __secretlyKnownToBeLocal._lookup(key) + await self.whenLocal { myself in + myself._lookup(key) } ?? [] } @@ -483,15 +485,12 @@ extension OpLogDistributedReceptionist { let flushDelay = actorSystem.settings.receptionist.listingFlushDelay self.log.debug("schedule delayed flush") self.flushTimerTasks[timerTaskKey] = Task { [weak self] in - defer { - if let __secretlyKnownToBeLocal = self { - __secretlyKnownToBeLocal.flushTimerTasks.removeValue(forKey: timerTaskKey) - } - } + try await self?.whenLocal { myself in + defer { myself.flushTimerTasks.removeValue(forKey: timerTaskKey) } - try await Task.sleep(until: .now + flushDelay, clock: .continuous) - guard let __secretlyKnownToBeLocal = self else { return } - __secretlyKnownToBeLocal.onDelayedListingFlushTick(key: key) + try await Task.sleep(until: .now + flushDelay, clock: .continuous) + myself.onDelayedListingFlushTick(key: key) + } } } @@ -697,8 +696,9 @@ extension OpLogDistributedReceptionist { Task { [weak self] in do { - guard let __secretlyKnownToBeLocal = self else { return } // FIXME: we need `local` - try await peerReceptionistRef.ackOps(until: latestAppliedSeqNrFromPeer, by: __secretlyKnownToBeLocal) + try await self?.whenLocal { myself in + try await peerReceptionistRef.ackOps(until: latestAppliedSeqNrFromPeer, by: myself) + } } catch { switch error { case let remoteCallError as RemoteCallError where isIgnorable(remoteCallError):