Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
**v0.5.3 - Fluorine:**

This version is a bug fix version to remove warnings in the latest Xcode/Swift.

- Updated renamed functions to use updated name/signatures in source code.
- Use async enabled expectation waiting facilities in test code.

**v0.5.2 - Oxygen:**

This version is a bug fix version.
Expand Down
42 changes: 21 additions & 21 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,7 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)

return await withTaskCancellationHandler { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
} operation: {
return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
Expand Down Expand Up @@ -218,6 +198,26 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
onSuspend?()
}
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
}
}

Expand Down
42 changes: 21 additions & 21 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,27 +178,7 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)

return try await withTaskCancellationHandler { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
} operation: {
return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
Expand Down Expand Up @@ -245,6 +225,26 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
onSuspend?()
}
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/Combiners/Merge/MergeStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ struct MergeStateMachine<Element>: Sendable {

func next() async -> RegulatedElement<Element> {
await withTaskCancellationHandler {
self.unsuspendAndClearOnCancel()
} operation: {
self.requestNextRegulatedElements()

let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation<RegulatedElement<Element>, Never>) in
Expand Down Expand Up @@ -244,6 +242,8 @@ struct MergeStateMachine<Element>: Sendable {
}

return regulatedElement
} onCancel: {
self.unsuspendAndClearOnCancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,7 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
let shouldReturnNil = self.isTerminated.withCriticalRegion { $0 }
guard !shouldReturnNil else { return nil }

return try await withTaskCancellationHandler { [isTerminated, othersTask] in
isTerminated.withCriticalRegion { isTerminated in
isTerminated = true
}
othersTask?.cancel()
} operation: { [othersTask, othersState, onBaseElement] in
return try await withTaskCancellationHandler { [othersTask, othersState, onBaseElement] in
do {
while true {
guard let baseElement = try await self.base.next() else {
Expand Down Expand Up @@ -219,6 +214,11 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
othersTask?.cancel()
throw error
}
} onCancel: { [isTerminated, othersTask] in
isTerminated.withCriticalRegion { isTerminated in
isTerminated = true
}
othersTask?.cancel()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ where Other: Sendable, Other.Element: Sendable {
public mutating func next() async rethrows -> Element? {
guard !self.isTerminated else { return nil }

return try await withTaskCancellationHandler { [otherTask] in
otherTask?.cancel()
} operation: { [otherTask, otherState, onBaseElement] in
return try await withTaskCancellationHandler { [otherTask, otherState, onBaseElement] in
do {
while true {
guard let baseElement = try await self.base.next() else {
Expand Down Expand Up @@ -157,6 +155,8 @@ where Other: Sendable, Other.Element: Sendable {
otherTask?.cancel()
throw error
}
} onCancel: { [otherTask] in
otherTask?.cancel()
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/Zip2Runtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:

func next() async rethrows -> (Base1.Element, Base2.Element)? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>)?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -173,6 +167,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
self.handle(demandIsFulfilledOutput: output)

return try (results.0._rethrowGet(), results.1._rethrowGet())
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/Zip3Runtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:

func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element)? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>, Result<Base3.Element, Error>)?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -211,6 +205,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
self.handle(demandIsFulfilledOutput: output)

return try (results.0._rethrowGet(), results.1._rethrowGet(), results.2._rethrowGet())
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/ZipRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ where Base: Sendable, Base.Element: Sendable {

func next() async rethrows -> [Base.Element]? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<[Int: Result<Base.Element, Error>]?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -145,6 +139,12 @@ where Base: Sendable, Base.Element: Sendable {
self.handle(demandIsFulfilledOutput: output)

return try results.sorted { $0.key < $1.key }.map { try $0.value._rethrowGet() }
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/Creators/AsyncTimerSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public struct AsyncTimerSequence: AsyncSequence {
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler { [task] in
task.cancel()
} operation: {
await withTaskCancellationHandler {
guard !Task.isCancelled else { return nil }
return await self.iterator.next()
} onCancel: { [task] in
task.cancel()
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/Operators/AsyncSwitchToLatestSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ where Base.Element: AsyncSequence, Base: Sendable, Base.Element.Element: Sendabl
guard !Task.isCancelled else { return nil }
self.startBase()

return try await withTaskCancellationHandler { [baseTask, state] in
baseTask?.cancel()
state.withCriticalRegion {
$0.childTask?.cancel()
}
} operation: {
return try await withTaskCancellationHandler {
while true {
let childTask = await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Task<ChildValue?, Never>?, Never>) in
let decision = state.withCriticalRegion { state -> NextDecision in
Expand Down Expand Up @@ -303,6 +298,11 @@ where Base.Element: AsyncSequence, Base: Sendable, Base.Element.Element: Sendabl
return try element._rethrowGet()
}
}
} onCancel: { [baseTask, state] in
baseTask?.cancel()
state.withCriticalRegion {
$0.childTask?.cancel()
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Supporting/Regulator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ final class Regulator<Base: AsyncSequence>: @unchecked Sendable {

func iterate() async {
await withTaskCancellationHandler {
self.unsuspendAndExitOnCancel()
} operation: {
var mutableBase = base.makeAsyncIterator()

do {
Expand Down Expand Up @@ -99,6 +97,8 @@ final class Regulator<Base: AsyncSequence>: @unchecked Sendable {
}
self.onNextRegulatedElement(.element(result: .failure(error)))
}
} onCancel: {
self.unsuspendAndExitOnCancel()
}
}

Expand Down
14 changes: 7 additions & 7 deletions Tests/AsyncChannels/AsyncBufferedChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class AsyncBufferedChannelTests: XCTestCase {
return received
}

wait(for: [iterationIsAwaiting], timeout: 1.0)
await fulfillment(of: [iterationIsAwaiting], timeout: 1.0)

// When
sut.send(1)
Expand Down Expand Up @@ -125,19 +125,19 @@ final class AsyncBufferedChannelTests: XCTestCase {
for await element in sut {
received = element
taskCanBeCancelled.fulfill()
wait(for: [taskWasCancelled], timeout: 1.0)
await fulfillment(of: [taskWasCancelled], timeout: 1.0)
}
iterationHasFinished.fulfill()
return received
}

wait(for: [taskCanBeCancelled], timeout: 1.0)
await fulfillment(of: [taskCanBeCancelled], timeout: 1.0)

// When
task.cancel()
taskWasCancelled.fulfill()

wait(for: [iterationHasFinished], timeout: 1.0)
await fulfillment(of: [iterationHasFinished], timeout: 1.0)

// Then
let received = await task.value
Expand Down Expand Up @@ -170,12 +170,12 @@ final class AsyncBufferedChannelTests: XCTestCase {
return received
}

wait(for: [iteration1IsAwaiting, iteration2IsAwaiting], timeout: 1.0)
await fulfillment(of: [iteration1IsAwaiting, iteration2IsAwaiting], timeout: 1.0)

// When
sut.finish()

wait(for: [iteration1IsFinished, iteration2IsFinished], timeout: 1.0)
await fulfillment(of: [iteration1IsFinished, iteration2IsFinished], timeout: 1.0)

let received1 = await task1.value
let received2 = await task2.value
Expand Down Expand Up @@ -217,7 +217,7 @@ final class AsyncBufferedChannelTests: XCTestCase {
}.cancel()

// Then
wait(for: [iterationIsFinished], timeout: 1.0)
await fulfillment(of: [iterationIsFinished], timeout: 1.0)
}

func test_awaiting_uses_id_for_equatable() {
Expand Down
Loading