-
Notifications
You must be signed in to change notification settings - Fork 188
Add MultiProducerSingleConsumerChannel
#305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
phausler
merged 22 commits into
apple:main
from
FranzBusch:fb-async-backpressured-stream
Nov 18, 2025
+4,718
−0
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
76092b0
Add `AsyncBackpressuredStream` proposal and implementation
FranzBusch 77c25a8
Update proposal and implementation
FranzBusch 77d4f3e
Update proposal
FranzBusch 1713ff8
Add example project
FranzBusch 36f9e73
Formatting
FranzBusch a2189fa
Fix Swift 6.0 build
FranzBusch 3ede252
Future direction for ~Copyable elements
FranzBusch 1d87ee8
Apply formatting
FranzBusch c1935e3
Fix CI
FranzBusch 05a2184
Move to 6.1 and update proposal
FranzBusch 48caa9d
Guard tests
FranzBusch e452347
Minor edits to the proposal
FranzBusch c3114e5
Fix revision order
FranzBusch 42495bb
FIxup setOnTerminationCallback
FranzBusch 73b103f
Address review feedback
FranzBusch b7a7342
Rename to `MultiProducerSingleConsumerAsyncChannel`
FranzBusch e6ffb55
Allow one termination callback per source.
FranzBusch 4b5b39d
Fix all sendable warnings
FranzBusch 2042214
Remove unbounded strategy, rename copy -> makeAdditionalSource, renam…
FranzBusch 657cf6e
Remove Example and fix docs and fix format
FranzBusch f17b9e9
fixes grammar and some word order issues
heckj 346845e
Remove unnecessary consume
FranzBusch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| root = true | ||
|
|
||
| [*] | ||
| indent_style = space | ||
| indent_size = 2 | ||
| end_of_line = lf | ||
| insert_final_newline = true | ||
| trim_trailing_whitespace = true |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
838 changes: 838 additions & 0 deletions
838
Evolution/0016-mutli-producer-single-consumer-channel.md
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,329 @@ | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the Swift Async Algorithms open source project | ||
| // | ||
| // Copyright (c) 2023 Apple Inc. and the Swift project authors | ||
| // Licensed under Apache License v2.0 with Runtime Library Exception | ||
| // | ||
| // See https://swift.org/LICENSE.txt for license information | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
| //===----------------------------------------------------------------------===// | ||
| // | ||
| // This source file is part of the SwiftCertificates open source project | ||
| // | ||
| // Copyright (c) 2023 Apple Inc. and the SwiftCertificates project authors | ||
| // Licensed under Apache License v2.0 | ||
| // | ||
| // See LICENSE.txt for license information | ||
| // See CONTRIBUTORS.txt for the list of SwiftCertificates project authors | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // | ||
| //===----------------------------------------------------------------------===// | ||
|
|
||
| /// ``_TinyArray`` is a ``RandomAccessCollection`` optimised to store zero or one ``Element``. | ||
| /// It supports arbitrary many elements but if only up to one ``Element`` is stored it does **not** allocate separate storage on the heap | ||
| /// and instead stores the ``Element`` inline. | ||
| @usableFromInline | ||
| struct _TinyArray<Element> { | ||
| @usableFromInline | ||
| enum Storage { | ||
| case one(Element) | ||
| case arbitrary([Element]) | ||
| } | ||
|
|
||
| @usableFromInline | ||
| var storage: Storage | ||
| } | ||
|
|
||
| // MARK: - TinyArray "public" interface | ||
|
|
||
| extension _TinyArray: Equatable where Element: Equatable {} | ||
| extension _TinyArray: Hashable where Element: Hashable {} | ||
| extension _TinyArray: Sendable where Element: Sendable {} | ||
|
|
||
| extension _TinyArray: RandomAccessCollection { | ||
| @usableFromInline | ||
| typealias Element = Element | ||
|
|
||
| @usableFromInline | ||
| typealias Index = Int | ||
|
|
||
| @inlinable | ||
| subscript(position: Int) -> Element { | ||
| get { | ||
| self.storage[position] | ||
| } | ||
| set { | ||
| self.storage[position] = newValue | ||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| var startIndex: Int { | ||
| self.storage.startIndex | ||
| } | ||
|
|
||
| @inlinable | ||
| var endIndex: Int { | ||
| self.storage.endIndex | ||
| } | ||
| } | ||
|
|
||
| extension _TinyArray { | ||
| @inlinable | ||
| init(_ elements: some Sequence<Element>) { | ||
| self.storage = .init(elements) | ||
| } | ||
|
|
||
| @inlinable | ||
| init() { | ||
| self.storage = .init() | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(_ newElement: Element) { | ||
| self.storage.append(newElement) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(contentsOf newElements: some Sequence<Element>) { | ||
| self.storage.append(contentsOf: newElements) | ||
| } | ||
|
|
||
| @discardableResult | ||
| @inlinable | ||
| mutating func remove(at index: Int) -> Element { | ||
| self.storage.remove(at: index) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { | ||
| try self.storage.removeAll(where: shouldBeRemoved) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { | ||
| try self.storage.sort(by: areInIncreasingOrder) | ||
| } | ||
| } | ||
|
|
||
| // MARK: - TinyArray.Storage "private" implementation | ||
|
|
||
| extension _TinyArray.Storage: Equatable where Element: Equatable { | ||
| @inlinable | ||
| static func == (lhs: Self, rhs: Self) -> Bool { | ||
| switch (lhs, rhs) { | ||
| case (.one(let lhs), .one(let rhs)): | ||
| return lhs == rhs | ||
| case (.arbitrary(let lhs), .arbitrary(let rhs)): | ||
| // we don't use lhs.elementsEqual(rhs) so we can hit the fast path from Array | ||
| // if both arrays share the same underlying storage: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1775 | ||
| return lhs == rhs | ||
|
|
||
| case (.one(let element), .arbitrary(let array)), | ||
| (.arbitrary(let array), .one(let element)): | ||
| guard array.count == 1 else { | ||
| return false | ||
| } | ||
| return element == array[0] | ||
|
|
||
| } | ||
| } | ||
| } | ||
| extension _TinyArray.Storage: Hashable where Element: Hashable { | ||
| @inlinable | ||
| func hash(into hasher: inout Hasher) { | ||
| // same strategy as Array: https://github.com/apple/swift/blob/b42019005988b2d13398025883e285a81d323efa/stdlib/public/core/Array.swift#L1801 | ||
| hasher.combine(count) | ||
| for element in self { | ||
| hasher.combine(element) | ||
| } | ||
| } | ||
| } | ||
| extension _TinyArray.Storage: Sendable where Element: Sendable {} | ||
|
|
||
| extension _TinyArray.Storage: RandomAccessCollection { | ||
| @inlinable | ||
| subscript(position: Int) -> Element { | ||
| get { | ||
| switch self { | ||
| case .one(let element): | ||
| guard position == 0 else { | ||
| fatalError("index \(position) out of bounds") | ||
| } | ||
| return element | ||
| case .arbitrary(let elements): | ||
| return elements[position] | ||
| } | ||
| } | ||
| set { | ||
| switch self { | ||
| case .one: | ||
| guard position == 0 else { | ||
| fatalError("index \(position) out of bounds") | ||
| } | ||
| self = .one(newValue) | ||
| case .arbitrary(var elements): | ||
| elements[position] = newValue | ||
| self = .arbitrary(elements) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| var startIndex: Int { | ||
| 0 | ||
| } | ||
|
|
||
| @inlinable | ||
| var endIndex: Int { | ||
| switch self { | ||
| case .one: return 1 | ||
| case .arbitrary(let elements): return elements.endIndex | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension _TinyArray.Storage { | ||
| @inlinable | ||
| init(_ elements: some Sequence<Element>) { | ||
| var iterator = elements.makeIterator() | ||
| guard let firstElement = iterator.next() else { | ||
| self = .arbitrary([]) | ||
| return | ||
| } | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements just contains a single element | ||
| // and we hit the fast path | ||
| self = .one(firstElement) | ||
| return | ||
| } | ||
|
|
||
| var elements: [Element] = [] | ||
| elements.reserveCapacity(elements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| while let nextElement = iterator.next() { | ||
| elements.append(nextElement) | ||
| } | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| @inlinable | ||
| init() { | ||
| self = .arbitrary([]) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(_ newElement: Element) { | ||
| self.append(contentsOf: CollectionOfOne(newElement)) | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func append(contentsOf newElements: some Sequence<Element>) { | ||
| switch self { | ||
| case .one(let firstElement): | ||
| var iterator = newElements.makeIterator() | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements is empty, nothing to do | ||
| return | ||
| } | ||
| var elements: [Element] = [] | ||
| elements.reserveCapacity(1 + newElements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| elements.appendRemainingElements(from: &iterator) | ||
| self = .arbitrary(elements) | ||
|
|
||
| case .arbitrary(var elements): | ||
| if elements.isEmpty { | ||
| // if `self` is currently empty and `newElements` just contains a single | ||
| // element, we skip allocating an array and set `self` to `.one(firstElement)` | ||
| var iterator = newElements.makeIterator() | ||
| guard let firstElement = iterator.next() else { | ||
| // newElements is empty, nothing to do | ||
| return | ||
| } | ||
| guard let secondElement = iterator.next() else { | ||
| // newElements just contains a single element | ||
| // and we hit the fast path | ||
| self = .one(firstElement) | ||
| return | ||
| } | ||
| elements.reserveCapacity(elements.count + newElements.underestimatedCount) | ||
| elements.append(firstElement) | ||
| elements.append(secondElement) | ||
| elements.appendRemainingElements(from: &iterator) | ||
| self = .arbitrary(elements) | ||
|
|
||
| } else { | ||
| elements.append(contentsOf: newElements) | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @discardableResult | ||
| @inlinable | ||
| mutating func remove(at index: Int) -> Element { | ||
| switch self { | ||
| case .one(let oldElement): | ||
| guard index == 0 else { | ||
| fatalError("index \(index) out of bounds") | ||
| } | ||
| self = .arbitrary([]) | ||
| return oldElement | ||
|
|
||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
| return elements.remove(at: index) | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func removeAll(where shouldBeRemoved: (Element) throws -> Bool) rethrows { | ||
| switch self { | ||
| case .one(let oldElement): | ||
| if try shouldBeRemoved(oldElement) { | ||
| self = .arbitrary([]) | ||
| } | ||
|
|
||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
| return try elements.removeAll(where: shouldBeRemoved) | ||
|
|
||
| } | ||
| } | ||
|
|
||
| @inlinable | ||
| mutating func sort(by areInIncreasingOrder: (Element, Element) throws -> Bool) rethrows { | ||
| switch self { | ||
| case .one: | ||
| // a collection of just one element is always sorted, nothing to do | ||
| break | ||
| case .arbitrary(var elements): | ||
| defer { | ||
| self = .arbitrary(elements) | ||
| } | ||
|
|
||
| try elements.sort(by: areInIncreasingOrder) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| extension Array { | ||
| @inlinable | ||
| mutating func appendRemainingElements(from iterator: inout some IteratorProtocol<Element>) { | ||
| while let nextElement = iterator.next() { | ||
| append(nextElement) | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this could perhaps be subsumed by
InlineArrayinstead since the original intent was to have a faster storage (this is not a blocking concept)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InlineArrayisn't capable of fully replacing the need here. The idea behindTinyArrayis to have a fast path for a single element that doesn't allocate and one that allocates if there are more. Since we can't tell at compile time how many producers we have we need this runtime dynamism.