diff --git a/Sources/SwiftExtensions/AsyncUtils.swift b/Sources/SwiftExtensions/AsyncUtils.swift index cff51e673..76bbce12d 100644 --- a/Sources/SwiftExtensions/AsyncUtils.swift +++ b/Sources/SwiftExtensions/AsyncUtils.swift @@ -215,16 +215,23 @@ package func withTimeout( let tasks = mutableTasks + defer { + // Be extra careful and ensure that we don't leave `bodyTask` or `timeoutTask` running when `withTimeout` finishes, + // eg. if `withTaskPriorityChangedHandler` adds some behavior that never executes `body` if the task gets cancelled. + for task in tasks { + task.cancel() + } + } + return try await withTaskPriorityChangedHandler(initialPriority: priority) { for try await value in stream { return value } - // The only reason for the loop above to terminate is if the Task got cancelled or if the continuation finishes + // The only reason for the loop above to terminate is if the Task got cancelled or if the stream finishes // (which it never does). if Task.isCancelled { - for task in tasks { - task.cancel() - } + // Throwing a `CancellationError` will make us return from `withTimeout`. We will cancel the `bodyTask` from the + // `defer` method above. throw CancellationError() } else { preconditionFailure("Continuation never finishes") diff --git a/Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift b/Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift index eb9fc4b3e..97f95e358 100644 --- a/Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift +++ b/Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift @@ -24,6 +24,11 @@ package func withTaskPriorityChangedHandler( ) async throws -> T { let lastPriority = ThreadSafeBox(initialValue: initialPriority) let result: T? = try await withThrowingTaskGroup(of: Optional.self) { taskGroup in + defer { + // We leave this closure when either we have received a result or we registered cancellation. In either case, we + // want to make sure that we don't leave the body task or the priority watching task running. + taskGroup.cancelAll() + } // Run the task priority watcher with high priority instead of inheriting the initial priority. Otherwise a // `.background` task might not get its priority elevated because the priority watching task also runs at // `.background` priority and might not actually get executed in time. @@ -54,11 +59,10 @@ package func withTaskPriorityChangedHandler( taskGroup.addTask { try await operation() } - // The first task that watches the priority never finishes, so we are effectively await the `operation` task here - // and cancelling the priority observation task once the operation task is done. + // The first task that watches the priority never finishes unless it is cancelled, so we are effectively await the + // `operation` task here. // We do need to await the observation task as well so that priority escalation also affects the observation task. for try await case let value? in taskGroup { - taskGroup.cancelAll() return value } return nil