Skip to content

BREAKING: Converts to Swift Concurrency #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

NeedleInAJayStack
Copy link
Member

This removes the NIO dependency. It is breaking because it removes all Swift NIO-isms that were present in the public APIs (like EventLoopFuture and EventLoopGroup argument/return types).

paulofaria
paulofaria previously approved these changes Jun 24, 2025
@@ -30,13 +30,13 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {

@available(macOS 10.15, iOS 15, watchOS 8, tvOS 15, *)
extension AsyncThrowingStream {
func mapStream<To>(_ closure: @escaping (Element) throws -> To)
func mapStream<To>(_ closure: @escaping (Element) async throws -> To)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you have this when AsyncSequence.map exists?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unstructured Tasks aren't great. They make cancellation hard.
In actual fact you could just replace the whole of this file with AsyncStream<Element>. This opens up all the functions that AsyncSequence has eg map, filter, compactMap, first, etc and cancellation will come for free.

Copy link
Member Author

@NeedleInAJayStack NeedleInAJayStack Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had created this a few years ago before Swift Concurrency existed to abstract different AsyncSequence/PubSub backends (at the time, options included RxSwift and Combine). Since we're switching over to concurrency, I agree, we should switch to having AsyncThrowingStream as a first class member in our API. I've done that work here: 9ec313a

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess @adam-fowler’s point is to use some async sequence in the public APIs instead of the concrete stream, is that correct @adam-fowler ?

This removes the NIO dependency. It is breaking because it removes all Swift NIO-isms that were present in the public APIs (like EventLoopFuture and EventLoopGroup argument/return types).
@NeedleInAJayStack NeedleInAJayStack changed the title Draft: feat!: Uses swift concurrency under the hood BREAKING: Converts to Swift Concurrency Jun 25, 2025
The intent is to replace it with swift-distributed-tracing integration.
This resolves the race condition caused by the inbox counts and the event delivery. If event delivery happens before the subsequent publish increments the inbox counts, then the counts will be lower than expected. Resolved by just not asking for inbox counts, since they aren't relevant to the test.
This was causing test hangs on macOS
@NeedleInAJayStack
Copy link
Member Author

Hey @adam-fowler & @paulofaria - I managed to get the tests passing, and I think this is ready for you guys to look again. Could you give it a pass on my changes following your comments?

Comment on lines 47 to 49
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
// (which we cannot know),
// and we need the result to be a concrete type.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't GraphQLResult a concrete type, though? What am I missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - GraphQLResult is a concrete type, but the concrete signature of the Stream or Sequence can get really complex as you manipulate it. For example, if I take an AsyncSequence<String> and then call .filter and then .map to get a Bool, then the resulting type will be AsyncMapSequence<AsyncFilterSequence<AsyncSequence<String>>, Bool>. This type conforms to AsyncSequence with Element == Bool, but it's true signature is pretty complex.

The underlying issue here is that trying to call (any AsyncSequence).map will give the compilation error: Member 'map' cannot be used on value of type 'any AsyncSequence'; consider using a generic constraint instead. But using a generic type constraint means that we can't accept any AsyncSequence as our resolver type, which dramatically limits the resolver definition API (for reasons above).

Instead, (any AsyncSequence) does allow a user to iterate it, with each result typed as Any, which we can then bind into a new sequence.

Sorry - I know this is kinda nuanced. Let me know if you have more questions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the sourceStream element be anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use a type erased AsyncSequence to do the mapping (to avoid the additional task. This is a AnyAsyncSequence I use in hummingbird (code was originally stolen from AsyncHTTPClient)

@usableFromInline
struct AnyAsyncSequence<Element>: AsyncSequence {
    @usableFromInline
    typealias AsyncIteratorNextCallback = () async throws -> Element?

    @usableFromInline
    let makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback

    @inlinable
    init<AS: AsyncSequence>(_ base: AS) where AS.Element == Element, AS: Sendable, AS.AsyncIterator: _HB_SendableMetatype {
        self.makeAsyncIteratorCallback = {
            var iterator = base.makeAsyncIterator()
            return {
                try await iterator.next()
            }
        }
    }

    @usableFromInline
    struct AsyncIterator: AsyncIteratorProtocol {
        @usableFromInline
        let nextCallback: AsyncIteratorNextCallback

        @usableFromInline
        init(nextCallback: @escaping AsyncIteratorNextCallback) {
            self.nextCallback = nextCallback
        }

        @inlinable
        func next() async throws -> Element? {
            try await self.nextCallback()
        }
    }

    @inlinable
    func makeAsyncIterator() -> AsyncIterator {
        .init(nextCallback: self.makeAsyncIteratorCallback())
    }
}

extension AsyncSequence where Self: Sendable {
    var any: AnyAsyncSequence<Element> { .init(self) }
}

You could use something like this that also includes a mapping closure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is awesome! I'm going to give this a try tomorrow, and will report back!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion @adam-fowler - I've implemented in this commit on a separate branch: NeedleInAJayStack@d0945a5

It avoids the unstructured task, but requires that we export our new AnyAsyncSequence type. What do you think? Is this preferable?

Copy link
Member Author

@NeedleInAJayStack NeedleInAJayStack Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this again and ended up with e1c66d9

It takes inspiration from your AnyAsyncSequence and it's iterator callbacks, but doesn't introduce a new type and allows us to return AsyncThrowingStream directly. @adam-fowler Let me know if you see any red flags - all the tests seem to be working fine.

Comment on lines 47 to 49
// We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
// (which we cannot know),
// and we need the result to be a concrete type.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - GraphQLResult is a concrete type, but the concrete signature of the Stream or Sequence can get really complex as you manipulate it. For example, if I take an AsyncSequence<String> and then call .filter and then .map to get a Bool, then the resulting type will be AsyncMapSequence<AsyncFilterSequence<AsyncSequence<String>>, Bool>. This type conforms to AsyncSequence with Element == Bool, but it's true signature is pretty complex.

The underlying issue here is that trying to call (any AsyncSequence).map will give the compilation error: Member 'map' cannot be used on value of type 'any AsyncSequence'; consider using a generic constraint instead. But using a generic type constraint means that we can't accept any AsyncSequence as our resolver type, which dramatically limits the resolver definition API (for reasons above).

Instead, (any AsyncSequence) does allow a user to iterate it, with each result typed as Any, which we can then bind into a new sequence.

Sorry - I know this is kinda nuanced. Let me know if you have more questions.

@NeedleInAJayStack NeedleInAJayStack force-pushed the feat/swift-concurrency branch from db77ae5 to c398797 Compare July 5, 2025 06:15
@NeedleInAJayStack NeedleInAJayStack force-pushed the feat/swift-concurrency branch from c398797 to 34268d3 Compare July 5, 2025 06:17
paulofaria
paulofaria previously approved these changes Jul 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants