From 9fc43c4ad639925cd44d01672524fbd70abc61ea Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 22 Mar 2017 12:13:01 +0100 Subject: [PATCH] Fix streaming of queued records in stream observer `StreamObserver` memorizes all incoming records when it does not have a subscriber to forward records to. Later when such subscriber appears it receives all queued records in on go. This functionality did not work properly because of a missing `this` which resulted in access to an undefined variable. This commit fixes the problem and adds some tests. --- src/v1/internal/stream-observer.js | 10 +- test/internal/stream-observer.test.js | 157 ++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 test/internal/stream-observer.test.js diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index 8c91d0b17..eda799909 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -42,6 +42,8 @@ class StreamObserver { this._error = null; this._hasFailed = false; this._errorTransformer = errorTransformer; + this._observer = null; + this._conn = null; } /** @@ -69,7 +71,7 @@ class StreamObserver { this._fieldLookup = {}; if( meta.fields && meta.fields.length > 0 ) { this._fieldKeys = meta.fields; - for (var i = 0; i < meta.fields.length; i++) { + for (let i = 0; i < meta.fields.length; i++) { this._fieldLookup[meta.fields[i]] = i; } } @@ -94,7 +96,7 @@ class StreamObserver { * @param {Object} error - An error object */ onError(error) { - let transformedError = this._errorTransformer(error, this._conn); + const transformedError = this._errorTransformer(error, this._conn); if(this._hasFailed) { return; } @@ -123,8 +125,8 @@ class StreamObserver { return; } if( this._queuedRecords.length > 0 ) { - for (var i = 0; i < _queuedRecords.length; i++) { - observer.onNext( _queuedRecords[i] ); + for (let i = 0; i < this._queuedRecords.length; i++) { + observer.onNext( this._queuedRecords[i] ); } } if( this._tail ) { diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js new file mode 100644 index 000000000..74ad5632e --- /dev/null +++ b/test/internal/stream-observer.test.js @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import StreamObserver from '../../src/v1/internal/stream-observer'; +import FakeConnection from './fake-connection'; + +const NO_OP = () => { +}; + +describe('StreamObserver', () => { + + it('remembers resolved connection', () => { + const streamObserver = newStreamObserver(); + const connection = new FakeConnection(); + + streamObserver.resolveConnection(connection); + + expect(streamObserver._conn).toBe(connection); + }); + + it('remembers subscriber', () => { + const streamObserver = newStreamObserver(); + const subscriber = newObserver(); + + streamObserver.subscribe(subscriber); + + expect(streamObserver._observer).toBe(subscriber); + }); + + it('passes received records to the subscriber', () => { + const streamObserver = newStreamObserver(); + const receivedRecords = []; + const observer = newObserver(record => { + receivedRecords.push(record); + }); + + streamObserver.subscribe(observer); + streamObserver.onCompleted({fields: ['A', 'B', 'C']}); + + streamObserver.onNext([1, 2, 3]); + streamObserver.onNext([11, 22, 33]); + streamObserver.onNext([111, 222, 333]); + + expect(receivedRecords.length).toEqual(3); + expect(receivedRecords[0].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3}); + expect(receivedRecords[1].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33}); + expect(receivedRecords[2].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333}); + }); + + it('queues received record when no subscriber', () => { + const streamObserver = newStreamObserver(); + + streamObserver.onCompleted({fields: ['A', 'B', 'C']}); + + streamObserver.onNext([1111, 2222, 3333]); + streamObserver.onNext([111, 222, 333]); + streamObserver.onNext([11, 22, 33]); + streamObserver.onNext([1, 2, 3]); + + const queuedRecords = streamObserver._queuedRecords; + + expect(queuedRecords.length).toEqual(4); + expect(queuedRecords[0].toObject()).toEqual({'A': 1111, 'B': 2222, 'C': 3333}); + expect(queuedRecords[1].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333}); + expect(queuedRecords[2].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33}); + expect(queuedRecords[3].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3}); + }); + + it('passes received error the subscriber', () => { + const streamObserver = newStreamObserver(); + const error = new Error('Invalid Cypher statement'); + + let receivedError = null; + const observer = newObserver(NO_OP, error => { + receivedError = error; + }); + + streamObserver.subscribe(observer); + streamObserver.onError(error); + + expect(receivedError).toBe(error); + }); + + it('passes existing error to a new subscriber', () => { + const streamObserver = newStreamObserver(); + const error = new Error('Invalid Cypher statement'); + + streamObserver.onError(error); + + streamObserver.subscribe(newObserver(NO_OP, receivedError => { + expect(receivedError).toBe(error); + })); + }); + + it('passes queued records to a new subscriber', () => { + const streamObserver = newStreamObserver(); + + streamObserver.onCompleted({fields: ['A', 'B', 'C']}); + + streamObserver.onNext([1, 2, 3]); + streamObserver.onNext([11, 22, 33]); + streamObserver.onNext([111, 222, 333]); + + const receivedRecords = []; + streamObserver.subscribe(newObserver(record => { + receivedRecords.push(record); + })); + + expect(receivedRecords.length).toEqual(3); + expect(receivedRecords[0].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3}); + expect(receivedRecords[1].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33}); + expect(receivedRecords[2].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333}); + }); + + it('passes existing metadata to a new subscriber', () => { + const streamObserver = newStreamObserver(); + + streamObserver.onCompleted({fields: ['Foo', 'Bar', 'Baz', 'Qux']}); + streamObserver.onCompleted({metaDataField1: 'value1', metaDataField2: 'value2'}); + + let receivedMetaData = null; + streamObserver.subscribe(newObserver(NO_OP, NO_OP, metaData => { + receivedMetaData = metaData; + })); + + expect(receivedMetaData).toEqual({metaDataField1: 'value1', metaDataField2: 'value2'}); + }); + +}); + +function newStreamObserver() { + return new StreamObserver(); +} + +function newObserver(onNext = NO_OP, onError = NO_OP, onCompleted = NO_OP) { + return { + onNext: onNext, + onError: onError, + onCompleted: onCompleted + }; +}