@@ -23,7 +23,8 @@ import {
2323 error ,
2424 Integer ,
2525 int ,
26- internal
26+ internal ,
27+ ServerInfo
2728} from 'neo4j-driver-core'
2829import { RoutingTable } from '../../src/rediscovery/'
2930import { Pool } from '../../src/pool'
@@ -2474,6 +2475,234 @@ describe('#unit RoutingConnectionProvider', () => {
24742475 } )
24752476
24762477 } )
2478+
2479+ describe . each ( [
2480+ [ undefined , READ ] ,
2481+ [ undefined , WRITE ] ,
2482+ [ '' , READ ] ,
2483+ [ '' , WRITE ] ,
2484+ [ 'databaseA' , READ ] ,
2485+ [ 'databaseA' , WRITE ] ,
2486+ ] ) ( '.verifyConnectivityAndGetServeInfo({ database: %s, accessMode: %s })' , ( database , accessMode ) => {
2487+ describe ( 'when connection is available in the pool' , ( ) => {
2488+ it ( 'should return the server info' , async ( ) => {
2489+ const { connectionProvider, server, protocolVersion } = setup ( )
2490+
2491+ const serverInfo = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2492+
2493+ expect ( serverInfo ) . toEqual ( new ServerInfo ( server , protocolVersion ) )
2494+ } )
2495+
2496+ it ( 'should acquire, resetAndFlush and release connections for sever with the selected access mode' , async ( ) => {
2497+ const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup ( )
2498+ const acquireSpy = jest . spyOn ( pool , 'acquire' )
2499+
2500+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2501+
2502+ const targetServers = accessMode === WRITE ? routingTable . writers : routingTable . readers
2503+ for ( const address of targetServers ) {
2504+ expect ( acquireSpy ) . toHaveBeenCalledWith ( address )
2505+
2506+ const connections = seenConnectionsPerAddress . get ( address )
2507+
2508+ expect ( connections . length ) . toBe ( 1 )
2509+ expect ( connections [ 0 ] . resetAndFlush ) . toHaveBeenCalled ( )
2510+ expect ( connections [ 0 ] . _release ) . toHaveBeenCalled ( )
2511+ expect ( connections [ 0 ] . _release . mock . invocationCallOrder [ 0 ] )
2512+ . toBeGreaterThan ( connections [ 0 ] . resetAndFlush . mock . invocationCallOrder [ 0 ] )
2513+ }
2514+ } )
2515+
2516+ it ( 'should not acquire, resetAndFlush and release connections for sever with the other access mode' , async ( ) => {
2517+ const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup ( )
2518+ const acquireSpy = jest . spyOn ( pool , 'acquire' )
2519+
2520+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2521+
2522+ const targetServers = accessMode === WRITE ? routingTable . readers : routingTable . writers
2523+ for ( const address of targetServers ) {
2524+ expect ( acquireSpy ) . not . toHaveBeenCalledWith ( address )
2525+ expect ( seenConnectionsPerAddress . get ( address ) ) . toBeUndefined ( )
2526+ }
2527+ } )
2528+
2529+ describe ( 'when the reset and flush fails for at least one the address' , ( ) => {
2530+ it ( 'should fails with the reset and flush error' , async ( ) => {
2531+ const error = newError ( 'Error' )
2532+ let i = 0
2533+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2534+ const { connectionProvider } = setup ( { resetAndFlush } )
2535+
2536+ try {
2537+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2538+ expect ( ) . toBe ( 'Not reached' )
2539+ } catch ( e ) {
2540+ expect ( e ) . toBe ( error )
2541+ }
2542+ } )
2543+
2544+ it ( 'should release the connection' , async ( ) => {
2545+ const error = newError ( 'Error' )
2546+ let i = 0
2547+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2548+ const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup ( { resetAndFlush } )
2549+
2550+ try {
2551+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2552+ } catch ( e ) {
2553+ } finally {
2554+ const targetServers = accessMode === WRITE ? routingTable . writers : routingTable . readers
2555+ for ( const address of targetServers ) {
2556+ const connections = seenConnectionsPerAddress . get ( address )
2557+
2558+ expect ( connections . length ) . toBe ( 1 )
2559+ expect ( connections [ 0 ] . resetAndFlush ) . toHaveBeenCalled ( )
2560+ expect ( connections [ 0 ] . _release ) . toHaveBeenCalled ( )
2561+ }
2562+ }
2563+ } )
2564+
2565+ describe ( 'and the release fails' , ( ) => {
2566+ it ( 'should fails with the release error' , async ( ) => {
2567+ const error = newError ( 'Error' )
2568+ const releaseError = newError ( 'Release error' )
2569+ let i = 0
2570+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2571+ const releaseMock = jest . fn ( ( ) => Promise . reject ( releaseError ) )
2572+ const { connectionProvider } = setup ( { resetAndFlush, releaseMock } )
2573+
2574+ try {
2575+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2576+ expect ( ) . toBe ( 'Not reached' )
2577+ } catch ( e ) {
2578+ expect ( e ) . toBe ( releaseError )
2579+ }
2580+ } )
2581+ } )
2582+
2583+ } )
2584+
2585+ describe ( 'when the release for at least one the address' , ( ) => {
2586+ it ( 'should fails with the reset and flush error' , async ( ) => {
2587+ const error = newError ( 'Error' )
2588+ let i = 0
2589+ const releaseMock = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2590+ const { connectionProvider } = setup ( { releaseMock } )
2591+
2592+ try {
2593+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2594+ expect ( ) . toBe ( 'Not reached' )
2595+ } catch ( e ) {
2596+ expect ( e ) . toBe ( error )
2597+ }
2598+ } )
2599+ } )
2600+
2601+ function setup ( { resetAndFlush, releaseMock } = { } ) {
2602+ const routingTable = newRoutingTable (
2603+ database || null ,
2604+ [ server1 , server2 ] ,
2605+ [ server3 , server4 ] ,
2606+ [ server5 , server6 ]
2607+ )
2608+ const protocolVersion = 4.4
2609+ const server = { address : 'localhost:123' , version : 'neo4j/1234' }
2610+
2611+ const seenConnectionsPerAddress = new Map ( )
2612+
2613+ const pool = newPool ( {
2614+ create : ( address , release ) => {
2615+ if ( ! seenConnectionsPerAddress . has ( address ) ) {
2616+ seenConnectionsPerAddress . set ( address , [ ] )
2617+ }
2618+ const connection = new FakeConnection ( address , release , 'version' , protocolVersion , server )
2619+ if ( resetAndFlush ) {
2620+ connection . resetAndFlush = resetAndFlush
2621+ }
2622+ if ( releaseMock ) {
2623+ connection . _release = releaseMock
2624+ }
2625+ seenConnectionsPerAddress . get ( address ) . push ( connection )
2626+ return connection
2627+ }
2628+ } )
2629+ const connectionProvider = newRoutingConnectionProvider (
2630+ [
2631+ routingTable
2632+ ] ,
2633+ pool
2634+ )
2635+ return { connectionProvider, routingTable, seenConnectionsPerAddress, server, protocolVersion, pool }
2636+ }
2637+ } )
2638+
2639+ describe ( 'when at least the one of the servers is not available' , ( ) => {
2640+ it ( 'should reject with acquistion timeout error' , async ( ) => {
2641+ const routingTable = newRoutingTable (
2642+ database || null ,
2643+ [ server1 , server2 ] ,
2644+ [ server3 , server4 ] ,
2645+ [ server5 , server6 ]
2646+ )
2647+
2648+ const pool = newPool ( {
2649+ config : {
2650+ acquisitionTimeout : 0 ,
2651+ }
2652+ } )
2653+
2654+ const connectionProvider = newRoutingConnectionProvider (
2655+ [
2656+ routingTable
2657+ ] ,
2658+ pool
2659+ )
2660+
2661+ try {
2662+ connectionProvider = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2663+ expect ( ) . toBe ( 'not reached' )
2664+ } catch ( e ) {
2665+ expect ( e ) . toBeDefined ( )
2666+ }
2667+ } )
2668+ } )
2669+
2670+ describe ( 'when at least the one of the connections could not be created' , ( ) => {
2671+ it ( 'should reject with acquistion timeout error' , async ( ) => {
2672+ let i = 0
2673+ const error = new Error ( 'Connection creation error' )
2674+ const routingTable = newRoutingTable (
2675+ database || null ,
2676+ [ server1 , server2 ] ,
2677+ [ server3 , server4 ] ,
2678+ [ server5 , server6 ]
2679+ )
2680+
2681+ const pool = newPool ( {
2682+ create : ( address , release ) => {
2683+ if ( i ++ % 2 === 0 ) {
2684+ return new FakeConnection ( address , release , 'version' , 4.4 , { } )
2685+ }
2686+ throw error
2687+ }
2688+ } )
2689+
2690+ const connectionProvider = newRoutingConnectionProvider (
2691+ [
2692+ routingTable
2693+ ] ,
2694+ pool
2695+ )
2696+
2697+ try {
2698+ connectionProvider = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2699+ expect ( ) . toBe ( 'not reached' )
2700+ } catch ( e ) {
2701+ expect ( e ) . toBe ( error )
2702+ }
2703+ } )
2704+ } )
2705+ } )
24772706} )
24782707
24792708function newRoutingConnectionProvider (
@@ -2567,10 +2796,20 @@ function setupRoutingConnectionProviderToRememberRouters (
25672796 connectionProvider . _fetchRoutingTable = rememberingFetch
25682797}
25692798
2570- function newPool ( ) {
2799+ function newPool ( { create, config } = { } ) {
2800+ const _create = ( address , release ) => {
2801+ if ( create ) {
2802+ try {
2803+ return Promise . resolve ( create ( address , release ) )
2804+ } catch ( e ) {
2805+ return Promise . reject ( e )
2806+ }
2807+ }
2808+ return Promise . resolve ( new FakeConnection ( address , release , 'version' , 4.0 ) )
2809+ }
25712810 return new Pool ( {
2572- create : ( address , release ) =>
2573- Promise . resolve ( new FakeConnection ( address , release , 'version' , 4.0 ) )
2811+ config ,
2812+ create : ( address , release ) => _create ( address , release ) ,
25742813 } )
25752814}
25762815
@@ -2605,13 +2844,16 @@ function expectPoolToNotContain (pool, addresses) {
26052844}
26062845
26072846class FakeConnection extends Connection {
2608- constructor ( address , release , version , protocolVersion ) {
2847+ constructor ( address , release , version , protocolVersion , server ) {
26092848 super ( null )
26102849
26112850 this . _address = address
26122851 this . _version = version || VERSION_IN_DEV . toString ( )
26132852 this . _protocolVersion = protocolVersion
26142853 this . release = release
2854+ this . _release = jest . fn ( ( ) => release ( address , this ) )
2855+ this . resetAndFlush = jest . fn ( ( ) => Promise . resolve ( ) )
2856+ this . _server = server
26152857 }
26162858
26172859 get address ( ) {
@@ -2622,6 +2864,10 @@ class FakeConnection extends Connection {
26222864 return this . _version
26232865 }
26242866
2867+ get server ( ) {
2868+ return this . _server
2869+ }
2870+
26252871 protocol ( ) {
26262872 return {
26272873 version : this . _protocolVersion
0 commit comments