Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions Sources/Helpers/AsyncValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
var value: Value
var continuations: [UInt: AsyncStream<Value>.Continuation] = [:]
var count: UInt = 0
var finished = false
}

let bufferingPolicy: BufferingPolicy
let bufferingPolicy: UncheckedSendable<BufferingPolicy>
let mutableState: LockIsolated<MutableState>

/// Creates a new AsyncValueSubject with an initial value.
Expand All @@ -31,7 +32,7 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
/// - bufferingPolicy: Determines how values are buffered in the AsyncStream (defaults to .unbounded)
package init(_ initialValue: Value, bufferingPolicy: BufferingPolicy = .unbounded) {
self.mutableState = LockIsolated(MutableState(value: initialValue))
self.bufferingPolicy = bufferingPolicy
self.bufferingPolicy = UncheckedSendable(bufferingPolicy)
}

deinit {
Expand All @@ -43,12 +44,17 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
mutableState.value
}

/// Sends a new value to the subject and notifies all observers.
/// - Parameter value: The new value to send
/// Resume the task awaiting the next iteration point by having it return normally from its suspension point with a given element.
/// - Parameter value: The value to yield from the continuation.
///
/// If nothing is awaiting the next value, this method attempts to buffer the result’s element.
///
/// This can be called more than once and returns to the caller immediately without blocking for any awaiting consumption from the iteration.
package func yield(_ value: Value) {
mutableState.withValue {
$0.value = value
guard !$0.finished else { return }

$0.value = value
for (_, continuation) in $0.continuations {
continuation.yield(value)
}
Expand All @@ -62,14 +68,20 @@ package final class AsyncValueSubject<Value: Sendable>: Sendable {
/// finish, the stream enters a terminal state and doesn't produce any
/// additional elements.
package func finish() {
for (_, continuation) in mutableState.continuations {
continuation.finish()
mutableState.withValue {
guard $0.finished == false else { return }

$0.finished = true

for (_, continuation) in $0.continuations {
continuation.finish()
}
}
}

/// An AsyncStream that emits the current value and all subsequent updates.
package var values: AsyncStream<Value> {
AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
AsyncStream(bufferingPolicy: bufferingPolicy.value) { continuation in
insert(continuation)
}
}
Expand Down
103 changes: 103 additions & 0 deletions Tests/HelpersTests/AsyncValueSubjectTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//
// AsyncValueSubjectTests.swift
// Supabase
//
// Created by Guilherme Souza on 24/03/25.
//

import ConcurrencyExtras
import XCTest

@testable import Helpers

final class AsyncValueSubjectTests: XCTestCase {

override func invokeTest() {
withMainSerialExecutor {
super.invokeTest()
}
}

func testInitialValue() async {
let subject = AsyncValueSubject<Int>(42)
XCTAssertEqual(subject.value, 42)
}

func testYieldUpdatesValue() async {
let subject = AsyncValueSubject<Int>(0)
subject.yield(10)
XCTAssertEqual(subject.value, 10)
}

func testValuesStream() async {
let subject = AsyncValueSubject<Int>(0)
let values = LockIsolated<[Int]>([])

let task = Task {
for await value in subject.values {
let values = values.withValue {
$0.append(value)
return $0
}
if values.count == 4 {
break
}
}
}

await Task.yield()

subject.yield(1)
subject.yield(2)
subject.yield(3)
subject.finish()

await task.value

XCTAssertEqual(values.value, [0, 1, 2, 3])
}

func testOnChangeHandler() async {
let subject = AsyncValueSubject<Int>(0)
let values = LockIsolated<[Int]>([])

let task = subject.onChange { value in
values.withValue {
$0.append(value)
}
}

await Task.yield()

subject.yield(1)
subject.yield(2)
subject.yield(3)
subject.finish()

await task.value

XCTAssertEqual(values.value, [0, 1, 2, 3])
}

func testFinish() async {
let subject = AsyncValueSubject<Int>(0)
let values = LockIsolated<[Int]>([])

let task = Task {
for await value in subject.values {
values.withValue { $0.append(value) }
}
}

await Task.yield()

subject.yield(1)
subject.finish()
subject.yield(2)

await task.value

XCTAssertEqual(values.value, [0, 1])
XCTAssertEqual(subject.value, 1)
}
}
Loading