1818 */
1919import { newError , Record , ResultSummary } from 'neo4j-driver-core'
2020import { Observable , Subject , ReplaySubject , from } from 'rxjs'
21- import { flatMap , publishReplay , refCount , shareReplay } from 'rxjs/operators'
21+ import { flatMap , publishReplay , refCount } from 'rxjs/operators'
2222
2323const States = {
2424 READY : 0 ,
@@ -44,7 +44,8 @@ export default class RxResult {
4444 publishReplay ( 1 ) ,
4545 refCount ( )
4646 )
47- this . _records = new Subject ( )
47+ this . _records = undefined
48+ this . _controls = new StreamControl ( )
4849 this . _summary = new ReplaySubject ( )
4950 this . _state = States . READY
5051 }
@@ -72,14 +73,16 @@ export default class RxResult {
7273 * @returns {Observable<Record> } - An observable stream of records.
7374 */
7475 records ( ) {
75- return this . _result . pipe (
76+ const result = this . _result . pipe (
7677 flatMap (
7778 result =>
7879 new Observable ( recordsObserver =>
7980 this . _startStreaming ( { result, recordsObserver } )
8081 )
8182 )
8283 )
84+ result . push = ( ) => this . _push ( )
85+ return result
8386 }
8487
8588 /**
@@ -102,6 +105,44 @@ export default class RxResult {
102105 )
103106 }
104107
108+ /**
109+ * Pauses the automatic streaming of records.
110+ *
111+ * This method provides a way of controll the flow of records
112+ *
113+ * @experimental
114+ */
115+ pause ( ) {
116+ this . _controls . pause ( )
117+ }
118+
119+ /**
120+ * Resumes the automatic streaming of records.
121+ *
122+ * This method won't need to be called in normal stream operation. It only applies to the case when the stream is paused.
123+ *
124+ * This method is method won't start the consuming records if the ${@link records()} stream didn't get subscribed.
125+ * @experimental
126+ * @returns {Promise<void> } - A promise that resolves when the stream is resumed.
127+ */
128+ resume ( ) {
129+ return this . _controls . resume ( )
130+ }
131+
132+ /**
133+ * Pushes the next record to the stream.
134+ *
135+ * This method automatic pause the auto-streaming of records and then push next record to the stream.
136+ *
137+ * For returning the automatic streaming of records, use {@link resume} method.
138+ *
139+ * @experimental
140+ * @returns {Promise<void> } - A promise that resolves when the push is completed.
141+ */
142+ push ( ) {
143+ return this . _controls . push ( )
144+ }
145+
105146 _startStreaming ( {
106147 result,
107148 recordsObserver = null ,
@@ -115,9 +156,11 @@ export default class RxResult {
115156
116157 if ( this . _state < States . STREAMING ) {
117158 this . _state = States . STREAMING
118-
159+ this . _setupRecordsStream ( result )
119160 if ( recordsObserver ) {
120161 subscriptions . push ( this . _records . subscribe ( recordsObserver ) )
162+ } else {
163+ result . _cancel ( )
121164 }
122165
123166 subscriptions . push ( {
@@ -127,30 +170,6 @@ export default class RxResult {
127170 }
128171 }
129172 } )
130-
131- if ( this . _records . observers . length === 0 ) {
132- result . _cancel ( )
133- }
134-
135- result . subscribe ( {
136- onNext : record => {
137- this . _records . next ( record )
138- } ,
139- onCompleted : summary => {
140- this . _records . complete ( )
141-
142- this . _summary . next ( summary )
143- this . _summary . complete ( )
144-
145- this . _state = States . COMPLETED
146- } ,
147- onError : err => {
148- this . _records . error ( err )
149- this . _summary . error ( err )
150-
151- this . _state = States . COMPLETED
152- }
153- } )
154173 } else if ( recordsObserver ) {
155174 recordsObserver . error (
156175 newError (
@@ -163,4 +182,102 @@ export default class RxResult {
163182 subscriptions . forEach ( s => s . unsubscribe ( ) )
164183 }
165184 }
185+
186+ _setupRecordsStream ( result ) {
187+ if ( this . _records ) {
188+ return this . _records
189+ }
190+
191+ this . _records = createFullyControlledSubject (
192+ result [ Symbol . asyncIterator ] ( ) ,
193+ {
194+ complete : async ( ) => {
195+ this . _state = States . COMPLETED
196+ this . _summary . next ( await result . summary ( ) )
197+ this . _summary . complete ( )
198+ } ,
199+ error : error => {
200+ this . _state = States . COMPLETED
201+ this . _summary . error ( error )
202+ }
203+ } ,
204+ this . _controls
205+ )
206+ return this . _records
207+ }
208+ }
209+
210+ function createFullyControlledSubject (
211+ iterator ,
212+ completeObserver ,
213+ streamControl = new StreamControl ( )
214+ ) {
215+ const subject = new Subject ( )
216+
217+ const pushNextValue = async result => {
218+ try {
219+ streamControl . pushing = true
220+ const { done, value } = await result
221+ if ( done ) {
222+ subject . complete ( )
223+ completeObserver . complete ( )
224+ } else {
225+ subject . next ( value )
226+ if ( ! streamControl . paused ) {
227+ setImmediate ( async ( ) => await pushNextValue ( iterator . next ( ) ) )
228+ }
229+ }
230+ } catch ( error ) {
231+ subject . error ( error )
232+ completeObserver . error ( error )
233+ } finally {
234+ streamControl . pushing = false
235+ }
236+ }
237+
238+ async function push ( value ) {
239+ await pushNextValue ( iterator . next ( value ) )
240+ }
241+
242+ streamControl . pusher = push
243+ push ( )
244+
245+ return subject
246+ }
247+
248+ class StreamControl {
249+ constructor ( push = async ( ) => { } ) {
250+ this . _paused = false
251+ this . _pushing = false
252+ this . _push = push
253+ }
254+
255+ pause ( ) {
256+ this . _paused = true
257+ }
258+
259+ get paused ( ) {
260+ return this . _paused
261+ }
262+
263+ set pushing ( pushing ) {
264+ this . _pushing = pushing
265+ }
266+
267+ async resume ( ) {
268+ const wasPaused = this . _paused
269+ this . _paused = false
270+ if ( wasPaused && ! this . _pushing ) {
271+ await this . _push ( )
272+ }
273+ }
274+
275+ async push ( ) {
276+ this . pause ( )
277+ return await this . _push ( )
278+ }
279+
280+ set pusher ( push ) {
281+ this . _push = push
282+ }
166283}
0 commit comments