Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Sources/SwiftExtensions/AsyncUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,23 @@ package func withTimeout<T: Sendable>(

let tasks = mutableTasks

defer {
// Be extra careful and ensure that we don't leave `bodyTask` or `timeoutTask` running when `withTimeout` finishes,
// eg. if `withTaskPriorityChangedHandler` adds some behavior that never executes `body` if the task gets cancelled.
for task in tasks {
task.cancel()
}
}

return try await withTaskPriorityChangedHandler(initialPriority: priority) {
for try await value in stream {
return value
}
// The only reason for the loop above to terminate is if the Task got cancelled or if the continuation finishes
// The only reason for the loop above to terminate is if the Task got cancelled or if the stream finishes
// (which it never does).
if Task.isCancelled {
for task in tasks {
task.cancel()
}
// Throwing a `CancellationError` will make us return from `withTimeout`. We will cancel the `bodyTask` from the
// `defer` method above.
throw CancellationError()
} else {
preconditionFailure("Continuation never finishes")
Expand Down
10 changes: 7 additions & 3 deletions Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ package func withTaskPriorityChangedHandler<T: Sendable>(
) async throws -> T {
let lastPriority = ThreadSafeBox(initialValue: initialPriority)
let result: T? = try await withThrowingTaskGroup(of: Optional<T>.self) { taskGroup in
defer {
// We leave this closure when either we have received a result or we registered cancellation. In either case, we
// want to make sure that we don't leave the body task or the priority watching task running.
taskGroup.cancelAll()
}
// Run the task priority watcher with high priority instead of inheriting the initial priority. Otherwise a
// `.background` task might not get its priority elevated because the priority watching task also runs at
// `.background` priority and might not actually get executed in time.
Expand Down Expand Up @@ -54,11 +59,10 @@ package func withTaskPriorityChangedHandler<T: Sendable>(
taskGroup.addTask {
try await operation()
}
// The first task that watches the priority never finishes, so we are effectively await the `operation` task here
// and cancelling the priority observation task once the operation task is done.
// The first task that watches the priority never finishes unless it is cancelled, so we are effectively await the
// `operation` task here.
// We do need to await the observation task as well so that priority escalation also affects the observation task.
for try await case let value? in taskGroup {
taskGroup.cancelAll()
return value
}
return nil
Expand Down