
Commit 70c36ce

phausler and jamieQ authored
Share algorithm (#357)
This is an implementation of share: an algorithm for allowing AsyncSequence types to be iterated by multiple consumers. In addition, this cleans up the project to be a starting point for the 1.1 release: it fixes the continuous integration support and configurations, but has the cost of only supporting 3 active releases (6.0, 6.1, and 6.2). Previous releases are only going to have legacy support via older released versions.

* Fix the remaining todo on hard cancellation vs soft cancellation
* Update Sources/AsyncAlgorithms/AsyncShareSequence.swift (Co-authored-by: Jamie <[email protected]>)
* Update the implementation of share to handle sendability requirements, address some edge cases, and update to the latest version of the discussion around buffering behaviors, as well as adding some documentation and commentary
* Add some first drafts at unit tests for verifying share behaviors
* Clean up some of the documentation and address feedback for implementation details
* Fix merge damage by restoring the changes from @jamieQ
* Fix up some availability issues w.r.t. sendable metatypes
* Add a preamble to SendableMetatypes.swift
* Add a fallback for next (for pre-6.2 Swifts)
* Slight adjustments for older compiler builds and expose the sendable metatypes since those are part of documentation
* Update formatting and gate availability for older compilers
* Speculative fix for pre-6.1 compiler crashes by removing the sending keyword
* Revert "Speculative fix for pre 6.1 compiler crashes by removing the sending keyword" (this reverts commit 875fa81)
* Speculative fix for pre-6.1 compiler crashes by removing the sending keyword, and correct sendable metatype shim availability
* Remove block comment for formatting check
* Take a more conservative approach and roll back to only supporting share on 6.2
* Only test share on 6.2 or newer
* Roll back to older sendable metatype constraints
* Yet another stab at 5.10 visionOS guards
* Formatting pass
* Disable 5.10 builds and leave those to older releases
* Ignore swift-version files to avoid local swiftly toolchain selection from being required by others
* Renumber the proposals and copy share into a guide
* Add a link from the primary doc interface to share

---------

Co-authored-by: Jamie <[email protected]>
1 parent ca38935 · commit 70c36ce

File tree

14 files changed: +1734, -10 lines

.github/workflows/pull_request.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -9,7 +9,7 @@ jobs:
     name: Test
     uses: swiftlang/github-workflows/.github/workflows/swift_package_test.yml@main
     with:
-      linux_exclude_swift_versions: "[{\"swift_version\": \"5.9\"}]"
+      linux_exclude_swift_versions: "[{\"swift_version\": \"5.9\"}, {\"swift_version\": \"5.10\"}]"
       windows_exclude_swift_versions: "[{\"swift_version\": \"5.9\"}]"
       enable_wasm_sdk_build: true
       wasm_sdk_build_command: swift build -Xcc -D_WASI_EMULATED_PTHREAD
```

.gitignore

Lines changed: 2 additions & 1 deletion
```diff
@@ -8,4 +8,5 @@ DerivedData/
 .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
 .netrc
 .swiftpm
-Package.resolved
+Package.resolved
+.swift-version
```

File renamed without changes.

Evolution/NNNN-chunk.md renamed to Evolution/0013-chunk.md

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,7 +1,7 @@
 # Chunked & Timer
-* Proposal: [SAA-NNNN](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/NNNN-chunk.md)
+* Proposal: [SAA-0013](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0013-chunk.md)
 * Author(s): [Kevin Perry](https://github.com/kperryua), [Philippe Hausler](https://github.com/phausler)
-* Status: **Implemented**
+* Status: **Accepted**
 * Implementation: [
     [By group](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift),
     [On projection](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift),
```

Evolution/NNNN-rate-limits.md renamed to Evolution/0014-rate-limits.md

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,8 +1,8 @@
 # Rate Limiting
 
-* Proposal: [SAA-NNNN]()
+* Proposal: [SAA-0014](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0014-rate-limits.md)
 * Authors: [Philippe Hausler](https://github.com/phausler)
-* Status: **Implemented**
+* Status: **Accepted**
 * Implementation:
   [
     [Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift) |
```

Evolution/NNNN-reductions.md renamed to Evolution/0015-reductions.md

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,7 +1,7 @@
 # Reductions
-* Proposal: [SAA-NNNN](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/NNNN-reductions.md)
+* Proposal: [SAA-0015](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0015-reductions.md)
 * Author(s): [Philippe Hausler](https://github.com/phausler)
-* Status: **Implemented**
+* Status: **Accepted**
 * Implementation: [
     [Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncExclusiveReductionsSequence.swift) |
     [Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestReductions.swift)
```

Evolution/0016-share.md

Lines changed: 197 additions & 0 deletions

# Share

* Proposal: [SAA-0016](https://github.com/apple/swift-async-algorithms/blob/main/Evolution/0016-share.md)
* Author(s): [Philippe Hausler](https://github.com/phausler)
* Status: **Accepted**
* Implementation: [
    [Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncShareSequence.swift) |
    [Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestShare.swift)
  ]
* Decision Notes:
* Bugs:

## Introduction

Many AsyncSequence-adopting types permit only a single consumption, yet there are many cases where the same produced values are useful in more than one place. Several approaches exist to share, distribute, and broadcast those values. This proposal focuses on one concept: sharing, where each consumer independently makes forward progress and receives the same values, but values are not replayed from the beginning of time.

## Motivation

There are many potential uses for the sharing concept applied to AsyncSequences.

One such example is a source of data, exposed as an asynchronous sequence, that needs to be consumed by UI updates, logging, and a network connection. What matters in this case is not the particular uses, but that those uses are independent of each other: networking should not block or delay updates to the UI, and neither should logging. This example also illustrates that the isolation of each side might differ and that some of the sides may not tolerate coalescing or dropping values.

There are many other use cases that have been requested for this family of algorithms. Since the release of AsyncAlgorithms it has perhaps been the most popularly requested set of behaviors to add to the package.

## Proposed solution

AsyncAlgorithms will introduce a new extension function on AsyncSequence that provides a shareable asynchronous sequence: one that produces the same values when iterated from multiple instances of its AsyncIterator. Those iterations can take place in multiple isolations.

When values produced in a differing isolation cannot be coalesced, the two options available are awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure into a buffer). Replaying the values from the creation of the sequence onward is a distinctly different behavior that should be considered a separate use case. That leaves the behavioral characteristic of this particular operation as: sharing a buffer of values, starting from the initialization of a new iteration of the sequence. Control over that buffer should then have options to determine the behavior, similar to how AsyncStream allows that control. It should have options to be unbounded, to buffer the oldest count of elements, or to buffer the newest count of elements.

It is critical to identify that this is one algorithm in the family of algorithms for sharing values. It should not attempt to solve all behavioral requirements but instead serve a common set of them that make cohesive sense together. This proposal is not mutually exclusive with the other algorithms in the sharing family.

## Detailed design

A new extension will be added that returns a `Sendable` `AsyncSequence`. This extension takes a buffering policy to identify how the buffer is handled when iterations do not consume at the same rate.

The `Sendable` annotation identifies to the developer that this sequence can be shared and stored in an existential `any`.

```swift
extension AsyncSequence where Element: Sendable {
  public func share(
    bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
  ) -> some AsyncSequence<Element, Failure> & Sendable
}
```
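
As a rough usage sketch (not part of the proposal text; the `events` sequence and its element type are placeholders), the returned sequence can be stored as an existential and iterated from more than one task:

```swift
import AsyncAlgorithms

// Hypothetical usage sketch: share() is called once, and the resulting Sendable
// sequence can be stored in an existential and iterated from more than one task.
let events = [1, 2, 3].async
let shared: any AsyncSequence<Int, Never> & Sendable = events.share()

let first = Task {
  for await value in shared { print("first", value) }
}
let second = Task {
  for await value in shared { print("second", value) }
}
await first.value
await second.value
```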

The buffer internal to the share algorithm only extends back to the furthest element still needed, and there is only a single buffer shared across all iterators. This ensures that, with the application of the buffering policy, the storage size is as small as possible while still allowing all iterations to avoid dropping values and keeping memory usage in check. The signature reuses the existing `AsyncBufferSequencePolicy` type to specify the buffering behavior: either how emitting to the buffer should be limited, or what should happen when the buffer is exceeded.
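
For reference, the buffering behaviors discussed in this proposal map onto the policy cases as follows (again a sketch; `events` is a placeholder upstream sequence):

```swift
import AsyncAlgorithms

let events = [0, 1, 2, 3, 4].async

// The buffering behaviors discussed in this proposal, expressed as policies.
let boundedShare = events.share(bufferingPolicy: .bounded(1))          // the default
let unboundedShare = events.share(bufferingPolicy: .unbounded)
let latestTwo = events.share(bufferingPolicy: .bufferingLatest(2))     // keeps the newest two, drops older buffered values
let oldestTwo = events.share(bufferingPolicy: .bufferingOldest(2))     // keeps the oldest two, drops newer incoming values
```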

## Runtime Behavior

The runtime behaviors fall into a few categories: ordering, iteration isolation, cancellation, and lifetimes. To understand these behaviors, a few terms are useful to define. Each creation of the AsyncIterator of the sequence and invocation of next is referred to as a side of the share iteration. The back-pressure on the system to fetch a new element or termination is referred to as demand. The limit is the pending gate that awaits until the buffer has been serviced, used by the `AsyncBufferSequencePolicy.bounded(_:)` policy. The last special definition is the extent, which in this case is specifically the lifetime of the asynchronous sequence itself.

When the underlying type backing the share algorithm is constructed, a new extent is created; this is used for tracking the reference lifetime under the hood, and it both houses the iteration and identifies the point at which no more sides can be constructed. When no more sides can be constructed and no sides are left to iterate, the backing iteration is canceled. This prevents any unreferenced task backing the iteration from being leaked by the algorithm itself.
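
A minimal sketch of the extent idea, using hypothetical names rather than the actual implementation, is simply a reference type whose deinitialization cancels the backing task:

```swift
// Hypothetical sketch: the extent's only job is to bound the lifetime of the shared
// iteration. Once nothing references it and no sides remain, deinit cancels the task
// that drives the base AsyncSequence so it cannot be leaked.
final class Extent {
  var iterationTask: Task<Void, Never>?
  deinit { iterationTask?.cancel() }
}
```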

That construction creates an initial shared state and buffer. No task is started initially; only upon the first demand is the task backing the iteration started. This means that on the first call to next, a task is spun up to service all potential sides. The order in which the sides are serviced is not specified and cannot be relied upon; however, the order of delivery within a side is always guaranteed. The single task servicing the iteration is the only place holding any sort of iterator from the base `AsyncSequence`, so that iterator is isolated and never sent from one isolation to another. That iteration first awaits any limit availability, then awaits a demand given by a side, and after that awaits an element or terminal event from the iterator and enqueues the result to the buffer.
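
The lazy start described above might be sketched roughly as follows (hypothetical type and method names; the real implementation also coordinates the limit, the buffer, and terminal events):

```swift
// Hypothetical sketch: the single task that drives the base iteration is created on the
// first demand rather than when share() is called, and it is the only holder of the
// base iterator.
actor SharedIteration<Base: AsyncSequence & Sendable> where Base.Element: Sendable {
  private let base: Base
  private var task: Task<Void, Never>?

  init(_ base: Base) { self.base = base }

  func demand() {
    guard task == nil else { return }
    let base = self.base
    task = Task.detached {
      var iterator = base.makeAsyncIterator()
      while let element = try? await iterator.next() {
        // enqueue `element` into the shared buffer and wake any waiting sides (omitted)
        _ = element
      }
      // deliver the terminal event (finish or failure) to all sides (omitted)
    }
  }

  func cancelIteration() {
    task?.cancel()
  }
}
```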

The buffer itself is held in only one location; each side, however, has a cursor index into that buffer, and as values are consumed the indexes are adjusted accordingly, leaving the buffer only as large as the largest deficit. This means that sides started after the initial start-up do not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Buffer-size-sensitive systems that wish to adjust the behavior should specify a policy. In common usage, similar to other such systems servicing desktop and mobile applications, the behavior is often unbounded; alternatively, desktop or mobile applications will often want `.bounded(1)`, since that lets the slowest consumer drive forward progress with at most one buffered element. All of the use cases (mobile, desktop, and server side) have a reasonable default of `.bounded(1)`. Leaving this as the default parameter keeps progressive disclosure of the behaviors, such that the easiest thing to write is correct for all uses, and more advanced control can be gained by passing a specific policy. This default argument diverges slightly from AsyncStream, but follows a behavior similar to Combine's `share`.
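
The cursor bookkeeping described above can be sketched as follows (hypothetical types and names; the real implementation additionally coordinates the buffering policy and pending continuations):

```swift
// Hypothetical sketch: one shared buffer, one cursor per side, and trimming down to the
// slowest side's position so the buffer only grows as large as the largest deficit.
struct SharedBuffer<Element> {
  private var elements: [Element] = []
  private var trimmedCount = 0           // how many elements have been dropped from the front
  private var cursors: [Int: Int] = [:]  // side identifier -> absolute index of the next read

  // A newly added side begins at the live edge, so it does not replay buffered history.
  mutating func addSide(_ side: Int) { cursors[side] = trimmedCount + elements.count }

  mutating func enqueue(_ element: Element) { elements.append(element) }

  mutating func next(for side: Int) -> Element? {
    guard let position = cursors[side], position - trimmedCount < elements.count else { return nil }
    let element = elements[position - trimmedCount]
    cursors[side] = position + 1
    trim()
    return element
  }

  // Cancelling a side removes its cursor, which may allow the buffer to shrink.
  mutating func removeSide(_ side: Int) {
    cursors.removeValue(forKey: side)
    trim()
  }

  private mutating func trim() {
    guard let slowest = cursors.values.min() else { return }
    let dropCount = slowest - trimmedCount
    if dropCount > 0 {
      elements.removeFirst(dropCount)
      trimmedCount = slowest
    }
  }
}
```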

As previously stated, the iteration of the upstream/base AsyncSequence is isolated to a detached task; this ensures that individual sides can be cancelled independently. Those cancellations have the effect of removing that side from the shared iteration and cleaning up accordingly (including trimming the internal buffer).
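
For example, cancelling the task that drives one side does not affect the others; a usage sketch in the style of the examples below:

```swift
let shared = [0, 1, 2, 3, 4].async.share()

let observer = Task {
  for await element in shared { print("observer", element) }
}
let abandoned = Task {
  for await element in shared { print("abandoned", element) }
}
// Cancelling one side removes it from the shared iteration; the other side still
// receives the remaining elements.
abandoned.cancel()
await observer.value
```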

Representing concurrent access is difficult; it is not practical to express every potential interleaving, but a few cases are included with this proposal to illustrate some of the behaviors. If a more comprehensive behavioral analysis is needed, it is strongly suggested to try out the pending pull request to identify how specific behaviors work. Please keep in mind that the ordering between tasks is not specified; only the order within one side of iteration is.

Practically, this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .unbounded)

let t1 = Task {
  for await element in exampleSource {
    if element == 0 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 1", element)
  }
}

let t2 = Task {
  for await element in exampleSource {
    if element == 3 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 2", element)
  }
}

await t1.value
await t2.value
```

This example will print a possible ordering such as the following:

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 3
Task 2 4
Task 1 1
Task 1 2
Task 1 3
Task 1 4
```

The order of the interleaving of the prints is not guaranteed; however, the order of the elements per iteration is. Likewise, in this buffering case it is guaranteed that all values are represented in the output.

If the creation were instead altered to the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingLatest(2))
```

The output would print a possible ordering of:

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 3
Task 1 4
```

Some values are dropped due to the buffering policy, but the sides eventually reach consistency. This works similarly for the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingOldest(2))
```

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 1
Task 1 2
```

However, in this particular case the newest values are the ones dropped.

The `.bounded(N)` policy enforces consumption such that no side can get more than a given number of elements ahead of the other sides' consumption.

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bounded(1))

let t1 = Task {
  for await element in exampleSource {
    if element == 0 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 1", element)
  }
}

let t2 = Task {
  for await element in exampleSource {
    if element == 3 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 2", element)
  }
}

await t1.value
await t2.value
```

This will have a potential ordering output of:

```
Task 2 0
Task 2 1
Task 1 0
Task 1 1
Task 2 2
Task 1 2
Task 1 3
Task 1 4
Task 2 3
Task 2 4
```

In that example output, Task 2 can get elements 0 and 1 but must then await until Task 1 has caught up to within the specified buffer limit. This limit means that no additional upstream iteration is made (and therefore no values are dropped) until the buffer count falls below the specified value.

## Effect on API resilience

This is an additive API and no existing systems are changed; however, it introduces a few new types that will need to be maintained as ABI interfaces. Since the intent is to provide a mechanism for storing AsyncSequences in a shared context, the type must be exposed as ABI (for type sizing).

## Alternatives considered

It was considered to nest the buffering policy inside the `AsyncShareSequence` type. However, since this seems to be something useful for other types, it makes sense to reuse an existing top-level type. If it is later determined that a general form of a buffering policy requires additional behaviors, this placement could be debated and moved back to an interior type, similar to AsyncStream.

Package.swift

Lines changed: 10 additions & 2 deletions
```diff
@@ -3,12 +3,20 @@
 import PackageDescription
 import CompilerPluginSupport
 
+let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"
+#if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability
+let AsyncAlgorithms_v1_1 =
+  "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
+#else
+let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0"
+#endif
+
 let availabilityMacros: [SwiftSetting] = [
   .enableExperimentalFeature(
-    "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"
+    AsyncAlgorithms_v1_0
   ),
   .enableExperimentalFeature(
-    "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
+    AsyncAlgorithms_v1_1
   ),
 ]
```

Sources/AsyncAlgorithms/AsyncAlgorithms.docc/AsyncAlgorithms.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -30,6 +30,7 @@ This package has three main goals:
 - <doc:Merge>
 - <doc:Reductions>
 - <doc:RemoveDuplicates>
+- <doc:Share>
 - <doc:Throttle>
 - <doc:Timer>
 - <doc:Zip>
```
