diff --git a/neokit b/neokit index 8ee8fa80b..868498aa8 160000 --- a/neokit +++ b/neokit @@ -1 +1 @@ -Subproject commit 8ee8fa80b2e560339a18f1872d314d3ba64f0427 +Subproject commit 868498aa8cd589975b3315577c024f879bea0c5b diff --git a/src/v1/driver.js b/src/v1/driver.js index d61c8265a..2cede31c8 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -58,7 +58,7 @@ class Driver { Driver._validateConnection.bind(this), config.connectionPoolSize ); - this._connectionProvider = this._createConnectionProvider(url, this._pool); + this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this)); } /** @@ -108,20 +108,14 @@ class Driver { * it is returned to the pool, the session will be reset to a clean state and * made available for others to use. * - * @param {String} mode of session - optional + * @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}. + * @param {string} [bookmark=null] the initial reference to some previous transaction. Value is optional and + * absence indicates that that the bookmark does not exist or is unknown. * @return {Session} new session. */ - session(mode) { + session(mode, bookmark) { const sessionMode = Driver._validateSessionMode(mode); - const connectionPromise = this._connectionProvider.acquireConnection(sessionMode); - connectionPromise.catch((err) => { - if (this.onError && err.code === SERVICE_UNAVAILABLE) { - this.onError(err); - } else { - //we don't need to tell the driver about this error - } - }); - return this._createSession(connectionPromise); + return this._createSession(sessionMode, this._connectionProvider, bookmark); } static _validateSessionMode(rawMode) { @@ -133,13 +127,22 @@ class Driver { } //Extension point - _createConnectionProvider(address, connectionPool) { - return new DirectConnectionProvider(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback); } //Extension point - _createSession(connectionPromise) { - return new Session(connectionPromise); + _createSession(mode, connectionProvider, bookmark) { + return new Session(mode, connectionProvider, bookmark); + } + + _driverOnErrorCallback(error) { + const userDefinedOnErrorCallback = this.onError; + if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) { + userDefinedOnErrorCallback(error); + } else { + // we don't need to tell the driver about this error + } } /** diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js new file mode 100644 index 000000000..bae63cacc --- /dev/null +++ b/src/v1/internal/connection-holder.js @@ -0,0 +1,137 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {newError} from '../error'; + +/** + * Utility to lazily initialize connections and return them back to the pool when unused. + */ +export default class ConnectionHolder { + + /** + * @constructor + * @param {string} mode - the access mode for new connection holder. + * @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from. + */ + constructor(mode, connectionProvider) { + this._mode = mode; + this._connectionProvider = connectionProvider; + this._referenceCount = 0; + this._connectionPromise = Promise.resolve(null); + } + + /** + * Make this holder initialize new connection if none exists already. + * @return {undefined} + */ + initializeConnection() { + if (this._referenceCount === 0) { + this._connectionPromise = this._connectionProvider.acquireConnection(this._mode); + } + this._referenceCount++; + } + + /** + * Get the current connection promise. + * @return {Promise} promise resolved with the current connection. + */ + getConnection() { + return this._connectionPromise; + } + + /** + * Notify this holder that single party does not require current connection any more. + * @return {Promise} promise resolved with the current connection. + */ + releaseConnection() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + + this._referenceCount--; + if (this._referenceCount === 0) { + // release a connection without muting ACK_FAILURE, this is the last action on this connection + return this._releaseConnection(true); + } + return this._connectionPromise; + } + + /** + * Closes this holder and releases current connection (if any) despite any existing users. + * @return {Promise} promise resolved when current connection is released to the pool. + */ + close() { + if (this._referenceCount === 0) { + return this._connectionPromise; + } + this._referenceCount = 0; + // release a connection and mute ACK_FAILURE, this might be called concurrently with other + // operations and thus should ignore failure handling + return this._releaseConnection(false); + } + + /** + * Return the current pooled connection instance to the connection pool. + * We don't pool Session instances, to avoid users using the Session after they've called close. + * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. + * @return {Promise} - promise resolved then connection is returned to the pool. + * @private + */ + _releaseConnection(sync) { + this._connectionPromise = this._connectionPromise.then(connection => { + if (connection) { + if(sync) { + connection.reset(); + } else { + connection.resetAsync(); + } + connection.sync(); + connection._release(); + } + }).catch(ignoredError => { + }); + + return this._connectionPromise; + } +} + +class EmptyConnectionHolder extends ConnectionHolder { + + initializeConnection() { + // nothing to initialize + } + + getConnection() { + return Promise.reject(newError('This connection holder does not serve connections')); + } + + releaseConnection() { + return Promise.resolve(); + } + + close() { + return Promise.resolve(); + } +} + +/** + * Connection holder that does not manage any connections. + * @type {ConnectionHolder} + */ +export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder(); diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 8b8db5b43..c7dcf31b2 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -29,32 +29,46 @@ class ConnectionProvider { acquireConnection(mode) { throw new Error('Abstract method'); } + + _withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) { + // install error handler from the driver on the connection promise; this callback is installed separately + // so that it does not handle errors, instead it is just an additional error reporting facility. + connectionPromise.catch(error => { + driverOnErrorCallback(error) + }); + // return the original connection promise + return connectionPromise; + } } export class DirectConnectionProvider extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._address = address; this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return Promise.resolve(this._connectionPool.acquire(this._address)); + const connection = this._connectionPool.acquire(this._address); + const connectionPromise = Promise.resolve(connection); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } } export class LoadBalancer extends ConnectionProvider { - constructor(address, connectionPool) { + constructor(address, connectionPool, driverOnErrorCallback) { super(); this._routingTable = new RoutingTable(new RoundRobinArray([address])); this._rediscovery = new Rediscovery(); this._connectionPool = connectionPool; + this._driverOnErrorCallback = driverOnErrorCallback; } acquireConnection(mode) { - return this._freshRoutingTable().then(routingTable => { + const connectionPromise = this._freshRoutingTable().then(routingTable => { if (mode === READ) { return this._acquireConnectionToServer(routingTable.readers, 'read'); } else if (mode === WRITE) { @@ -63,6 +77,7 @@ export class LoadBalancer extends ConnectionProvider { throw newError('Illegal mode ' + mode); } }); + return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback); } forget(address) { @@ -132,7 +147,8 @@ export class LoadBalancer extends ConnectionProvider { _createSessionForRediscovery(routerAddress) { const connection = this._connectionPool.acquire(routerAddress); const connectionPromise = Promise.resolve(connection); - return new Session(connectionPromise); + const connectionProvider = new SingleConnectionProvider(connectionPromise); + return new Session(READ, connectionProvider); } _updateRoutingTable(newRoutingTable) { @@ -153,3 +169,17 @@ export class LoadBalancer extends ConnectionProvider { } } } + +export class SingleConnectionProvider extends ConnectionProvider { + + constructor(connectionPromise) { + super(); + this._connectionPromise = connectionPromise; + } + + acquireConnection(mode) { + const connectionPromise = this._connectionPromise; + this._connectionPromise = null; + return connectionPromise; + } +} diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 901abeefc..7d0ed8dc2 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -96,7 +96,6 @@ function log(actor, msg) { } } - function NO_OP(){} let NO_OP_OBSERVER = { @@ -384,9 +383,9 @@ class Connection { this._chunker.messageBoundary(); } - /** Queue a RESET-message to be sent to the database */ - reset( observer ) { - log("C", "RESET"); + /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ + resetAsync( observer ) { + log("C", "RESET_ASYNC"); this._isHandlingFailure = true; let self = this; let wrappedObs = { @@ -404,6 +403,14 @@ class Connection { this._chunker.messageBoundary(); } + /** Queue a RESET-message to be sent to the database */ + reset(observer) { + log('C', 'RESET'); + this._queueObserver(observer); + this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } + /** Queue a ACK_FAILURE-message to be sent to the database */ _ackFailure( observer ) { log("C", "ACK_FAILURE"); diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 0591f8b29..d94da81df 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -17,9 +17,9 @@ * limitations under the License. */ -import RoundRobinArray from "./round-robin-array"; -import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error"; -import Integer, {int} from "../integer"; +import RoundRobinArray from './round-robin-array'; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; +import Integer, {int} from '../integer'; const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; diff --git a/src/v1/result.js b/src/v1/result.js index 6606b58db..98afe77fd 100644 --- a/src/v1/result.js +++ b/src/v1/result.js @@ -18,6 +18,7 @@ */ import ResultSummary from './result-summary'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * A stream of {@link Record} representing the result of a statement. @@ -32,13 +33,15 @@ class Result { * @param {mixed} statement - Cypher statement to execute * @param {Object} parameters - Map with parameters to use in statement * @param metaSupplier function, when called provides metadata + * @param {ConnectionHolder} connectionHolder - to be notified when result is either fully consumed or error happened. */ - constructor(streamObserver, statement, parameters, metaSupplier) { + constructor(streamObserver, statement, parameters, metaSupplier, connectionHolder) { this._streamObserver = streamObserver; this._p = null; this._statement = statement; this._parameters = parameters || {}; this._metaSupplier = metaSupplier || function(){return {};}; + this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER; } /** @@ -99,23 +102,39 @@ class Result { * @return */ subscribe(observer) { - let onCompletedOriginal = observer.onCompleted; - let self = this; - let onCompletedWrapper = (metadata) => { + const onCompletedOriginal = observer.onCompleted; + const self = this; + const onCompletedWrapper = (metadata) => { - let additionalMeta = self._metaSupplier(); - for(var key in additionalMeta) { + const additionalMeta = self._metaSupplier(); + for(let key in additionalMeta) { if (additionalMeta.hasOwnProperty(key)) { metadata[key] = additionalMeta[key]; } } - let sum = new ResultSummary(this._statement, this._parameters, metadata); - onCompletedOriginal.call(observer, sum); + const sum = new ResultSummary(this._statement, this._parameters, metadata); + + // notify connection holder that the used connection is not needed any more because result has + // been fully consumed; call the original onCompleted callback after that + self._connectionHolder.releaseConnection().then(() => { + onCompletedOriginal.call(observer, sum); + }); }; observer.onCompleted = onCompletedWrapper; - observer.onError = observer.onError || ((err) => { - console.log("Uncaught error when processing result: " + err); + + const onErrorOriginal = observer.onError || (error => { + console.log("Uncaught error when processing result: " + error); }); + + const onErrorWrapper = error => { + // notify connection holder that the used connection is not needed any more because error happened + // and result can't bee consumed any further; call the original onError callback after that + self._connectionHolder.releaseConnection().then(() => { + onErrorOriginal.call(observer, error); + }); + }; + observer.onError = onErrorWrapper; + this._streamObserver.subscribe(observer); } } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 1f2091558..e3be9a561 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -31,30 +31,24 @@ class RoutingDriver extends Driver { super(url, userAgent, token, RoutingDriver._validateConfig(config)); } - _createConnectionProvider(address, connectionPool) { - return new LoadBalancer(address, connectionPool); + _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { + return new LoadBalancer(address, connectionPool, driverOnErrorCallback); } - _createSession(connectionPromise) { - return new RoutingSession(connectionPromise, (error, conn) => { + _createSession(mode, connectionProvider, bookmark) { + return new RoutingSession(mode, connectionProvider, bookmark, (error, conn) => { if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) { + // connection is undefined if error happened before connection was acquired if (conn) { this._connectionProvider.forget(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forget(conn.url); - }).catch(() => {/*ignore*/}); } return error; } else if (error.code === 'Neo.ClientError.Cluster.NotALeader') { let url = 'UNKNOWN'; + // connection is undefined if error happened before connection was acquired if (conn) { url = conn.url; this._connectionProvider.forgetWriter(conn.url); - } else { - connectionPromise.then((conn) => { - this._connectionProvider.forgetWriter(conn.url); - }).catch(() => {/*ignore*/}); } return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED); } else { @@ -72,8 +66,8 @@ class RoutingDriver extends Driver { } class RoutingSession extends Session { - constructor(connectionPromise, onFailedConnection) { - super(connectionPromise); + constructor(mode, connectionProvider, bookmark, onFailedConnection) { + super(mode, connectionProvider, bookmark); this._onFailedConnection = onFailedConnection; } diff --git a/src/v1/session.js b/src/v1/session.js index d308781b6..958aba447 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -21,6 +21,8 @@ import Result from './result'; import Transaction from './transaction'; import {newError} from './error'; import {assertString} from './internal/util'; +import ConnectionHolder from './internal/connection-holder'; +import {READ, WRITE} from './driver'; /** * A Session instance is used for handling the connection and @@ -29,14 +31,20 @@ import {assertString} from './internal/util'; */ class Session { + /** * @constructor - * @param {Promise.} connectionPromise - Promise of a connection to use + * @param {string} mode the default access mode for this session. + * @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from. + * @param {string} bookmark - the initial bookmark for this session. */ - constructor(connectionPromise) { - this._connectionPromise = connectionPromise; + constructor(mode, connectionProvider, bookmark) { + this._mode = mode; + this._readConnectionHolder = new ConnectionHolder(READ, connectionProvider); + this._writeConnectionHolder = new ConnectionHolder(WRITE, connectionProvider); this._open = true; this._hasTx = false; + this._lastBookmark = bookmark; } /** @@ -55,19 +63,21 @@ class Session { assertString(statement, "Cypher statement"); const streamObserver = new _RunObserver(this._onRunFailure()); + const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { - this._connectionPromise.then((conn) => { - streamObserver.resolveConnection(conn); - conn.run(statement, parameters, streamObserver); - conn.pullAll(streamObserver); - conn.sync(); - }).catch((err) => streamObserver.onError(err)); + connectionHolder.initializeConnection(); + connectionHolder.getConnection().then(connection => { + streamObserver.resolveConnection(connection); + connection.run(statement, parameters, streamObserver); + connection.pullAll(streamObserver); + connection.sync(); + }).catch(error => streamObserver.onError(error)); } else { streamObserver.onError(newError("Statements cannot be run directly on a " + "session with an open transaction; either run from within the " + "transaction or use a different session.")); } - return new Result( streamObserver, statement, parameters, () => streamObserver.meta() ); + return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder ); } /** @@ -76,11 +86,17 @@ class Session { * * While a transaction is open the session cannot be used to run statements outside the transaction. * + * @param {string} bookmark - a reference to a previous transaction. DEPRECATED: This function is deprecated in + * favour of {@link Driver#session(string)} that accepts an initial bookmark. Session will ensure that all nested + * transactions are chained with bookmarks to guarantee causal consistency. * @returns {Transaction} - New Transaction */ beginTransaction(bookmark) { if (bookmark) { - assertString(bookmark, "Bookmark"); + assertString(bookmark, 'Bookmark'); + } + if (typeof bookmark !== 'undefined') { + this._lastBookmark = bookmark; } if (this._hasTx) { @@ -90,15 +106,26 @@ class Session { } this._hasTx = true; - return new Transaction(this._connectionPromise, () => { - this._hasTx = false}, - this._onRunFailure(), bookmark, (bookmark) => {this._lastBookmark = bookmark}); + + const connectionHolder = this._connectionHolderWithMode(this._mode); + connectionHolder.initializeConnection(); + + return new Transaction(connectionHolder, () => { + this._hasTx = false; + }, + this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this)); } lastBookmark() { return this._lastBookmark; } + _updateBookmark(newBookmark) { + if (newBookmark) { + this._lastBookmark = newBookmark; + } + } + /** * Close this session. * @param {function()} callback - Function to be called after the session has been closed @@ -107,7 +134,11 @@ class Session { close(callback = (() => null)) { if (this._open) { this._open = false; - this._releaseCurrentConnection().then(callback); + this._readConnectionHolder.close().then(() => { + this._writeConnectionHolder.close().then(() => { + callback(); + }); + }); } else { callback(); } @@ -118,23 +149,14 @@ class Session { return (err) => {return err}; } - /** - * Return the current pooled connection instance to the connection pool. - * We don't pool Session instances, to avoid users using the Session after they've called close. - * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. - * @return {Promise} - promise resolved then connection is returned to the pool. - * @private - */ - _releaseCurrentConnection() { - return this._connectionPromise.then(conn => { - // Queue up a 'reset', to ensure the next user gets a clean session to work with. - conn.reset(); - conn.sync(); - - // Return connection to the pool - conn._release(); - }).catch(ignoredError => { - }); + _connectionHolderWithMode(mode) { + if (mode === READ) { + return this._readConnectionHolder; + } else if (mode === WRITE) { + return this._writeConnectionHolder; + } else { + throw newError('Unknown access mode: ' + mode); + } } } diff --git a/src/v1/transaction.js b/src/v1/transaction.js index b29e534fe..cb3cd6892 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -19,6 +19,7 @@ import StreamObserver from './internal/stream-observer'; import Result from './result'; import {assertString} from './internal/util'; +import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; /** * Represents a transaction in the Neo4j database. @@ -28,24 +29,25 @@ import {assertString} from './internal/util'; class Transaction { /** * @constructor - * @param {Promise} connectionPromise - A connection to use + * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param errorTransformer callback use to transform error * @param bookmark optional bookmark * @param onBookmark callback invoked when new bookmark is produced */ - constructor(connectionPromise, onClose, errorTransformer, bookmark, onBookmark) { - this._connectionPromise = connectionPromise; + constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) { + this._connectionHolder = connectionHolder; let streamObserver = new _TransactionStreamObserver(this); let params = {}; if (bookmark) { params = {bookmark: bookmark}; } - this._connectionPromise.then((conn) => { + + this._connectionHolder.getConnection().then(conn => { streamObserver.resolveConnection(conn); - conn.run("BEGIN", params, streamObserver); - conn.discardAll(streamObserver); - }).catch(streamObserver.onError); + conn.run('BEGIN', params, streamObserver); + conn.pullAll(streamObserver); + }).catch(error => streamObserver.onError(error)); this._state = _states.ACTIVE; this._onClose = onClose; @@ -68,7 +70,7 @@ class Transaction { } assertString(statement, "Cypher statement"); - return this._state.run(this._connectionPromise, new _TransactionStreamObserver(this), statement, parameters); + return this._state.run(this._connectionHolder, new _TransactionStreamObserver(this), statement, parameters); } /** @@ -79,12 +81,11 @@ class Transaction { * @returns {Result} - New Result */ commit() { - let committed = this._state.commit(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.commit(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); return committed.result; - } /** @@ -95,7 +96,7 @@ class Transaction { * @returns {Result} - New Result */ rollback() { - let committed = this._state.rollback(this._connectionPromise, new _TransactionStreamObserver(this)); + let committed = this._state.rollback(this._connectionHolder, new _TransactionStreamObserver(this)); this._state = committed.state; //clean up this._onClose(); @@ -147,101 +148,136 @@ class _TransactionStreamObserver extends StreamObserver { let _states = { //The transaction is running with no explicit success or failure marked ACTIVE: { - commit: (connectionPromise, observer) => { - return {result: _runDiscardAll("COMMIT", connectionPromise, observer), + commit: (connectionHolder, observer) => { + return {result: _runDiscardAll("COMMIT", connectionHolder, observer), state: _states.SUCCEEDED} }, - rollback: (connectionPromise, observer) => { - return {result: _runDiscardAll("ROLLBACK", connectionPromise, observer), state: _states.ROLLED_BACK}; + rollback: (connectionHolder, observer) => { + return {result: _runDiscardAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK}; }, - run: (connectionPromise, observer, statement, parameters) => { - connectionPromise.then((conn) => { + run: (connectionHolder, observer, statement, parameters) => { + connectionHolder.getConnection().then(conn => { observer.resolveConnection(conn); - conn.run( statement, parameters || {}, observer ); - conn.pullAll( observer ); + conn.run(statement, parameters || {}, observer); + conn.pullAll(observer); conn.sync(); - }).catch(observer.onError); + }).catch(error => observer.onError(error)); - return new Result( observer, statement, parameters, () => observer.serverMeta() ); + return newRunResult(observer, statement, parameters, () => observer.serverMeta()); } }, //An error has occurred, transaction can no longer be used and no more messages will // be sent for this transaction. FAILED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because previous statements in the " + "transaction has failed and the transaction has been rolled back. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.FAILED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.FAILED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.FAILED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because previous statements in the " + "transaction has failed and the transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has successfully committed SUCCEEDED: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new" + " transaction to run another statement." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.SUCCEEDED}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been successfully closed."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.SUCCEEDED}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been successfully closed."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } }, //This transaction has been rolled back ROLLED_BACK: { - commit: (conn, observer) => { + commit: (connectionHolder, observer) => { observer.onError({ error: "Cannot commit this transaction, because it has already been rolled back." }); - return {result: new Result(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "COMMIT", {}), state: _states.ROLLED_BACK}; }, - rollback: (conn, observer) => { + rollback: (connectionHolder, observer) => { observer.onError({error: "Cannot rollback transaction, because transaction has already been rolled back."}); - return {result: new Result(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; + return {result: newDummyResult(observer, "ROLLBACK", {}), state: _states.ROLLED_BACK}; }, - run: (conn, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters) => { observer.onError({error: "Cannot run statement, because transaction has already been rolled back."}); - return new Result(observer, statement, parameters); + return newDummyResult(observer, statement, parameters); } } }; -function _runDiscardAll(msg, connectionPromise, observer) { - connectionPromise.then((conn) => { - observer.resolveConnection(conn); - conn.run(msg, {}, observer); - conn.discardAll(observer); - conn.sync(); - }).catch(observer.onError); +function _runDiscardAll(msg, connectionHolder, observer) { + connectionHolder.getConnection().then( + conn => { + observer.resolveConnection(conn); + conn.run(msg, {}, observer); + conn.pullAll(observer); + conn.sync(); + }).catch(error => observer.onError(error)); + + // for commit & rollback we need result that uses real connection holder and notifies it when + // connection is not needed and can be safely released to the pool + return new Result(observer, msg, {}, emptyMetadataSupplier, connectionHolder); +} + +/** + * Creates a {@link Result} with empty connection holder. + * Should be used as a result for running cypher statements. They can result in metadata but should not + * influence real connection holder to release connections because single transaction can have + * {@link Transaction#run} called multiple times. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @param {function} metadataSupplier - the function that returns a metadata object. + * @return {Result} new result. + */ +function newRunResult(observer, statement, parameters, metadataSupplier) { + return new Result(observer, statement, parameters, metadataSupplier, EMPTY_CONNECTION_HOLDER); +} + +/** + * Creates a {@link Result} without metadata supplier and with empty connection holder. + * For cases when result represents an intermediate or failed action, does not require any metadata and does not + * need to influence real connection holder to release connections. + * @param {StreamObserver} observer - an observer for the created result. + * @param {string} statement - the cypher statement that produced the result. + * @param {object} parameters - the parameters for cypher statement that produced the result. + * @return {Result} new result. + */ +function newDummyResult(observer, statement, parameters) { + return new Result(observer, statement, parameters, emptyMetadataSupplier, EMPTY_CONNECTION_HOLDER); +} - return new Result(observer, msg, {}); +function emptyMetadataSupplier() { + return {}; } export default Transaction; diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js new file mode 100644 index 000000000..8c5d1a854 --- /dev/null +++ b/test/internal/connection-holder.test.js @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionHolder, {EMPTY_CONNECTION_HOLDER} from '../../src/v1/internal/connection-holder'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import {READ} from '../../src/v1/driver'; +import FakeConnection from './fake-connection'; + +describe('EmptyConnectionHolder', () => { + + it('should return rejected promise instead of connection', done => { + EMPTY_CONNECTION_HOLDER.getConnection().catch(() => { + done(); + }); + }); + + it('should return resolved promise on release', done => { + EMPTY_CONNECTION_HOLDER.releaseConnection().then(() => { + done(); + }); + }); + + it('should return resolved promise on close', done => { + EMPTY_CONNECTION_HOLDER.close().then(() => { + done(); + }); + }); + +}); + +describe('ConnectionHolder', () => { + + it('should acquire new connection during initialization', () => { + const connectionProvider = new RecordingConnectionProvider([new FakeConnection()]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + expect(connectionProvider.acquireConnectionInvoked).toBe(1); + }); + + it('should return acquired during initialization connection', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.getConnection().then(connection => { + expect(connection).toBe(connection); + done(); + }); + }); + + it('should release connection with single user', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + + it('should not release connection with multiple users', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should release connection with multiple users when all users release', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + }); + + it('should do nothing when closed and not initialized', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy(); + done(); + }); + }); + + it('should close even when users exist', done => { + const connection = new FakeConnection(); + const connectionProvider = newSingleConnectionProvider(connection); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + + it('should initialize new connection after releasing current one', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.releaseConnection().then(() => { + expect(connection1.isReleasedOnce()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.releaseConnection().then(() => { + expect(connection2.isReleasedOnce()).toBeTruthy(); + done(); + }); + }); + }); + + it('should initialize new connection after being closed', done => { + const connection1 = new FakeConnection(); + const connection2 = new FakeConnection(); + const connectionProvider = new RecordingConnectionProvider([connection1, connection2]); + const connectionHolder = new ConnectionHolder(READ, connectionProvider); + + connectionHolder.initializeConnection(); + + connectionHolder.close().then(() => { + expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy(); + + connectionHolder.initializeConnection(); + connectionHolder.close().then(() => { + expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy(); + done(); + }); + }); + }); +}); + +class RecordingConnectionProvider extends SingleConnectionProvider { + + constructor(connections) { + super(Promise.resolve()); + this.connectionPromises = connections.map(conn => Promise.resolve(conn)); + this.acquireConnectionInvoked = 0; + } + + acquireConnection(mode) { + return this.connectionPromises[this.acquireConnectionInvoked++]; + } +} + +function newSingleConnectionProvider(connection) { + return new SingleConnectionProvider(Promise.resolve(connection)); +} diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 662506041..04329d6a1 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -25,11 +25,14 @@ import RoundRobinArray from '../../src/v1/internal/round-robin-array'; import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers'; import Pool from '../../src/v1/internal/pool'; +const NO_OP_DRIVER_CALLBACK = () => { +}; + describe('DirectConnectionProvider', () => { it('acquires connection from the pool', done => { const pool = newPool(); - const connectionProvider = new DirectConnectionProvider('localhost:123', pool); + const connectionProvider = newDirectConnectionProvider('localhost:123', pool); connectionProvider.acquireConnection(READ).then(connection => { expect(connection).toBeDefined(); @@ -132,7 +135,7 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', newPool()); + const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -581,8 +584,12 @@ describe('LoadBalancer', () => { }); +function newDirectConnectionProvider(address, pool) { + return new DirectConnectionProvider(address, pool, NO_OP_DRIVER_CALLBACK); +} + function newLoadBalancer(routers, readers, writers, pool = null, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}) { - const loadBalancer = new LoadBalancer(null, pool || newPool()); + const loadBalancer = new LoadBalancer(null, pool || newPool(), NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable( new RoundRobinArray(routers), new RoundRobinArray(readers), diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js new file mode 100644 index 000000000..60dd3c11e --- /dev/null +++ b/test/internal/fake-connection.js @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This class is like a mock of {@link Connection} that tracks invocations count. + * It tries to maintain same "interface" as {@link Connection}. + * It could be replaced with a proper mock by a library like testdouble. + * At the time of writing such libraries require {@link Proxy} support but browser tests execute in + * PhantomJS which does not support proxies. + */ +export default class FakeConnection { + + constructor() { + this.resetInvoked = 0; + this.resetAsyncInvoked = 0; + this.syncInvoked = 0; + this.releaseInvoked = 0; + } + + run() { + } + + discardAll() { + } + + reset() { + this.resetInvoked++; + } + + resetAsync() { + this.resetAsyncInvoked++; + } + + sync() { + this.syncInvoked++; + } + + _release() { + this.releaseInvoked++; + } + + isReleasedOnceOnSessionClose() { + return this.isReleasedOnSessionCloseTimes(1); + } + + isReleasedOnSessionCloseTimes(times) { + return this.resetAsyncInvoked === times && + this.resetInvoked === 0 && + this.syncInvoked === times && + this.releaseInvoked === times; + } + + isNeverReleased() { + return this.isReleasedTimes(0); + } + + isReleasedOnce() { + return this.isReleasedTimes(1); + } + + isReleasedTimes(times) { + return this.resetAsyncInvoked === 0 && + this.resetInvoked === times && + this.syncInvoked === times && + this.releaseInvoked === times; + } +}; diff --git a/test/internal/tls.test.js b/test/internal/tls.test.js index 62ce70b77..dc58f4b09 100644 --- a/test/internal/tls.test.js +++ b/test/internal/tls.test.js @@ -269,7 +269,9 @@ describe('trust-on-first-use', function() { knownHosts: knownHostsPath }); - driver.session(); // write into the knownHost file + // create session and transaction to force creation of new connection and writing into the knownHost file + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); // duplicate the same serverId twice setTimeout(function() { diff --git a/test/resources/boltkit/acquire_endpoints_with_one_of_each.script b/test/resources/boltkit/acquire_endpoints_with_one_of_each.script new file mode 100644 index 000000000..2d6cda748 --- /dev/null +++ b/test/resources/boltkit/acquire_endpoints_with_one_of_each.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/resources/boltkit/read_tx_with_bookmarks.script b/test/resources/boltkit/read_tx_with_bookmarks.script new file mode 100644 index 000000000..a48ee65f9 --- /dev/null +++ b/test/resources/boltkit/read_tx_with_bookmarks.script @@ -0,0 +1,18 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "BEGIN" {"bookmark": "OldBookmark"} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Bob"] + RECORD ["Alice"] + SUCCESS {"bookmark": "NewBookmark"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} diff --git a/test/resources/boltkit/write_read_tx_with_bookmark_override.script.mst b/test/resources/boltkit/write_read_tx_with_bookmark_override.script.mst new file mode 100644 index 000000000..6ef5beff4 --- /dev/null +++ b/test/resources/boltkit/write_read_tx_with_bookmark_override.script.mst @@ -0,0 +1,30 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "BEGIN" {"bookmark": "BookmarkA"} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {"bookmark": "BookmarkB"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "BEGIN" {{{bookmarkOverride}}} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Bob"] + SUCCESS {"bookmark": "BookmarkC"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} + diff --git a/test/resources/boltkit/write_read_tx_with_bookmarks.script b/test/resources/boltkit/write_read_tx_with_bookmarks.script new file mode 100644 index 000000000..e8207f887 --- /dev/null +++ b/test/resources/boltkit/write_read_tx_with_bookmarks.script @@ -0,0 +1,30 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "BEGIN" {"bookmark": "BookmarkA"} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {"bookmark": "BookmarkB"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "BEGIN" {"bookmark": "BookmarkB"} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Bob"] + SUCCESS {"bookmark": "BookmarkC"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} + diff --git a/test/resources/boltkit/write_tx_with_bookmarks.script b/test/resources/boltkit/write_tx_with_bookmarks.script new file mode 100644 index 000000000..08247a9e2 --- /dev/null +++ b/test/resources/boltkit/write_tx_with_bookmarks.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "BEGIN" {"bookmark": "OldBookmark"} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {"bookmark": "NewBookmark"} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} diff --git a/test/v1/direct.driver.boltkit.it.js b/test/v1/direct.driver.boltkit.it.js index 90355988b..8fb1c4d7d 100644 --- a/test/v1/direct.driver.boltkit.it.js +++ b/test/v1/direct.driver.boltkit.it.js @@ -17,39 +17,208 @@ * limitations under the License. */ -var neo4j = require("../../lib/v1").default; -var boltkit = require('./boltkit'); +import neo4j from '../../lib/v1'; +import {READ, WRITE} from '../../lib/v1/driver'; +import boltkit from './boltkit'; -describe('direct driver', function() { +describe('direct driver', () => { - it('should run query', function (done) { + it('should run query', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/return_x.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/return_x.script', 9001); - kit.run(function () { - // BoltKit currently does not support encryption, create driver with encryption turned off - var driver = neo4j.driver("bolt://localhost:9001", neo4j.auth.basic("neo4j", "neo4j"), { - encrypted: "ENCRYPTION_OFF" - }); - // When - var session = driver.session(); - // Then - session.run("RETURN {x}", {'x': 1}).then(function (res) { + kit.run(() => { + const driver = createDriver(); + // When + const session = driver.session(); + // Then + session.run('RETURN {x}', {'x': 1}).then(res => { expect(res.records[0].get('x').toInt()).toEqual(1); session.close(); driver.close(); - server.exit(function(code) { - expect(code).toEqual(0); - done(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + + it('should send and receive bookmark for read transaction', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/read_tx_with_bookmarks.script', 9001); + + kit.run(() => { + const driver = createDriver(); + const session = driver.session(READ, 'OldBookmark'); + const tx = session.beginTransaction(); + tx.run('MATCH (n) RETURN n.name AS name').then(result => { + const records = result.records; + expect(records.length).toEqual(2); + expect(records[0].get('name')).toEqual('Bob'); + expect(records[1].get('name')).toEqual('Alice'); + + tx.commit().then(() => { + expect(session.lastBookmark()).toEqual('NewBookmark'); + + session.close(() => { + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + + it('should send and receive bookmark for write transaction', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/write_tx_with_bookmarks.script', 9001); + + kit.run(() => { + const driver = createDriver(); + const session = driver.session(WRITE, 'OldBookmark'); + const tx = session.beginTransaction(); + tx.run('CREATE (n {name:\'Bob\'})').then(result => { + const records = result.records; + expect(records.length).toEqual(0); + + tx.commit().then(() => { + expect(session.lastBookmark()).toEqual('NewBookmark'); + + session.close(() => { + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + + it('should send and receive bookmark between write and read transactions', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/write_read_tx_with_bookmarks.script', 9001); + + kit.run(() => { + const driver = createDriver(); + const session = driver.session(WRITE, 'BookmarkA'); + const writeTx = session.beginTransaction(); + writeTx.run('CREATE (n {name:\'Bob\'})').then(result => { + const records = result.records; + expect(records.length).toEqual(0); + + writeTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkB'); + + const readTx = session.beginTransaction(); + readTx.run('MATCH (n) RETURN n.name AS name').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get('name')).toEqual('Bob'); + + readTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkC'); + + session.close(() => { + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); }); }); + }); }); }); + + it('should be possible to override bookmark', done => { + testBookmarkOverride('BookmarkOverride', done); + }); + + it('should be possible to override bookmark with null', done => { + testBookmarkOverride(null, done); + }); + + function testBookmarkOverride(bookmarkOverride, done) { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const bookmarkScriptValue = bookmarkOverride ? JSON.stringify({bookmark: bookmarkOverride}) : '{}'; + const params = {bookmarkOverride: bookmarkScriptValue}; + const server = kit.startWithTemplate( + './test/resources/boltkit/write_read_tx_with_bookmark_override.script.mst', params, 9001); + + kit.run(() => { + const driver = createDriver(); + const session = driver.session(WRITE, 'BookmarkA'); + const writeTx = session.beginTransaction(); + writeTx.run('CREATE (n {name:\'Bob\'})').then(result => { + const records = result.records; + expect(records.length).toEqual(0); + + writeTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkB'); + + const readTx = session.beginTransaction(bookmarkOverride); + readTx.run('MATCH (n) RETURN n.name AS name').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get('name')).toEqual('Bob'); + + readTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkC'); + + session.close(() => { + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + } + }); +function createDriver() { + // BoltKit currently does not support encryption, create driver with encryption turned off + const config = { + encrypted: 'ENCRYPTION_OFF' + }; + return neo4j.driver('bolt://localhost:9001', neo4j.auth.basic('neo4j', 'neo4j'), config); +} diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 906750da5..c483bf4fa 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -53,7 +53,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should handle wrong scheme', () => { @@ -84,7 +84,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should indicate success early on correct credentials', function(done) { @@ -97,7 +97,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to pass a realm with basic auth tokens', function(done) { @@ -110,7 +110,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens', function(done) { @@ -123,7 +123,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should be possible to create custom auth tokens with additional parameters', function(done) { @@ -136,7 +136,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should fail nicely when connecting with routing to standalone server', done => { @@ -151,7 +151,7 @@ describe('driver', function() { }; // When - driver.session(); + startNewTransaction(driver); }); it('should have correct user agent', () => { @@ -192,4 +192,13 @@ describe('driver', function() { }); }); + /** + * Starts new transaction to force new network connection. + * @param {Driver} driver - the driver to use. + */ + function startNewTransaction(driver) { + const session = driver.session(); + expect(session.beginTransaction()).toBeDefined(); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 457f48d5a..75f4721e9 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -18,6 +18,7 @@ */ import neo4j from '../../src/v1'; +import {READ, WRITE} from '../../src/v1/driver'; import boltkit from './boltkit'; import RoutingTable from '../../src/v1/internal/routing-table'; @@ -939,11 +940,10 @@ describe('routing driver', () => { for (let i = 0; i < acquiredConnections.length; i++) { expect(acquiredConnections[i]).toBe(releasedConnections[i]); } - done(); }); }); - }); + }).catch(console.log); }); }); @@ -1055,6 +1055,166 @@ describe('routing driver', () => { 9999, done); }); + it('should send and receive bookmark', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const writer = kit.start('./test/resources/boltkit/write_tx_with_bookmarks.script', 9007); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const session = driver.session(); + const tx = session.beginTransaction('OldBookmark'); + tx.run('CREATE (n {name:\'Bob\'})').then(() => { + tx.commit().then(() => { + expect(session.lastBookmark()).toEqual('NewBookmark'); + + session.close(); + driver.close(); + + router.exit(code1 => { + writer.exit(code2 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + + it('should send initial bookmark wihtout access mode', done => { + testWriteSessionWithAccessModeAndBookmark(null, 'OldBookmark', done); + }); + + it('should use write session mode and initial bookmark', done => { + testWriteSessionWithAccessModeAndBookmark(WRITE, 'OldBookmark', done); + }); + + it('should use read session mode and initial bookmark', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const writer = kit.start('./test/resources/boltkit/read_tx_with_bookmarks.script', 9005); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const session = driver.session(READ, 'OldBookmark'); + const tx = session.beginTransaction(); + tx.run('MATCH (n) RETURN n.name AS name').then(result => { + const records = result.records; + expect(records.length).toEqual(2); + expect(records[0].get('name')).toEqual('Bob'); + expect(records[1].get('name')).toEqual('Alice'); + + tx.commit().then(() => { + expect(session.lastBookmark()).toEqual('NewBookmark'); + + session.close(); + driver.close(); + + router.exit(code1 => { + writer.exit(code2 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + + it('should pass bookmark from transaction to transaction', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints_with_one_of_each.script', 9001); + const writer = kit.start('./test/resources/boltkit/write_read_tx_with_bookmarks.script', 9007); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const session = driver.session(null, 'BookmarkA'); + const writeTx = session.beginTransaction(); + writeTx.run('CREATE (n {name:\'Bob\'})').then(() => { + writeTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkB'); + + const readTx = session.beginTransaction(); + readTx.run('MATCH (n) RETURN n.name AS name').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get('name')).toEqual('Bob'); + + readTx.commit().then(() => { + expect(session.lastBookmark()).toEqual('BookmarkC'); + + session.close(); + driver.close(); + + router.exit(code1 => { + writer.exit(code2 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + done(); + }); + }); + }); + }); + }); + }); + }); + }); + + function testWriteSessionWithAccessModeAndBookmark(accessMode, bookmark, done) { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const writer = kit.start('./test/resources/boltkit/write_tx_with_bookmarks.script', 9007); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const session = driver.session(accessMode, bookmark); + const tx = session.beginTransaction(); + tx.run('CREATE (n {name:\'Bob\'})').then(() => { + tx.commit().then(() => { + expect(session.lastBookmark()).toEqual('NewBookmark'); + + session.close(); + driver.close(); + + router.exit(code1 => { + writer.exit(code2 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + done(); + }); + }); + }); + }); + }); + } + function testForProtocolError(scriptFile, done) { if (!boltkit.BoltKitSupport) { done(); @@ -1170,10 +1330,6 @@ describe('routing driver', () => { return driver._connectionProvider._connectionPool; } - function setConnectionPool(driver, newConnectionPool) { - driver._connectionProvider._connectionPool = newConnectionPool; - } - function getRoutingTable(driver) { return driver._connectionProvider._routingTable; } diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 30fbcbf67..22402e36a 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -20,6 +20,9 @@ import neo4j from '../../src/v1'; import {statementType} from '../../src/v1/result-summary'; import Session from '../../src/v1/session'; +import {READ} from '../../src/v1/driver'; +import {SingleConnectionProvider} from '../../src/v1/internal/connection-providers'; +import FakeConnection from '../internal/fake-connection'; describe('session', () => { @@ -47,14 +50,14 @@ describe('session', () => { it('close should invoke callback ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(done); }); it('close should invoke callback even when already closed ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { session.close(() => { @@ -67,16 +70,16 @@ describe('session', () => { it('close should be idempotent ', done => { const connection = new FakeConnection(); - const session = new Session(Promise.resolve(connection)); + const session = newSessionWithConnection(connection); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); session.close(() => { - expect(connection.closedOnce()).toBeTruthy(); + expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); done(); }); }); @@ -370,20 +373,25 @@ describe('session', () => { it('should be able to close a long running query ', done => { //given a long running query - session.run("unwind range(1,1000000) as x create (n {prop:x}) delete n"); + session.run('unwind range(1,1000000) as x create (n {prop:x}) delete n').catch(error => { + // long running query should fail + expect(error).toBeDefined(); - //wait some time than close the session and run - //a new query + // and it should be possible to start another session and run a query + const anotherSession = driver.session(); + anotherSession.run('RETURN 1.0 as a').then(result => { + expect(result.records.length).toBe(1); + expect(result.records[0].get('a')).toBe(1); + done(); + }).catch(error => { + console.log('Query failed after a long running query was terminated', error); + }); + }); + + // wait some time than close the session with a long running query setTimeout(() => { session.close(); - const anotherSession = driver.session(); - setTimeout(() => { - anotherSession.run("RETURN 1.0 as a") - .then(ignore => { - done(); - }); - }, 500); - }, 500); + }, 1000); }); it('should fail nicely on unpackable values ', done => { @@ -441,30 +449,191 @@ describe('session', () => { expect(() => driver.session('ILLEGAL_MODE')).toThrow(); }); - class FakeConnection { + it('should release connection to the pool after run', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - constructor() { - this.resetInvoked = 0; - this.syncInvoked = 0; - this.releaseInvoked = 0; - } + session.run('RETURN 1').then(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - reset() { - this.resetInvoked++; - } + it('should release connection to the pool after run failure', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); - sync() { - this.syncInvoked++; - } + session.run('RETURN 10 / 0').catch(() => { + const idleConnectionsAfter = idleConnectionCount(driver); + expect(idleConnectionsBefore).toEqual(idleConnectionsAfter); + done(); + }); + }); + }); - _release() { - this.releaseInvoked++; - } + it('should release connection to the pool when result is consumed', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + } + }); + }); + }); - closedOnce() { - return this.resetInvoked === 1 && this.syncInvoked === 1 && this.releaseInvoked === 1; - } + it('should release connection to the pool when result fails', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + session.run('UNWIND range(10, 0, -1) AS x RETURN 10 / x').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current query + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: ignoredError => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }, + onCompleted: () => { + } + }); + }); + }); + + it('should release connection to the pool when transaction commits', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.commit().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); + + it('should release connection to the pool when transaction rolls back', done => { + withQueryInTmpSession(driver, () => { + const idleConnectionsBefore = idleConnectionCount(driver); + + const tx = session.beginTransaction(); + tx.run('UNWIND range(0, 10) AS x RETURN x + 1').subscribe({ + onNext: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + }, + onError: error => { + console.log(error); + }, + onCompleted: () => { + // one less idle connection, one connection is used for the current transaction + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore - 1); + tx.rollback().then(() => { + expect(idleConnectionCount(driver)).toBe(idleConnectionsBefore); + done(); + }); + } + }); + }); + }); + + it('should update last bookmark after every read tx commit', done => { + const bookmarkBefore = session.lastBookmark(); + + const tx = session.beginTransaction(); + tx.run('RETURN 42 as answer').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get('answer').toNumber()).toEqual(42); + + tx.commit().then(() => { + const bookmarkAfter = session.lastBookmark(); + expect(bookmarkAfter).toBeDefined(); + expect(bookmarkAfter).not.toBeNull(); + expect(bookmarkAfter).not.toEqual(bookmarkBefore); + + done(); + }); + }); + }); + + it('should update last bookmark after every write tx commit', done => { + const bookmarkBefore = session.lastBookmark(); + + const tx = session.beginTransaction(); + tx.run('CREATE ()').then(() => { + tx.commit().then(() => { + const bookmarkAfter = session.lastBookmark(); + expect(bookmarkAfter).toBeDefined(); + expect(bookmarkAfter).not.toBeNull(); + expect(bookmarkAfter).not.toEqual(bookmarkBefore); + + done(); + }); + }); + }); + + it('should not lose last bookmark after run', done => { + const tx = session.beginTransaction(); + tx.run('CREATE ()').then(() => { + tx.commit().then(() => { + const bookmarkBefore = session.lastBookmark(); + expect(bookmarkBefore).toBeDefined(); + expect(bookmarkBefore).not.toBeNull(); + + session.run('CREATE ()').then(() => { + const bookmarkAfter = session.lastBookmark(); + expect(bookmarkAfter).toEqual(bookmarkBefore); + done(); + }); + }); + }); + }); + + function withQueryInTmpSession(driver, callback) { + const tmpSession = driver.session(); + return tmpSession.run('RETURN 1').then(() => { + tmpSession.close(callback); + }); } -}); + function newSessionWithConnection(connection) { + const connectionProvider = new SingleConnectionProvider(Promise.resolve(connection)); + const session = new Session(READ, connectionProvider); + session.beginTransaction(); // force session to acquire new connection + return session; + } + function idleConnectionCount(driver) { + const connectionProvider = driver._connectionProvider; + const address = connectionProvider._address; + const connectionPool = connectionProvider._connectionPool; + const idleConnections = connectionPool._pools[address]; + return idleConnections.length; + } +}); diff --git a/test/v1/tck/steps/erroreportingsteps.js b/test/v1/tck/steps/erroreportingsteps.js index 2c75dfe9d..54d2306fa 100644 --- a/test/v1/tck/steps/erroreportingsteps.js +++ b/test/v1/tck/steps/erroreportingsteps.js @@ -65,7 +65,7 @@ module.exports = function () { var self = this; var driver = neo4j.driver("bolt://localhost:7777", neo4j.auth.basic("neo4j", "neo4j")); driver.onError = function (error) { self.error = error; callback()}; - driver.session(); + driver.session().beginTransaction(); setTimeout(callback, 1000); }); diff --git a/test/v1/transaction.test.js b/test/v1/transaction.test.js index c40f94dfc..e742d9753 100644 --- a/test/v1/transaction.test.js +++ b/test/v1/transaction.test.js @@ -229,7 +229,7 @@ describe('transaction', () => { }).catch(console.log); }); - it('should have no bookmark when tx is rolled back', done => { + it('should have bookmark when tx is rolled back', done => { if (neo4jVersionOlderThan31(done)) { return; } @@ -240,11 +240,14 @@ describe('transaction', () => { tx1.run('CREATE ()').then(() => { tx1.commit().then(() => { expectValidLastBookmark(session); + const bookmarkBefore = session.lastBookmark(); const tx2 = session.beginTransaction(); tx2.run('CREATE ()').then(() => { tx2.rollback().then(() => { - expect(session.lastBookmark()).not.toBeDefined(); + expectValidLastBookmark(session); + const bookmarkAfter = session.lastBookmark(); + expect(bookmarkAfter).toEqual(bookmarkBefore); const tx3 = session.beginTransaction(); tx3.run('CREATE ()').then(() => { @@ -270,12 +273,14 @@ describe('transaction', () => { tx1.run('CREATE ()').then(() => { tx1.commit().then(() => { expectValidLastBookmark(session); + const bookmarkBefore = session.lastBookmark(); const tx2 = session.beginTransaction(); tx2.run('RETURN').catch(error => { expectSyntaxError(error); - expect(session.lastBookmark()).not.toBeDefined(); + const bookmarkAfter = session.lastBookmark(); + expect(bookmarkAfter).toEqual(bookmarkBefore); const tx3 = session.beginTransaction(); tx3.run('CREATE ()').then(() => {