@@ -67,24 +67,25 @@ await cursor.ForEachAsync(change =>
6767
6868// start-split-event-helpers-sync
6969// Fetches the next complete change stream event
70- private static ChangeStreamDocument < TDocument > GetNextChangeStreamEvent < TDocument > (
70+ private static IEnumerable < ChangeStreamDocument < TDocument > > GetNextChangeStreamEvent < TDocument > (
7171 IEnumerator < ChangeStreamDocument < TDocument > > changeStreamEnumerator )
72- {
73- var changeStreamEvent = changeStreamEnumerator . Current ;
74-
75- // Reassembles change event fragments if the event is split
76- if ( changeStreamEvent . SplitEvent != null )
77- {
78- var fragment = changeStreamEvent ;
79- while ( fragment . SplitEvent . Fragment < fragment . SplitEvent . Of )
80- {
81- changeStreamEnumerator . MoveNext ( ) ;
82- fragment = changeStreamEnumerator . Current ;
83- MergeFragment ( changeStreamEvent , fragment ) ;
84- }
85- }
86- return changeStreamEvent ;
87- }
72+ {
73+ while ( changeStreamEnumerator . MoveNext ( ) )
74+ {
75+ var changeStreamEvent = changeStreamEnumerator . Current ;
76+ if ( changeStreamEvent . SplitEvent != null )
77+ {
78+ var fragment = changeStreamEvent ;
79+ while ( fragment . SplitEvent . Fragment < fragment . SplitEvent . Of )
80+ {
81+ changeStreamEnumerator . MoveNext ( ) ;
82+ fragment = changeStreamEnumerator . Current ;
83+ MergeFragment ( changeStreamEvent , fragment ) ;
84+ }
85+ }
86+ yield return changeStreamEvent ;
87+ }
88+ }
8889
8990// Merges a fragment into the base event
9091private static void MergeFragment < TDocument > (
@@ -103,26 +104,37 @@ private static void MergeFragment<TDocument>(
103104
104105// start-split-event-helpers-async
105106// Fetches the next complete change stream event
106- private static async Task < ChangeStreamDocument < TDocument > > GetNextChangeStreamEvent < TDocument > (
107+ private static async IAsyncEnumerable < ChangeStreamDocument < TDocument > > GetNextChangeStreamEventAsync < TDocument > (
107108 IAsyncCursor < ChangeStreamDocument < TDocument > > changeStreamCursor )
108109{
109- var changeStreamEvent = changeStreamCursor . Current . First ( ) ;
110-
111- // Reassembles change event fragments if the event is split
112- if ( changeStreamEvent . SplitEvent != null )
110+ var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync ( changeStreamCursor ) . GetAsyncEnumerator ( ) ;
111+ while ( await changeStreamEnumerator . MoveNextAsync ( ) )
113112 {
114- var fragment = changeStreamEvent ;
115- while ( fragment . SplitEvent . Fragment < fragment . SplitEvent . Of )
113+ var changeStreamEvent = changeStreamEnumerator . Current ;
114+ if ( changeStreamEvent . SplitEvent != null )
116115 {
117- if ( ! await changeStreamCursor . MoveNextAsync ( ) )
116+ var fragment = changeStreamEvent ;
117+ while ( fragment . SplitEvent . Fragment < fragment . SplitEvent . Of )
118118 {
119- throw new InvalidOperationException ( "Incomplete split event fragments." ) ;
119+ await changeStreamEnumerator . MoveNextAsync ( ) ;
120+ fragment = changeStreamEnumerator . Current ;
121+ MergeFragment ( changeStreamEvent , fragment ) ;
120122 }
121- fragment = changeStreamCursor . Current . First ( ) ;
122- MergeFragment ( changeStreamEvent , fragment ) ;
123+ }
124+ yield return changeStreamEvent ;
125+ }
126+ }
127+
128+ private static async IAsyncEnumerable < ChangeStreamDocument < TDocument > > GetNextChangeStreamEventFragmentAsync < TDocument > (
129+ IAsyncCursor < ChangeStreamDocument < TDocument > > changeStreamCursor )
130+ {
131+ while ( await changeStreamCursor . MoveNextAsync ( ) )
132+ {
133+ foreach ( var changeStreamEvent in changeStreamCursor . Current )
134+ {
135+ yield return changeStreamEvent ;
123136 }
124137 }
125- return changeStreamEvent ;
126138}
127139
128140// Merges a fragment into the base event
@@ -140,39 +152,27 @@ private static void MergeFragment<TDocument>(
140152}
141153// end-split-event-helpers-async
142154
143- // start-split-change-event-async
155+ // start-split-change-event-sync
144156var pipeline = new EmptyPipelineDefinition < ChangeStreamDocument < Restaurant > > ( )
145157 . ChangeStreamSplitLargeEvent ( ) ;
146158
147- using ( var cursor = await collection . WatchAsync ( pipeline ) )
148- {
149- while ( await cursor . MoveNextAsync ( ) )
150- {
151- foreach ( var changeStreamEvent in cursor . Current )
152- {
153- var completeEvent = await GetNextChangeStreamEvent ( cursor ) ;
154- Console . WriteLine ( "Received the following change: " + completeEvent . BackingDocument ) ;
155- }
156- }
157- }
158- // end-split-change-event-async
159+ using var cursor = collection . Watch ( pipeline ) ;
160+ foreach ( var completeEvent in GetNextChangeStreamEvent ( cursor . ToEnumerable ( ) . GetEnumerator ( ) ) )
161+ {
162+ Console . WriteLine ( "Received the following change: " + completeEvent . BackingDocument ) ;
163+ }
164+ // end-split-change-event-sync
159165
160- // start-split-change-event-sync
166+ // start-split-change-event-async
161167var pipeline = new EmptyPipelineDefinition < ChangeStreamDocument < Restaurant > > ( )
162168 . ChangeStreamSplitLargeEvent ( ) ;
163169
164- using ( var cursor = collection . Watch ( pipeline ) )
165- {
166- using ( var enumerator = cursor . ToEnumerable ( ) . GetEnumerator ( ) )
167- {
168- while ( enumerator . MoveNext ( ) )
169- {
170- var completeEvent = GetNextChangeStreamEvent ( enumerator ) ;
171- Console . WriteLine ( "Received the following change: " + completeEvent . BackingDocument ) ;
172- }
173- }
174- }
175- // end-split-change-event-sync
170+ using var cursor = await collection . WatchAsync ( pipeline ) ;
171+ await foreach ( var completeEvent in GetNextChangeStreamEventAsync ( cursor ) )
172+ {
173+ Console . WriteLine ( "Received the following change: " + completeEvent . BackingDocument ) ;
174+ }
175+ // end-split-change-event-async
176176
177177// start-change-stream-post-image
178178var pipeline = new EmptyPipelineDefinition < ChangeStreamDocument < Restaurant > > ( )
0 commit comments