diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index e6319ba69d..3e17185494 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage; +import org.neo4j.driver.internal.async.AccessModeConnection; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -45,7 +46,7 @@ public class DirectConnectionProvider implements ConnectionProvider @Override public CompletionStage acquireConnection( AccessMode mode ) { - return connectionPool.acquire( address ); + return connectionPool.acquire( address ).thenApply( connection -> new AccessModeConnection( connection, mode ) ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AccessModeConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/AccessModeConnection.java new file mode 100644 index 0000000000..aa7706aa46 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/AccessModeConnection.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.messaging.BoltProtocol; +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.AccessMode; + +public class AccessModeConnection implements Connection +{ + private final Connection delegate; + private final AccessMode mode; + + public AccessModeConnection( Connection delegate, AccessMode mode ) + { + this.delegate = delegate; + this.mode = mode; + } + + public Connection connection() + { + return delegate; + } + + @Override + public boolean isOpen() + { + return delegate.isOpen(); + } + + @Override + public void enableAutoRead() + { + delegate.enableAutoRead(); + } + + @Override + public void disableAutoRead() + { + delegate.disableAutoRead(); + } + + @Override + public void write( Message message, ResponseHandler handler ) + { + delegate.write( message, handler ); + } + + @Override + public void write( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 ) + { + delegate.write( message1, handler1, message2, handler2 ); + } + + @Override + public void writeAndFlush( Message message, ResponseHandler handler ) + { + delegate.writeAndFlush( message, handler ); + } + + @Override + public void writeAndFlush( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 ) + { + delegate.writeAndFlush( message1, handler1, message2, handler2 ); + } + + @Override + public CompletionStage reset() + { + return delegate.reset(); + } + + @Override + public CompletionStage release() + { + return delegate.release(); + } + + @Override + public void terminateAndRelease( String reason ) + { + delegate.terminateAndRelease( reason ); + } + + @Override + public BoltServerAddress serverAddress() + { + return delegate.serverAddress(); + } + + @Override + public ServerVersion serverVersion() + { + return delegate.serverVersion(); + } + + @Override + public BoltProtocol protocol() + { + return delegate.protocol(); + } + + @Override + public AccessMode mode() + { + return mode; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 40253a069f..c70956e3f8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -25,6 +25,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.RoutingErrorHandler; +import org.neo4j.driver.internal.async.AccessModeConnection; import org.neo4j.driver.internal.async.RoutingConnection; import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterComposition; @@ -95,7 +96,8 @@ public CompletionStage acquireConnection( AccessMode mode ) { return freshRoutingTable( mode ) .thenCompose( routingTable -> acquire( mode, routingTable ) ) - .thenApply( connection -> new RoutingConnection( connection, mode, this ) ); + .thenApply( connection -> new RoutingConnection( connection, mode, this ) ) + .thenApply( connection -> new AccessModeConnection( connection, mode ) ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java index cccc334873..feb372520d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.neo4j.driver.internal.Bookmarks; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.TransactionConfig; import org.neo4j.driver.v1.Value; @@ -30,14 +31,14 @@ public class BeginMessage extends TransactionStartingMessage { public static final byte SIGNATURE = 0x11; - public BeginMessage( Bookmarks bookmarks, TransactionConfig config ) + public BeginMessage( Bookmarks bookmarks, TransactionConfig config, AccessMode mode ) { - this( bookmarks, config.timeout(), config.metadata() ); + this( bookmarks, config.timeout(), config.metadata(), mode ); } - public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map txMetadata ) + public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map txMetadata, AccessMode mode ) { - super( bookmarks, txTimeout, txMetadata ); + super( bookmarks, txTimeout, txMetadata, mode ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java index f681293bcc..1ccccf2fc7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.neo4j.driver.internal.Bookmarks; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.TransactionConfig; import org.neo4j.driver.v1.Value; @@ -33,14 +34,15 @@ public class RunWithMetadataMessage extends TransactionStartingMessage private final String statement; private final Map parameters; - public RunWithMetadataMessage( String statement, Map parameters, Bookmarks bookmarks, TransactionConfig config ) + public RunWithMetadataMessage( String statement, Map parameters, Bookmarks bookmarks, TransactionConfig config, AccessMode mode ) { - this( statement, parameters, bookmarks, config.timeout(), config.metadata() ); + this( statement, parameters, bookmarks, config.timeout(), config.metadata(), mode ); } - public RunWithMetadataMessage( String statement, Map parameters, Bookmarks bookmarks, Duration txTimeout, Map txMetadata ) + public RunWithMetadataMessage( String statement, Map parameters, Bookmarks bookmarks, Duration txTimeout, Map txMetadata, + AccessMode mode ) { - super( bookmarks, txTimeout, txMetadata ); + super( bookmarks, txTimeout, txMetadata, mode ); this.statement = statement; this.parameters = parameters; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionStartingMessage.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionStartingMessage.java index dcce161b0e..27542391c3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionStartingMessage.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionStartingMessage.java @@ -24,6 +24,7 @@ import org.neo4j.driver.internal.Bookmarks; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.util.Iterables; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Value; import static java.util.Collections.emptyMap; @@ -34,12 +35,14 @@ abstract class TransactionStartingMessage implements Message private static final String BOOKMARKS_METADATA_KEY = "bookmarks"; private static final String TX_TIMEOUT_METADATA_KEY = "tx_timeout"; private static final String TX_METADATA_METADATA_KEY = "tx_metadata"; + private static final String MODE_KEY = "mode"; + private static final String MODE_READ_VALUE = "r"; final Map metadata; - TransactionStartingMessage( Bookmarks bookmarks, Duration txTimeout, Map txMetadata ) + TransactionStartingMessage( Bookmarks bookmarks, Duration txTimeout, Map txMetadata, AccessMode mode ) { - this.metadata = buildMetadata( bookmarks, txTimeout, txMetadata ); + this.metadata = buildMetadata( bookmarks, txTimeout, txMetadata, mode ); } public final Map metadata() @@ -47,13 +50,14 @@ public final Map metadata() return metadata; } - private static Map buildMetadata( Bookmarks bookmarks, Duration txTimeout, Map txMetadata ) + private static Map buildMetadata( Bookmarks bookmarks, Duration txTimeout, Map txMetadata, AccessMode mode ) { boolean bookmarksPresent = bookmarks != null && !bookmarks.isEmpty(); boolean txTimeoutPresent = txTimeout != null; boolean txMetadataPresent = txMetadata != null && !txMetadata.isEmpty(); + boolean accessModePresent = mode == AccessMode.READ; - if ( !bookmarksPresent && !txTimeoutPresent && !txMetadataPresent ) + if ( !bookmarksPresent && !txTimeoutPresent && !txMetadataPresent && !accessModePresent ) { return emptyMap(); } @@ -73,6 +77,13 @@ private static Map buildMetadata( Bookmarks bookmarks, Duration tx result.put( TX_METADATA_METADATA_KEY, value( txMetadata ) ); } + switch ( mode ) + { + case READ: + result.put( MODE_KEY, value( MODE_READ_VALUE ) ); + break; + } + return result; } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java index a710b60d99..697f0455b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java @@ -96,7 +96,7 @@ public void prepareToCloseChannel( Channel channel ) @Override public CompletionStage beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config ) { - BeginMessage beginMessage = new BeginMessage( bookmarks, config ); + BeginMessage beginMessage = new BeginMessage( bookmarks, config, connection.mode() ); if ( bookmarks.isEmpty() ) { @@ -148,7 +148,7 @@ private static CompletionStage runStatement( Conn Map params = statement.parameters().asMap( ofValue() ); CompletableFuture runCompletedFuture = new CompletableFuture<>(); - Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config ); + Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config, connection.mode() ); RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR ); PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, bookmarksHolder, tx ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index a7ece08a47..827912b4b6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -24,6 +24,7 @@ import org.neo4j.driver.internal.messaging.BoltProtocol; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.AccessMode; public interface Connection { @@ -52,4 +53,9 @@ public interface Connection ServerVersion serverVersion(); BoltProtocol protocol(); + + default AccessMode mode() + { + return AccessMode.WRITE; + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java index e35d4908a0..dbb51f4b43 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java @@ -19,14 +19,20 @@ package org.neo4j.driver.internal; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; +import org.neo4j.driver.internal.async.AccessModeConnection; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.v1.AccessMode; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.mockito.Mockito.mock; @@ -48,8 +54,26 @@ void acquiresConnectionsFromThePool() ConnectionPool pool = poolMock( address, connection1, connection2 ); DirectConnectionProvider provider = new DirectConnectionProvider( address, pool ); - assertSame( connection1, await( provider.acquireConnection( READ ) ) ); - assertSame( connection2, await( provider.acquireConnection( WRITE ) ) ); + Connection acquired1 = await( provider.acquireConnection( READ ) ); + assertThat( acquired1, instanceOf( AccessModeConnection.class ) ); + assertSame( connection1, ((AccessModeConnection) acquired1).connection() ); + + Connection acquired2 = await( provider.acquireConnection( WRITE ) ); + assertThat( acquired2, instanceOf( AccessModeConnection.class ) ); + assertSame( connection2, ((AccessModeConnection) acquired2).connection() ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void returnsCorrectAccessMode( AccessMode mode ) + { + BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT; + ConnectionPool pool = poolMock( address, mock( Connection.class ) ); + DirectConnectionProvider provider = new DirectConnectionProvider( address, pool ); + + Connection acquired = await( provider.acquireConnection( mode ) ); + + assertEquals( mode, acquired.mode() ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java index 5c9402e2a4..eff3a74234 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java @@ -30,6 +30,7 @@ import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; @@ -143,6 +144,48 @@ void shouldLogConnectionIdInDebugMode() throws Exception } } + @Test + void shouldSendReadAccessModeInStatementMetadata() throws Exception + { + StubServer server = StubServer.start( "hello_run_exit_read.script", 9001 ); + + Config config = Config.builder() + .withoutEncryption() + .build(); + + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config ); + Session session = driver.session( AccessMode.READ ) ) + { + List names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() ); + assertEquals( asList( "Foo", "Bar" ), names ); + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + + @Test + void shouldNotSendWriteAccessModeInStatementMetadata() throws Exception + { + StubServer server = StubServer.start( "hello_run_exit.script", 9001 ); + + Config config = Config.builder() + .withoutEncryption() + .build(); + + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config ); + Session session = driver.session( AccessMode.WRITE ) ) + { + List names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() ); + assertEquals( asList( "Foo", "Bar" ), names ); + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + @Test void shouldCloseChannelWhenResetFails() throws Exception { diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index 4d1dbf7363..0a6a60cbea 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -77,7 +77,7 @@ class RoutingDriverBoltKitTest void shouldHandleAcquireReadSession() throws IOException, InterruptedException, StubServer.ForceKilled { // Given - StubServer server = StubServer.start( "acquire_endpoints.script", 9001 ); + StubServer server = StubServer.start( "acquire_endpoints_v3.script", 9001 ); //START a read server StubServer readServer = StubServer.start( "read_server.script", 9005 ); @@ -95,6 +95,45 @@ void shouldHandleAcquireReadSession() throws IOException, InterruptedException, assertThat( readServer.exitStatus(), equalTo( 0 ) ); } + @Test + void shouldSendReadAccessModeOnStatementMetadata() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "acquire_endpoints_v3.script", 9001 ); + + //START a read server + StubServer readServer = StubServer.start( "read_server_v3_read.script", 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( Driver driver = GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.READ ) ) + { + session.run( "MATCH (n) RETURN n.name" ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + } + + @Test + void shouldSendReadAccessModeOnStatementMetadataOnReadTx() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "acquire_endpoints_v3.script", 9001 ); + + //START a read server + StubServer readServer = StubServer.start( "read_server_v3_read_tx.script", 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( Driver driver = GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.READ ) ) + { + session.readTransaction( t -> t.run( "MATCH (n) RETURN n.name" ) ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + } + + @Test void shouldHandleAcquireReadSessionPlusTransaction() throws IOException, InterruptedException, StubServer.ForceKilled @@ -293,6 +332,44 @@ void shouldHandleAcquireWriteSession() throws IOException, InterruptedException, assertThat( writeServer.exitStatus(), equalTo( 0 ) ); } + @Test + void shouldNotSendWriteAccessModeOnStatementMetadata() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "acquire_endpoints_v3.script", 9001 ); + + //START a write server + StubServer writeServer = StubServer.start( "write_server_v3_write.script", 9007 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( Driver driver = GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.WRITE ) ) + { + session.run( "CREATE (n {name:'Bob'})" ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + @Test + void shouldNotSendWriteAccessModeOnStatementMetadataWithWriteTx() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "acquire_endpoints_v3.script", 9001 ); + + //START a write server + StubServer writeServer = StubServer.start( "write_server_v3_write_tx.script", 9007 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( Driver driver = GraphDatabase.driver( uri, config ); + Session session = driver.session() ) + { + session.writeTransaction( t -> t.run( "CREATE (n {name:'Bob'})" ) ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + @Test void shouldHandleAcquireWriteSessionAndTransaction() throws IOException, InterruptedException, StubServer.ForceKilled diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/AccessModeConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/AccessModeConnectionTest.java new file mode 100644 index 0000000000..2594fcc4d5 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/AccessModeConnectionTest.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.messaging.BoltProtocol; +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.net.ServerAddress; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.AccessMode.READ; + +class AccessModeConnectionTest +{ + + @ParameterizedTest + @ValueSource( strings = {"true", "false"} ) + void shouldDelegateIsOpen( String open ) + { + Connection mockConnection = mock( Connection.class ); + when( mockConnection.isOpen() ).thenReturn( Boolean.valueOf( open ) ); + + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + assertEquals( Boolean.valueOf( open ).booleanValue(), connection.isOpen() ); + verify( mockConnection ).isOpen(); + } + + @Test + void shouldDelegateEnableAutoRead() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + connection.enableAutoRead(); + + verify( mockConnection ).enableAutoRead(); + } + + @Test + void shouldDelegateDisableAutoRead() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + connection.disableAutoRead(); + + verify( mockConnection ).disableAutoRead(); + } + + @Test + void shouldDelegateWrite() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + Message message = mock( Message.class ); + ResponseHandler handler = mock( ResponseHandler.class ); + + connection.write( message, handler ); + + verify( mockConnection ).write( message, handler ); + } + + @Test + void shouldDelegateWriteTwoMessages() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + Message message1 = mock( Message.class ); + ResponseHandler handler1 = mock( ResponseHandler.class ); + Message message2 = mock( Message.class ); + ResponseHandler handler2 = mock( ResponseHandler.class ); + + connection.write( message1, handler1, message2, handler2 ); + + verify( mockConnection ).write( message1, handler1, message2, handler2 ); + } + + @Test + void shouldDelegateWriteAndFlush() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + Message message = mock( Message.class ); + ResponseHandler handler = mock( ResponseHandler.class ); + + connection.writeAndFlush( message, handler ); + + verify( mockConnection ).writeAndFlush( message, handler ); + } + + @Test + void shouldDelegateWriteAndFlush1() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + Message message1 = mock( Message.class ); + ResponseHandler handler1 = mock( ResponseHandler.class ); + Message message2 = mock( Message.class ); + ResponseHandler handler2 = mock( ResponseHandler.class ); + + connection.writeAndFlush( message1, handler1, message2, handler2 ); + + verify( mockConnection ).writeAndFlush( message1, handler1, message2, handler2 ); + } + + @Test + void shouldDelegateReset() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + connection.reset(); + + verify( mockConnection ).reset(); + } + + @Test + void shouldDelegateRelease() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + connection.release(); + + verify( mockConnection ).release(); + } + + @Test + void shouldDelegateTerminateAndRelease() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + connection.terminateAndRelease( "a reason" ); + + verify( mockConnection ).terminateAndRelease( "a reason" ); + } + + @Test + void shouldDelegateServerAddress() + { + BoltServerAddress address = BoltServerAddress.from( ServerAddress.of( "localhost", 9999 ) ); + Connection mockConnection = mock( Connection.class ); + when( mockConnection.serverAddress() ).thenReturn( address ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + assertSame( address, connection.serverAddress() ); + verify( mockConnection ).serverAddress(); + } + + @Test + void shouldDelegateServerVersion() + { + ServerVersion version = ServerVersion.version( "Neo4j/3.5.3" ); + Connection mockConnection = mock( Connection.class ); + when( mockConnection.serverVersion() ).thenReturn( version ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + assertSame( version, connection.serverVersion() ); + verify( mockConnection ).serverVersion(); + } + + @Test + void shouldDelegateProtocol() + { + BoltProtocol protocol = mock( BoltProtocol.class ); + Connection mockConnection = mock( Connection.class ); + when( mockConnection.protocol() ).thenReturn( protocol ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + assertSame( protocol, connection.protocol() ); + verify( mockConnection ).protocol(); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldReturnModeFromConstructor( AccessMode mode ) + { + AccessModeConnection connection = new AccessModeConnection( mock( Connection.class ), mode ); + + assertEquals( mode, connection.mode() ); + } + + @Test + void shouldReturnConnection() + { + Connection mockConnection = mock( Connection.class ); + AccessModeConnection connection = new AccessModeConnection( mockConnection, READ ); + + assertSame( mockConnection, connection.connection() ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index 65bd2f67e1..e85944c02d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -20,6 +20,8 @@ import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.HashSet; @@ -27,6 +29,7 @@ import java.util.Set; import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.async.AccessModeConnection; import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterComposition; import org.neo4j.driver.internal.cluster.ClusterRoutingTable; @@ -44,6 +47,8 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -71,6 +76,29 @@ class LoadBalancerTest { + @ParameterizedTest + @EnumSource( AccessMode.class ) + void returnsCorrectAccessMode( AccessMode mode ) + { + ConnectionPool connectionPool = newConnectionPoolMock(); + RoutingTable routingTable = mock( RoutingTable.class ); + AddressSet readerAddresses = mock( AddressSet.class ); + AddressSet writerAddresses = mock( AddressSet.class ); + when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); + when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{B} ); + when( routingTable.readers() ).thenReturn( readerAddresses ); + when( routingTable.writers() ).thenReturn( writerAddresses ); + Rediscovery rediscovery = mock( Rediscovery.class ); + + LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, + GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGING ); + + Connection acquired = await( loadBalancer.acquireConnection( mode ) ); + + assertThat( acquired, instanceOf( AccessModeConnection.class ) ); + assertThat( acquired.mode(), equalTo( mode ) ); + } + @Test void acquireShouldUpdateRoutingTableWhenKnownRoutingTableIsStale() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/BeginMessageEncoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/BeginMessageEncoderTest.java index c0d71c0a8b..d589f0f431 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/BeginMessageEncoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/BeginMessageEncoderTest.java @@ -19,6 +19,8 @@ package org.neo4j.driver.internal.messaging.encode; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; import java.time.Duration; @@ -28,12 +30,14 @@ import org.neo4j.driver.internal.Bookmarks; import org.neo4j.driver.internal.messaging.ValuePacker; import org.neo4j.driver.internal.messaging.request.BeginMessage; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Value; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET; +import static org.neo4j.driver.v1.AccessMode.*; import static org.neo4j.driver.v1.Values.value; class BeginMessageEncoderTest @@ -41,8 +45,9 @@ class BeginMessageEncoderTest private final BeginMessageEncoder encoder = new BeginMessageEncoder(); private final ValuePacker packer = mock( ValuePacker.class ); - @Test - void shouldEncodeBeginMessage() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldEncodeBeginMessage( AccessMode mode ) throws Exception { Bookmarks bookmarks = Bookmarks.from( "neo4j:bookmark:v1:tx42" ); @@ -52,7 +57,7 @@ void shouldEncodeBeginMessage() throws Exception Duration txTimeout = Duration.ofSeconds( 1 ); - encoder.encode( new BeginMessage( bookmarks, txTimeout, txMetadata ), packer ); + encoder.encode( new BeginMessage( bookmarks, txTimeout, txMetadata, mode ), packer ); InOrder order = inOrder( packer ); order.verify( packer ).packStructHeader( 1, BeginMessage.SIGNATURE ); @@ -61,6 +66,10 @@ void shouldEncodeBeginMessage() throws Exception expectedMetadata.put( "bookmarks", value( bookmarks.values() ) ); expectedMetadata.put( "tx_timeout", value( 1000 ) ); expectedMetadata.put( "tx_metadata", value( txMetadata ) ); + if ( mode == READ ) + { + expectedMetadata.put( "mode", value( "r" ) ); + } order.verify( packer ).pack( expectedMetadata ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RunWithMetadataMessageEncoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RunWithMetadataMessageEncoderTest.java index 50945ec60c..65e111913a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RunWithMetadataMessageEncoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RunWithMetadataMessageEncoderTest.java @@ -19,6 +19,8 @@ package org.neo4j.driver.internal.messaging.encode; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; import java.time.Duration; @@ -28,6 +30,7 @@ import org.neo4j.driver.internal.Bookmarks; import org.neo4j.driver.internal.messaging.ValuePacker; import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Value; import static java.util.Collections.singletonMap; @@ -35,6 +38,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.messaging.request.DiscardAllMessage.DISCARD_ALL; +import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.Values.value; class RunWithMetadataMessageEncoderTest @@ -42,8 +46,9 @@ class RunWithMetadataMessageEncoderTest private final RunWithMetadataMessageEncoder encoder = new RunWithMetadataMessageEncoder(); private final ValuePacker packer = mock( ValuePacker.class ); - @Test - void shouldEncodeRunWithMetadataMessage() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldEncodeRunWithMetadataMessage( AccessMode mode ) throws Exception { Map params = singletonMap( "answer", value( 42 ) ); @@ -56,7 +61,7 @@ void shouldEncodeRunWithMetadataMessage() throws Exception Duration txTimeout = Duration.ofMillis( 42 ); - encoder.encode( new RunWithMetadataMessage( "RETURN $answer", params, bookmarks, txTimeout, txMetadata ), packer ); + encoder.encode( new RunWithMetadataMessage( "RETURN $answer", params, bookmarks, txTimeout, txMetadata, mode ), packer ); InOrder order = inOrder( packer ); order.verify( packer ).packStructHeader( 3, RunWithMetadataMessage.SIGNATURE ); @@ -67,6 +72,10 @@ void shouldEncodeRunWithMetadataMessage() throws Exception expectedMetadata.put( "bookmarks", value( bookmarks.values() ) ); expectedMetadata.put( "tx_timeout", value( 42 ) ); expectedMetadata.put( "tx_metadata", value( txMetadata ) ); + if ( mode == READ ) + { + expectedMetadata.put( "mode", value( "r" ) ); + } order.verify( packer ).pack( expectedMetadata ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/request/BeginMessageTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/request/BeginMessageTest.java index 9ff16961cd..81a44fd782 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/request/BeginMessageTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/request/BeginMessageTest.java @@ -18,23 +18,27 @@ */ package org.neo4j.driver.internal.messaging.request; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.time.Duration; import java.util.HashMap; import java.util.Map; import org.neo4j.driver.internal.Bookmarks; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Value; import static java.util.Arrays.asList; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.Values.value; class BeginMessageTest { - @Test - void shouldHaveCorrectMetadata() + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldHaveCorrectMetadata( AccessMode mode ) { Bookmarks bookmarks = Bookmarks.from( asList( "neo4j:bookmark:v1:tx42", "neo4j:bookmark:v1:tx4242", "neo4j:bookmark:v1:tx424242" ) ); @@ -45,12 +49,16 @@ void shouldHaveCorrectMetadata() Duration txTimeout = Duration.ofSeconds( 13 ); - BeginMessage message = new BeginMessage( bookmarks, txTimeout, txMetadata ); + BeginMessage message = new BeginMessage( bookmarks, txTimeout, txMetadata, mode ); Map expectedMetadata = new HashMap<>(); expectedMetadata.put( "bookmarks", value( bookmarks.values() ) ); expectedMetadata.put( "tx_timeout", value( 13_000 ) ); expectedMetadata.put( "tx_metadata", value( txMetadata ) ); + if ( mode == READ ) + { + expectedMetadata.put( "mode", value( "r" ) ); + } assertEquals( expectedMetadata, message.metadata() ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessageTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessageTest.java index c93c448d41..ad196a548a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessageTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessageTest.java @@ -18,7 +18,8 @@ */ package org.neo4j.driver.internal.messaging.request; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.time.Duration; import java.time.LocalDateTime; @@ -26,17 +27,20 @@ import java.util.Map; import org.neo4j.driver.internal.Bookmarks; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Value; import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.Values.value; class RunWithMetadataMessageTest { - @Test - void shouldHaveCorrectMetadata() + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldHaveCorrectMetadata( AccessMode mode ) { Bookmarks bookmarks = Bookmarks.from( asList( "neo4j:bookmark:v1:tx11", "neo4j:bookmark:v1:tx52" ) ); @@ -47,12 +51,16 @@ void shouldHaveCorrectMetadata() Duration txTimeout = Duration.ofSeconds( 7 ); - RunWithMetadataMessage message = new RunWithMetadataMessage( "RETURN 1", emptyMap(), bookmarks, txTimeout, txMetadata ); + RunWithMetadataMessage message = new RunWithMetadataMessage( "RETURN 1", emptyMap(), bookmarks, txTimeout, txMetadata, mode ); Map expectedMetadata = new HashMap<>(); expectedMetadata.put( "bookmarks", value( bookmarks.values() ) ); expectedMetadata.put( "tx_timeout", value( 7000 ) ); expectedMetadata.put( "tx_metadata", value( txMetadata ) ); + if ( mode == READ ) + { + expectedMetadata.put( "mode", value( "r" ) ); + } assertEquals( expectedMetadata, message.metadata() ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java index c069ef94da..3123f11749 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; import java.util.HashMap; @@ -54,6 +56,7 @@ import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.TransactionConfig; @@ -77,6 +80,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.util.ServerVersion.v3_5_0; +import static org.neo4j.driver.v1.AccessMode.WRITE; import static org.neo4j.driver.v1.Values.value; import static org.neo4j.driver.v1.util.TestUtil.DEFAULT_TEST_PROTOCOL; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -172,7 +176,7 @@ void shouldBeginTransactionWithoutBookmark() CompletionStage stage = protocol.beginTransaction( connection, Bookmarks.empty(), TransactionConfig.empty() ); - verify( connection ).write( new BeginMessage( Bookmarks.empty(), TransactionConfig.empty() ), NoOpResponseHandler.INSTANCE ); + verify( connection ).write( new BeginMessage( Bookmarks.empty(), TransactionConfig.empty(), WRITE ), NoOpResponseHandler.INSTANCE ); assertNull( await( stage ) ); } @@ -184,7 +188,7 @@ void shouldBeginTransactionWithBookmarks() CompletionStage stage = protocol.beginTransaction( connection, bookmarks, TransactionConfig.empty() ); - verify( connection ).writeAndFlush( eq( new BeginMessage( bookmarks, TransactionConfig.empty() ) ), any( BeginTxResponseHandler.class ) ); + verify( connection ).writeAndFlush( eq( new BeginMessage( bookmarks, TransactionConfig.empty(), WRITE ) ), any( BeginTxResponseHandler.class ) ); assertNull( await( stage ) ); } @@ -195,7 +199,7 @@ void shouldBeginTransactionWithConfig() CompletionStage stage = protocol.beginTransaction( connection, Bookmarks.empty(), txConfig ); - verify( connection ).write( new BeginMessage( Bookmarks.empty(), txConfig ), NoOpResponseHandler.INSTANCE ); + verify( connection ).write( new BeginMessage( Bookmarks.empty(), txConfig, WRITE ), NoOpResponseHandler.INSTANCE ); assertNull( await( stage ) ); } @@ -207,7 +211,7 @@ void shouldBeginTransactionWithBookmarksAndConfig() CompletionStage stage = protocol.beginTransaction( connection, bookmarks, txConfig ); - verify( connection ).writeAndFlush( eq( new BeginMessage( bookmarks, txConfig ) ), any( BeginTxResponseHandler.class ) ); + verify( connection ).writeAndFlush( eq( new BeginMessage( bookmarks, txConfig, WRITE ) ), any( BeginTxResponseHandler.class ) ); assertNull( await( stage ) ); } @@ -242,58 +246,66 @@ void shouldRollbackTransaction() assertNull( await( stage ) ); } - @Test - void shouldRunInAutoCommitTransactionWithoutWaitingForRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception { - testRunWithoutWaitingForRunResponse( true, TransactionConfig.empty() ); + testRunWithoutWaitingForRunResponse( true, TransactionConfig.empty(), mode ); } - @Test - void shouldRunInAutoCommitWithConfigTransactionWithoutWaitingForRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitWithConfigTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception { - testRunWithoutWaitingForRunResponse( true, txConfig ); + testRunWithoutWaitingForRunResponse( true, txConfig, mode ); } - @Test - void shouldRunInAutoCommitTransactionAndWaitForSuccessRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception { - testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks.empty(), TransactionConfig.empty() ); + testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks.empty(), TransactionConfig.empty(), mode ); } - @Test - void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForSuccessRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception { - testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks.from( "neo4j:bookmark:v1:tx65" ), txConfig ); + testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks.from( "neo4j:bookmark:v1:tx65" ), txConfig, mode ); } - @Test - void shouldRunInAutoCommitTransactionAndWaitForFailureRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionAndWaitForFailureRunResponse( AccessMode mode ) throws Exception { - testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks.empty(), TransactionConfig.empty() ); + testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks.empty(), TransactionConfig.empty(), mode ); } - @Test - void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForFailureRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForFailureRunResponse( AccessMode mode ) throws Exception { - testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks.from( "neo4j:bookmark:v1:tx163" ), txConfig ); + testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks.from( "neo4j:bookmark:v1:tx163" ), txConfig, mode ); } - @Test - void shouldRunInExplicitTransactionWithoutWaitingForRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInExplicitTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception { - testRunWithoutWaitingForRunResponse( false, TransactionConfig.empty() ); + testRunWithoutWaitingForRunResponse( false, TransactionConfig.empty(), mode ); } - @Test - void shouldRunInExplicitTransactionAndWaitForSuccessRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInExplicitTransactionAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception { - Connection connection = connectionMock(); + Connection connection = connectionMock( mode ); CompletableFuture cursorFuture = protocol.runInExplicitTransaction( connection, STATEMENT, mock( ExplicitTransaction.class ), true ).toCompletableFuture(); assertFalse( cursorFuture.isDone() ); - ResponseHandler runResponseHandler = verifyRunInvoked( connection, false, Bookmarks.empty(), TransactionConfig.empty() ).runHandler; + ResponseHandler runResponseHandler = verifyRunInvoked( connection, false, Bookmarks.empty(), TransactionConfig.empty(), mode ).runHandler; runResponseHandler.onSuccess( emptyMap() ); @@ -301,25 +313,26 @@ void shouldRunInExplicitTransactionAndWaitForSuccessRunResponse() throws Excepti assertNotNull( cursorFuture.get() ); } - @Test - void shouldRunInExplicitTransactionAndWaitForFailureRunResponse() throws Exception + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInExplicitTransactionAndWaitForFailureRunResponse( AccessMode mode ) throws Exception { - Connection connection = connectionMock(); + Connection connection = connectionMock( mode ); CompletableFuture cursorFuture = protocol.runInExplicitTransaction( connection, STATEMENT, mock( ExplicitTransaction.class ), true ).toCompletableFuture(); assertFalse( cursorFuture.isDone() ); - ResponseHandler runResponseHandler = verifyRunInvoked( connection, false, Bookmarks.empty(), TransactionConfig.empty() ).runHandler; + ResponseHandler runResponseHandler = verifyRunInvoked( connection, false, Bookmarks.empty(), TransactionConfig.empty(), mode ).runHandler; runResponseHandler.onFailure( new RuntimeException() ); assertTrue( cursorFuture.isDone() ); assertNotNull( cursorFuture.get() ); } - private void testRunWithoutWaitingForRunResponse( boolean autoCommitTx, TransactionConfig config ) throws Exception + private void testRunWithoutWaitingForRunResponse( boolean autoCommitTx, TransactionConfig config, AccessMode mode ) throws Exception { - Connection connection = connectionMock(); + Connection connection = connectionMock( mode ); Bookmarks initialBookmarks = Bookmarks.from( "neo4j:bookmark:v1:tx987" ); CompletionStage cursorStage; @@ -339,24 +352,24 @@ private void testRunWithoutWaitingForRunResponse( boolean autoCommitTx, Transact if ( autoCommitTx ) { - verifyRunInvoked( connection, autoCommitTx, initialBookmarks, config ); + verifyRunInvoked( connection, autoCommitTx, initialBookmarks, config, mode ); } else { - verifyRunInvoked( connection, autoCommitTx, Bookmarks.empty(), config ); + verifyRunInvoked( connection, autoCommitTx, Bookmarks.empty(), config, mode ); } } - private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks bookmarks, TransactionConfig config ) throws Exception + private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks bookmarks, TransactionConfig config, AccessMode mode ) throws Exception { - Connection connection = connectionMock(); + Connection connection = connectionMock( mode ); BookmarksHolder bookmarksHolder = new SimpleBookmarksHolder( bookmarks ); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( connection, STATEMENT, bookmarksHolder, config, true ).toCompletableFuture(); assertFalse( cursorFuture.isDone() ); - ResponseHandlers handlers = verifyRunInvoked( connection, true, bookmarks, config ); + ResponseHandlers handlers = verifyRunInvoked( connection, true, bookmarks, config, mode ); String newBookmarkValue = "neo4j:bookmark:v1:tx98765"; handlers.runHandler.onSuccess( emptyMap() ); @@ -367,16 +380,16 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmarks bo assertNotNull( cursorFuture.get() ); } - private void testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks bookmarks, TransactionConfig config ) throws Exception + private void testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks bookmarks, TransactionConfig config, AccessMode mode ) throws Exception { - Connection connection = connectionMock(); + Connection connection = connectionMock( mode ); BookmarksHolder bookmarksHolder = new SimpleBookmarksHolder( bookmarks ); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( connection, STATEMENT, bookmarksHolder, config, true ).toCompletableFuture(); assertFalse( cursorFuture.isDone() ); - ResponseHandler runResponseHandler = verifyRunInvoked( connection, true, bookmarks, config ).runHandler; + ResponseHandler runResponseHandler = verifyRunInvoked( connection, true, bookmarks, config, mode ).runHandler; runResponseHandler.onFailure( new RuntimeException() ); assertEquals( bookmarks, bookmarksHolder.getBookmarks() ); @@ -384,12 +397,12 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmarks bookma assertNotNull( cursorFuture.get() ); } - private static ResponseHandlers verifyRunInvoked( Connection connection, boolean session, Bookmarks bookmarks, TransactionConfig config ) + private static ResponseHandlers verifyRunInvoked( Connection connection, boolean session, Bookmarks bookmarks, TransactionConfig config, AccessMode mode ) { ArgumentCaptor runHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class ); ArgumentCaptor pullAllHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class ); - RunWithMetadataMessage expectedMessage = new RunWithMetadataMessage( QUERY, PARAMS, bookmarks, config ); + RunWithMetadataMessage expectedMessage = new RunWithMetadataMessage( QUERY, PARAMS, bookmarks, config, mode ); verify( connection ).writeAndFlush( eq( expectedMessage ), runHandlerCaptor.capture(), eq( PullAllMessage.PULL_ALL ), pullAllHandlerCaptor.capture() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/MessageWriterV3Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/MessageWriterV3Test.java index 0f478f6304..547e9b0069 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/MessageWriterV3Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/MessageWriterV3Test.java @@ -33,6 +33,7 @@ import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage; import org.neo4j.driver.internal.packstream.PackOutput; import org.neo4j.driver.internal.security.InternalAuthToken; +import org.neo4j.driver.v1.AccessMode; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -42,6 +43,7 @@ import static org.neo4j.driver.internal.messaging.request.PullAllMessage.PULL_ALL; import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET; import static org.neo4j.driver.internal.messaging.request.RollbackMessage.ROLLBACK; +import static org.neo4j.driver.v1.AccessMode.*; import static org.neo4j.driver.v1.AuthTokens.basic; import static org.neo4j.driver.v1.Values.point; import static org.neo4j.driver.v1.Values.value; @@ -61,20 +63,27 @@ protected Stream supportedMessages() // Bolt V3 messages new HelloMessage( "MyDriver/1.2.3", ((InternalAuthToken) basic( "neo4j", "neo4j" )).toMap() ), GOODBYE, - new BeginMessage( Bookmarks.from( "neo4j:bookmark:v1:tx123" ), Duration.ofSeconds( 5 ), singletonMap( "key", value( 42 ) ) ), + new BeginMessage( Bookmarks.from( "neo4j:bookmark:v1:tx123" ), Duration.ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), READ ), + new BeginMessage( Bookmarks.from( "neo4j:bookmark:v1:tx123" ), Duration.ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), WRITE ), COMMIT, ROLLBACK, new RunWithMetadataMessage( "RETURN 1", emptyMap(), Bookmarks.from( "neo4j:bookmark:v1:tx1" ), Duration.ofSeconds( 5 ), - singletonMap( "key", value( 42 ) ) ), + singletonMap( "key", value( 42 ) ), READ ), + new RunWithMetadataMessage( "RETURN 1", emptyMap(), Bookmarks.from( "neo4j:bookmark:v1:tx1" ), Duration.ofSeconds( 5 ), + singletonMap( "key", value( 42 ) ), WRITE ), PULL_ALL, DISCARD_ALL, RESET, // Bolt V3 messages with struct values new RunWithMetadataMessage( "RETURN $x", singletonMap( "x", value( ZonedDateTime.now() ) ), Bookmarks.empty(), - Duration.ofSeconds( 1 ), emptyMap() ), + Duration.ofSeconds( 1 ), emptyMap(), READ ), + new RunWithMetadataMessage( "RETURN $x", singletonMap( "x", value( ZonedDateTime.now() ) ), Bookmarks.empty(), + Duration.ofSeconds( 1 ), emptyMap(), WRITE ), + new RunWithMetadataMessage( "RETURN $x", singletonMap( "x", point( 42, 1, 2, 3 ) ), Bookmarks.empty(), + Duration.ofSeconds( 1 ), emptyMap(), READ ), new RunWithMetadataMessage( "RETURN $x", singletonMap( "x", point( 42, 1, 2, 3 ) ), Bookmarks.empty(), - Duration.ofSeconds( 1 ), emptyMap() ) + Duration.ofSeconds( 1 ), emptyMap(), WRITE ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index a12b2d5a28..60a1849be8 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -22,8 +22,6 @@ import io.netty.util.internal.PlatformDependent; import org.mockito.ArgumentMatcher; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; @@ -49,6 +47,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -68,6 +67,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.util.Neo4jFeature.LIST_QUERIES_PROCEDURE; import static org.neo4j.driver.internal.util.ServerVersion.version; +import static org.neo4j.driver.v1.AccessMode.*; public final class TestUtil { @@ -193,11 +193,17 @@ public static String cleanDb( Driver driver ) } public static Connection connectionMock() + { + return connectionMock( WRITE ); + } + + public static Connection connectionMock( AccessMode mode ) { Connection connection = mock( Connection.class ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.vInDev ); when( connection.protocol() ).thenReturn( DEFAULT_TEST_PROTOCOL ); + when( connection.mode() ).thenReturn( mode ); setupSuccessfulPullAll( connection, "COMMIT" ); setupSuccessfulPullAll( connection, "ROLLBACK" ); setupSuccessfulPullAll( connection, "BEGIN" ); diff --git a/driver/src/test/resources/acquire_endpoints_v3.script b/driver/src/test/resources/acquire_endpoints_v3.script new file mode 100644 index 0000000000..347eb16c6e --- /dev/null +++ b/driver/src/test/resources/acquire_endpoints_v3.script @@ -0,0 +1,10 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/driver/src/test/resources/hello_run_exit_read.script b/driver/src/test/resources/hello_run_exit_read.script new file mode 100644 index 0000000000..ad2a0d01a4 --- /dev/null +++ b/driver/src/test/resources/hello_run_exit_read.script @@ -0,0 +1,12 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Foo"] + RECORD ["Bar"] + SUCCESS {} +S: diff --git a/driver/src/test/resources/read_server_v3_read.script b/driver/src/test/resources/read_server_v3_read.script new file mode 100644 index 0000000000..a0c3a89651 --- /dev/null +++ b/driver/src/test/resources/read_server_v3_read.script @@ -0,0 +1,12 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: RUN "MATCH (n) RETURN n.name" {} { "mode": "r" } + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + RECORD ["Tina"] + SUCCESS {} diff --git a/driver/src/test/resources/read_server_v3_read_tx.script b/driver/src/test/resources/read_server_v3_read_tx.script new file mode 100644 index 0000000000..5c5836cc3e --- /dev/null +++ b/driver/src/test/resources/read_server_v3_read_tx.script @@ -0,0 +1,16 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: BEGIN { "mode": "r" } +S: SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} { "mode": "r" } + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + RECORD ["Tina"] + SUCCESS {} +C: COMMIT +S: SUCCESS { "bookmark": "ABookmark" } diff --git a/driver/src/test/resources/write_server_v3_write.script b/driver/src/test/resources/write_server_v3_write.script new file mode 100644 index 0000000000..26957408b1 --- /dev/null +++ b/driver/src/test/resources/write_server_v3_write.script @@ -0,0 +1,9 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: RUN "CREATE (n {name:'Bob'})" {} {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} diff --git a/driver/src/test/resources/write_server_v3_write_tx.script b/driver/src/test/resources/write_server_v3_write_tx.script new file mode 100644 index 0000000000..14f74a5c40 --- /dev/null +++ b/driver/src/test/resources/write_server_v3_write_tx.script @@ -0,0 +1,13 @@ +!: BOLT 3 +!: AUTO RESET + +C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"} +S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} +C: BEGIN {} +S: SUCCESS {} +C: RUN "CREATE (n {name:'Bob'})" {} {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: COMMIT +S: SUCCESS { "bookmark": "ABookmark" }