diff --git a/Sources/Helpers/AsyncValueSubject.swift b/Sources/Helpers/AsyncValueSubject.swift index 7c886cd53..749a4fe08 100644 --- a/Sources/Helpers/AsyncValueSubject.swift +++ b/Sources/Helpers/AsyncValueSubject.swift @@ -20,9 +20,10 @@ package final class AsyncValueSubject: Sendable { var value: Value var continuations: [UInt: AsyncStream.Continuation] = [:] var count: UInt = 0 + var finished = false } - let bufferingPolicy: BufferingPolicy + let bufferingPolicy: UncheckedSendable let mutableState: LockIsolated /// Creates a new AsyncValueSubject with an initial value. @@ -31,7 +32,7 @@ package final class AsyncValueSubject: Sendable { /// - bufferingPolicy: Determines how values are buffered in the AsyncStream (defaults to .unbounded) package init(_ initialValue: Value, bufferingPolicy: BufferingPolicy = .unbounded) { self.mutableState = LockIsolated(MutableState(value: initialValue)) - self.bufferingPolicy = bufferingPolicy + self.bufferingPolicy = UncheckedSendable(bufferingPolicy) } deinit { @@ -43,12 +44,17 @@ package final class AsyncValueSubject: Sendable { mutableState.value } - /// Sends a new value to the subject and notifies all observers. - /// - Parameter value: The new value to send + /// Resume the task awaiting the next iteration point by having it return normally from its suspension point with a given element. + /// - Parameter value: The value to yield from the continuation. + /// + /// If nothing is awaiting the next value, this method attempts to buffer the result’s element. + /// + /// This can be called more than once and returns to the caller immediately without blocking for any awaiting consumption from the iteration. package func yield(_ value: Value) { mutableState.withValue { - $0.value = value + guard !$0.finished else { return } + $0.value = value for (_, continuation) in $0.continuations { continuation.yield(value) } @@ -62,14 +68,20 @@ package final class AsyncValueSubject: Sendable { /// finish, the stream enters a terminal state and doesn't produce any /// additional elements. package func finish() { - for (_, continuation) in mutableState.continuations { - continuation.finish() + mutableState.withValue { + guard $0.finished == false else { return } + + $0.finished = true + + for (_, continuation) in $0.continuations { + continuation.finish() + } } } /// An AsyncStream that emits the current value and all subsequent updates. package var values: AsyncStream { - AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in + AsyncStream(bufferingPolicy: bufferingPolicy.value) { continuation in insert(continuation) } } diff --git a/Tests/HelpersTests/AsyncValueSubjectTests.swift b/Tests/HelpersTests/AsyncValueSubjectTests.swift new file mode 100644 index 000000000..988dbe870 --- /dev/null +++ b/Tests/HelpersTests/AsyncValueSubjectTests.swift @@ -0,0 +1,103 @@ +// +// AsyncValueSubjectTests.swift +// Supabase +// +// Created by Guilherme Souza on 24/03/25. +// + +import ConcurrencyExtras +import XCTest + +@testable import Helpers + +final class AsyncValueSubjectTests: XCTestCase { + + override func invokeTest() { + withMainSerialExecutor { + super.invokeTest() + } + } + + func testInitialValue() async { + let subject = AsyncValueSubject(42) + XCTAssertEqual(subject.value, 42) + } + + func testYieldUpdatesValue() async { + let subject = AsyncValueSubject(0) + subject.yield(10) + XCTAssertEqual(subject.value, 10) + } + + func testValuesStream() async { + let subject = AsyncValueSubject(0) + let values = LockIsolated<[Int]>([]) + + let task = Task { + for await value in subject.values { + let values = values.withValue { + $0.append(value) + return $0 + } + if values.count == 4 { + break + } + } + } + + await Task.yield() + + subject.yield(1) + subject.yield(2) + subject.yield(3) + subject.finish() + + await task.value + + XCTAssertEqual(values.value, [0, 1, 2, 3]) + } + + func testOnChangeHandler() async { + let subject = AsyncValueSubject(0) + let values = LockIsolated<[Int]>([]) + + let task = subject.onChange { value in + values.withValue { + $0.append(value) + } + } + + await Task.yield() + + subject.yield(1) + subject.yield(2) + subject.yield(3) + subject.finish() + + await task.value + + XCTAssertEqual(values.value, [0, 1, 2, 3]) + } + + func testFinish() async { + let subject = AsyncValueSubject(0) + let values = LockIsolated<[Int]>([]) + + let task = Task { + for await value in subject.values { + values.withValue { $0.append(value) } + } + } + + await Task.yield() + + subject.yield(1) + subject.finish() + subject.yield(2) + + await task.value + + XCTAssertEqual(values.value, [0, 1]) + XCTAssertEqual(subject.value, 1) + } +}