diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index c70956e3f8..b6bd0325a3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -47,6 +47,7 @@ import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.net.ServerAddressResolver; +import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedFuture; public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler @@ -221,7 +222,8 @@ private void acquire( AccessMode mode, AddressSet addresses, CompletableFuture acquire( mode, addresses, result ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index 0a6a60cbea..d3aadad301 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -39,6 +39,8 @@ import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -61,7 +63,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.startsWith; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static 
org.neo4j.driver.v1.Logging.none; @@ -726,6 +730,40 @@ void shouldRetryWriteTransactionUntilSuccess() throws Exception } } + @Test + void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exception + { + // This test simulates a router in a cluster when a leader is removed. + // The router first returns a RT with a writer inside. + // However this writer is killed while the driver is running a tx with it. + // Then at the second time the router returns the same RT with the killed writer inside. + // At the third round, the router removes the writer server from RT reply. + // Finally, the router returns a RT with a reachable writer. + StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 ); + StubServer brokenWriter = StubServer.start( "dead_write_server.script", 9004 ); + StubServer writer = StubServer.start( "write_server.script", 9008 ); + + Logger logger = mock( Logger.class ); + Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build(); + try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config ); + Session session = driver.session() ) + { + AtomicInteger invocations = new AtomicInteger(); + List records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ); + + assertEquals( 0, records.size() ); + assertEquals( 2, invocations.get() ); + } + finally + { + assertEquals( 0, router.exitStatus() ); + assertEquals( 0, brokenWriter.exitStatus() ); + assertEquals( 0, writer.exitStatus() ); + } + verify( logger, times( 3 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) ); + verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) ); + } + @Test void shouldRetryReadTransactionUntilFailure() throws Exception { @@ -1159,19 +1197,24 @@ void useSessionAfterDriverIsClosed() 
throws Exception } } - private static Driver newDriverWithSleeplessClock( String uriString ) + private static Driver newDriverWithSleeplessClock( String uriString, Config config ) { DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() ); - return newDriver( uriString, driverFactory ); + return newDriver( uriString, driverFactory, config ); + } + + private static Driver newDriverWithSleeplessClock( String uriString ) + { + return newDriverWithSleeplessClock( uriString, config ); } private static Driver newDriverWithFixedRetries( String uriString, int retries ) { DriverFactory driverFactory = new DriverFactoryWithFixedRetryLogic( retries ); - return newDriver( uriString, driverFactory ); + return newDriver( uriString, driverFactory, config ); } - private static Driver newDriver( String uriString, DriverFactory driverFactory ) + private static Driver newDriver( String uriString, DriverFactory driverFactory, Config config ) { URI uri = URI.create( uriString ); RoutingSettings routingConf = new RoutingSettings( 1, 1, null ); @@ -1201,4 +1244,11 @@ private static List readStrings( final String query, Session session ) return names; } ); } + + private static Logging mockedLogging( Logger logger ) + { + Logging logging = mock( Logging.class ); + when( logging.getLog( any() ) ).thenReturn( logger ); + return logging; + } } diff --git a/driver/src/test/resources/acquire_endpoints_v3_leader_killed.script b/driver/src/test/resources/acquire_endpoints_v3_leader_killed.script new file mode 100644 index 0000000000..6ab2ff89b9 --- /dev/null +++ b/driver/src/test/resources/acquire_endpoints_v3_leader_killed.script @@ -0,0 +1,25 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, 
[{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] + SUCCESS {}