From 66b18922c4aa6c431980c873db4bc6b7653dd8b3 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 25 Oct 2024 17:27:02 +0900 Subject: [PATCH] PoC of DistributedSequence that is DistributedActorSystem agnostic! --- .../DistributedSequence.swift | 151 ++++++++++++++++++ .../DistributedSequenceTests.swift | 78 +++++++++ 2 files changed, 229 insertions(+) create mode 100644 Sources/DistributedCluster/DistributedSequence.swift create mode 100644 Tests/DistributedClusterTests/DistributedSequenceTests.swift diff --git a/Sources/DistributedCluster/DistributedSequence.swift b/Sources/DistributedCluster/DistributedSequence.swift new file mode 100644 index 000000000..da2cb6b58 --- /dev/null +++ b/Sources/DistributedCluster/DistributedSequence.swift @@ -0,0 +1,151 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_exported import Distributed +import struct Foundation.UUID + +/// A distributed sequence is a way to communicate the element pulling semantics of a sequence/iterator over the network. +/// The actual data source is located on the remote side of this distributed sequence (which is a distributed actor), and dictates the semantics of the sequence. +/// +/// Subscription semantics may differ from stream implementation to stream implementation, e.g. some may allow subscribing multiple times, +/// while others may allow only a single-pass and single "once" consumer. 
+///
+/// Usually implementations will also have some form of timeout, after some amount of inactivity the sequence may tear itself down in order to conserve
+/// resources. Please refer to the documentation of the `distributed method` returning (or accepting) a distributed sequence to learn about its expected semantics.
+///
+/// - SeeAlso:
+///   - ``Swift/Sequence/distributed(using:)``
+///   - ``Swift/AsyncSequence/distributed(using:)``
+@available(macOS 15, iOS 18, tvOS 18, watchOS 11, *)
+public protocol DistributedSequence: DistributedActor, Codable, AsyncSequence
+    where Element: Sendable & Codable, ActorSystem: DistributedActorSystem { }
+// TODO: make Failure generic as well
+
+extension Sequence where Element: Sendable & Codable {
+    /// Produce a ``DistributedSequence`` of this ``Swift/Sequence`` which may be passed to `distributed` methods.
+    @available(macOS 15, iOS 18, tvOS 18, watchOS 11, *)
+    public func distributed(using actorSystem: ActorSystem) -> some DistributedSequence
+    where ActorSystem: DistributedActorSystem, ActorSystem.ActorID: Sendable & Codable {
+        DistributedSequenceImpl(self, actorSystem: actorSystem)
+    }
+}
+
+// TODO: Implement also for throwing AsyncSequence
+@available(macOS 15, iOS 18, tvOS 18, watchOS 11, *)
+extension AsyncSequence where Element: Sendable & Codable, Failure == Never {
+    /// Produce a ``DistributedSequence`` of this non-throwing ``Swift/AsyncSequence`` which may be passed to `distributed` methods.
+    @available(macOS 15, iOS 18, tvOS 18, watchOS 11, *)
+    public func distributed(using actorSystem: ActorSystem) -> some DistributedSequence
+    where ActorSystem: DistributedActorSystem, ActorSystem.ActorID: Sendable & Codable {
+        DistributedSequenceImpl(self, actorSystem: actorSystem)
+    }
+}
+
+/// Default ``DistributedSequence``: an actor that owns the source sequence and serves it to subscribers, each identified by a `UUID`.
+@available(macOS 15, iOS 18, tvOS 18, watchOS 11, *)
+public distributed actor DistributedSequenceImpl : DistributedSequence
+    where Element: Sendable & Codable,
+          AS: DistributedActorSystem,
+          AS.ActorID: Codable {
+    public typealias ActorSystem = AS
+    public typealias Failure = any Error
+
+    /// The source of elements; a non-throwing sequence for now, throwing sources still need to be supported.
+    let elements: any AsyncSequence
+
+    /// Active iterators for given subscriber UUID
+    // TODO: these should be reaped on a timeout, if a value is not consumed for a long time
+    var consumers: [UUID: any AsyncIteratorProtocol]
+
+    /// Initialize this type using the helper .distributed() function on ``Sequence``
+    internal init(
+        _ elements: Seq,
+        actorSystem: AS
+    ) where Seq.Element == Element {
+        self.init(elements.async, actorSystem: actorSystem)
+    }
+
+    /// Initialize this type using the helper .distributed() function on ``AsyncSequence``
+    internal init(
+        _ elements: Seq,
+        actorSystem: ActorSystem
+    ) where Seq.Element == Element, Seq.Failure == Never {
+        self.elements = elements
+        self.consumers = [:]
+        self.actorSystem = actorSystem
+    }
+
+    /// Pull the next element for `subscriber`, creating its iterator over `elements` on the first call.
+    /// - Returns: the next element, or `nil` at end of stream (the subscription is then removed).
+    /// - Throws: the error thrown by the underlying iterator; the subscription is removed in that case as well.
+    distributed func getNext(_ subscriber: UUID) async throws -> Element? {
+        print("getNext(\(subscriber)")
+        // However you want to implement getting "next" elements from the underlying stream,
+        // if we had multiple subscribers, does each get an element round-robin, everyone gets their own subscription, or do we reject multiple consumers etc.
+        if consumers[subscriber] == nil {
+            let fresh = self.elements.makeAsyncIterator()
+            consumers[subscriber] = fresh
+        }
+        guard var iter = consumers[subscriber] else { return nil }
+        let element: Element?
+        do {
+            element = try await iter.next()
+        } catch {
+            consumers[subscriber] = nil // a failed iterator must not be resumed
+            throw error
+        }
+        // Store the advanced iterator back only while the stream is live, so finished subscriptions do not linger.
+        consumers[subscriber] = element == nil ? nil : iter
+        return element
+    }
+
+    /// Drop the iterator kept for `subscriber`, if any.
+    distributed func cancel(_ subscriber: UUID) async throws {
+        print("cancel(\(subscriber)")
+        self.consumers[subscriber] = nil
+    }
+
+    /// Each returned iterator draws a fresh `UUID`, i.e. it acts as a separate subscriber of this actor.
+    public nonisolated func makeAsyncIterator() -> AsyncIterator {
+        print("make async iterator (\(self)")
+        return .init(ref: self)
+    }
+
+    /// Iterator forwarding every `next()` to `getNext(_:)`; it cancels its subscription (best effort) when deallocated.
+    final public class AsyncIterator: AsyncIteratorProtocol {
+        let ref: DistributedSequenceImpl
+        let uuid: UUID
+        private var isFinished = false // terminal once `getNext` returned `nil` or threw
+
+        init(ref: DistributedSequenceImpl) {
+            self.ref = ref
+            self.uuid = UUID()
+        }
+
+        public func next() async throws -> Element? {
+            print("Iterator/next")
+            guard !isFinished else { return nil }
+            isFinished = true // stays set if `getNext` throws, so a failed iterator never restarts the sequence
+            let element = try await ref.getNext(self.uuid)
+            isFinished = (element == nil)
+            return element
+        }
+
+        deinit {
+            Task.detached { [ref, uuid] in
+                try await ref.cancel(uuid)
+            }
+        }
+    }
+}
diff --git a/Tests/DistributedClusterTests/DistributedSequenceTests.swift b/Tests/DistributedClusterTests/DistributedSequenceTests.swift
new file mode 100644
index 000000000..13ce88a22
--- /dev/null
+++ b/Tests/DistributedClusterTests/DistributedSequenceTests.swift
@@ -0,0 +1,78 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Distributed Actors open source project
+//
+// Copyright (c) 2018-2022 Apple Inc.
and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Distributed
+import DistributedActorsTestKit
+@testable import DistributedCluster
+import Foundation
+import AsyncAlgorithms
+import XCTest
+
+/// Exercises handing a distributed sequence from an actor on one system of a pair to an actor on the other.
+@available(macOS 15.0, *)
+final class DistributedSequenceTests: SingleClusterSystemXCTestCase {
+    /// An `Emitter` created on `first` exposes `[1, 2, 3]`; a `Consumer` created on `second` iterates it and reports to a probe.
+    func test_distributed_sequence() async throws {
+        let (first, second) = await self.setUpPair { settings in
+            settings.enabled = true
+        }
+
+        let secondProbe = self.testKit(second).makeTestProbe(expecting: Int.self)
+
+        // Create an "emitter" on the `first` node
+        // Make it return a sequence of a few numbers...
+        let expectedSequence = [1, 2, 3]
+        let emitter = Emitter(seq: expectedSequence, actorSystem: first)
+
+        // The consumer is on the `second` node...
+        let consumer: Consumer = Consumer(actorSystem: second)
+
+        // Consume the stream in the second
+        let stream = try await emitter.getStream()
+        let all = try await consumer.consume(stream, probe: secondProbe)
+
+        // Verify we got all the messages in the response as well as the probe received them
+        all.shouldEqual(expectedSequence)
+        _ = try secondProbe.expectMessages(count: expectedSequence.count)
+    }
+}
+
+/// Test actor whose `getStream()` wraps the stored `seq` in a `DistributedSequenceImpl` created on this actor's system.
+@available(macOS 15.0, *)
+distributed actor Emitter where Element: Sendable & Codable {
+    typealias ActorSystem = ClusterSystem
+    let seq: any Sequence
+
+    init(seq: any Sequence, actorSystem: ActorSystem) {
+        self.seq = seq
+        self.actorSystem = actorSystem
+    }
+
+    distributed func getStream() -> some DistributedSequence {
+        DistributedSequenceImpl(self.seq, actorSystem: self.actorSystem)
+    }
+}
+
+/// Test actor whose `consume` iterates the given sequence to its end, tells `probe` each element, and returns them in order.
+@available(macOS 15.0, *)
+distributed actor Consumer where ActorSystem: DistributedActorSystem, Element: Sendable & Codable {
+    distributed func consume(_ seq: some DistributedSequence, probe: ActorTestProbe) async throws -> [Element] {
+        var all: [Element] = []
+        for try await element in seq {
+            all.append(element)
+            probe.tell(element)
+        }
+        return all
+    }
+}