Skip to content

Commit db40524

Browse files
author
Zhen Li
committed
Should not retry transaction on connection errors when committing a write transaction.
When committing a transaction, if we get a reply from the server, we know the status of the transaction (success or failure). However, if we fail to get a reply from the server due to a database crash or a network crash, then we are uncertain about the status of this transaction, because it is possible that the tx was committed by the server but the reply never reached us over the wire. This is okay if the transaction is read-only; however, if it is a write transaction and we retry automatically, we might commit the same data twice. This PR marks a connection error that happens while committing a write tx differently (ServiceUnavailable -> ConnectionBrokenAtCommit) from normal retriable connection errors (ServiceUnavailable -> SessionExpired). As a result, on receiving this error, the tx will not be retried.
1 parent 71ff948 commit db40524

File tree

11 files changed

+208
-5
lines changed

11 files changed

+208
-5
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import org.neo4j.driver.internal.spi.ResponseHandler;
22+
23+
/**
 * Marker interface for response handlers that are attached to the messages sent when committing a transaction.
 * <p>
 * It declares no members of its own. In this change set {@code RoutingResponseHandler} tests its delegate with
 * {@code instanceof AbstractCommitTxResponseHandler} to recognise a connection failure that happened during commit.
 */
interface AbstractCommitTxResponseHandler extends ResponseHandler
24+
{
25+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import java.util.Map;
22+
23+
import org.neo4j.driver.v1.Value;
24+
25+
/**
 * Response handler that ignores every callback but is still typed as an {@link AbstractCommitTxResponseHandler}.
 * <p>
 * In this change set {@code BoltProtocolV1#commitTransaction} passes {@link #INSTANCE} as the handler of the
 * {@code COMMIT} message, so a failure reported for that message can be identified as a commit failure
 * by code that checks for {@link AbstractCommitTxResponseHandler}.
 */
public class CommitTxNoOpResponseHandler implements AbstractCommitTxResponseHandler
26+
{
27+
// Single shared instance; the class declares no fields other than this constant.
public static final CommitTxNoOpResponseHandler INSTANCE = new CommitTxNoOpResponseHandler();
28+
29+
@Override
30+
public void onSuccess( Map<String,Value> metadata )
31+
{
// intentionally empty: the success metadata is ignored
32+
}
33+
34+
@Override
35+
public void onFailure( Throwable error )
36+
{
// intentionally empty: the error is not handled here
37+
}
38+
39+
@Override
40+
public void onRecord( Value[] fields )
41+
{
// intentionally empty: records are ignored
42+
}
43+
}

driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@
2323
import java.util.concurrent.CompletableFuture;
2424

2525
import org.neo4j.driver.internal.Bookmarks;
26-
import org.neo4j.driver.internal.spi.ResponseHandler;
2726
import org.neo4j.driver.v1.Value;
2827

2928
import static java.util.Objects.requireNonNull;
3029

31-
public class CommitTxResponseHandler implements ResponseHandler
30+
public class CommitTxResponseHandler implements AbstractCommitTxResponseHandler
3231
{
3332
private final CompletableFuture<Bookmarks> commitFuture;
3433

driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.v1.AccessMode;
2929
import org.neo4j.driver.v1.Value;
3030
import org.neo4j.driver.v1.exceptions.ClientException;
31+
import org.neo4j.driver.v1.exceptions.ConnectionBrokenAtCommitException;
3132
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3233
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3334
import org.neo4j.driver.v1.exceptions.TransientException;
@@ -106,6 +107,14 @@ else if ( error instanceof TransientException )
106107
/**
 * Translates a {@link ServiceUnavailableException} into the error that is handed on to the caller.
 * <p>
 * The routing error handler is always notified about the connection failure for {@code address} first.
 * The returned error then depends on the access mode and on the type of the wrapped delegate handler:
 * a {@link ConnectionBrokenAtCommitException} for a WRITE access mode with a commit handler delegate,
 * a {@link SessionExpiredException} in every other case. Both keep {@code e} as their cause.
 *
 * @param e the original connection error.
 * @return the replacement error wrapping {@code e}.
 */
private Throwable handledServiceUnavailableException( ServiceUnavailableException e )
107108
{
108109
errorHandler.onConnectionFailure( address );
110+
if ( accessMode == AccessMode.WRITE && delegate instanceof AbstractCommitTxResponseHandler )
111+
{
112+
// cannot be retried: the connection broke while committing a write, so the commit status is unknown
113+
return new ConnectionBrokenAtCommitException( format( "Connection with server at %s is broken at commit. The commit status is unknown. " +
114+
"Before retrying your transaction, you might want to double check if the previous transaction has been successfully committed by database or not.",
115+
address ), e );
116+
}
117+
// can be retried
109118
return new SessionExpiredException( format( "Server at %s is no longer available", address ), e );
110119
}
111120

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.neo4j.driver.internal.ExplicitTransaction;
3131
import org.neo4j.driver.internal.InternalStatementResultCursor;
3232
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
33+
import org.neo4j.driver.internal.handlers.CommitTxNoOpResponseHandler;
3334
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
3435
import org.neo4j.driver.internal.handlers.InitResponseHandler;
3536
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
@@ -128,7 +129,7 @@ public CompletionStage<Bookmarks> commitTransaction( Connection connection )
128129

129130
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture );
130131
connection.writeAndFlush(
131-
COMMIT_MESSAGE, NoOpResponseHandler.INSTANCE,
132+
COMMIT_MESSAGE, CommitTxNoOpResponseHandler.INSTANCE,
132133
PullAllMessage.PULL_ALL, pullAllHandler );
133134

134135
return commitFuture;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.v1.exceptions;
20+
21+
/**
 * Signals that the connection to the server broke while a transaction was being committed,
 * so the outcome of the commit is unknown.
 * <p>
 * In this change set it is created by {@code RoutingResponseHandler} in place of a
 * {@code SessionExpiredException} when a {@code ServiceUnavailableException} is received in WRITE access mode
 * on a commit response handler.
 */
public class ConnectionBrokenAtCommitException extends Neo4jException
22+
{
/**
 * @param message description of the failure.
 * @param e the underlying error, passed to the superclass constructor together with the message.
 */
23+
public ConnectionBrokenAtCommitException( String message, Throwable e )
24+
{
25+
super( message, e );
26+
}
27+
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.neo4j.driver.v1.StatementResult;
4747
import org.neo4j.driver.v1.Transaction;
4848
import org.neo4j.driver.v1.TransactionWork;
49+
import org.neo4j.driver.v1.exceptions.ConnectionBrokenAtCommitException;
4950
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5051
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
5152
import org.neo4j.driver.v1.net.ServerAddress;
@@ -65,6 +66,7 @@
6566
import static org.mockito.ArgumentMatchers.any;
6667
import static org.mockito.ArgumentMatchers.startsWith;
6768
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.never;
6870
import static org.mockito.Mockito.times;
6971
import static org.mockito.Mockito.verify;
7072
import static org.mockito.Mockito.when;
@@ -764,6 +766,56 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc
764766
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
765767
}
766768

769+
/**
 * Runs a write transaction function against a stub writer driven by {@code dead_write_server_at_commit_v3.script}
 * and expects the call to fail with {@link ConnectionBrokenAtCommitException} after exactly one invocation
 * of the transaction work, i.e. without a retry.
 */
@Test
770+
void shouldNotRetryWriteTransactionWhenFailedToCommitV3() throws Exception
771+
{
772+
// stub router on port 9001 and stub writer on port 9007
StubServer router = StubServer.start( "acquire_endpoints_v3.script", 9001 );
773+
StubServer brokenWriter = StubServer.start( "dead_write_server_at_commit_v3.script", 9007 );
774+
775+
Logger logger = mock( Logger.class );
776+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
777+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
778+
Session session = driver.session() )
779+
{
780+
// counts how many times the transaction work is executed
AtomicInteger invocations = new AtomicInteger();
781+
assertThrows( ConnectionBrokenAtCommitException.class,
782+
() -> session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ) );
783+
784+
// the work must have run exactly once
assertEquals( 1, invocations.get() );
785+
}
786+
finally
787+
{
788+
// both stub servers are expected to exit with status 0
assertEquals( 0, router.exitStatus() );
789+
assertEquals( 0, brokenWriter.exitStatus() );
790+
}
791+
// NOTE(review): this only rules out a retry warning that carries a SessionExpiredException;
// presumably a retry logged with a different exception type would not be detected here - confirm.
verify( logger, never() ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
792+
}
793+
794+
/**
 * Same scenario as the V3 variant of this test, but the stub writer is driven by
 * {@code dead_write_server_at_commit.script}: the write transaction function must fail with
 * {@link ConnectionBrokenAtCommitException} after exactly one invocation of the transaction work.
 */
@Test
795+
void shouldNotRetryWriteTransactionWhenFailedToCommit() throws Exception
796+
{
797+
// NOTE(review): the router still uses the v3 endpoints script while the writer uses the non-v3 script;
// presumably intentional - confirm.
StubServer router = StubServer.start( "acquire_endpoints_v3.script", 9001 );
798+
StubServer brokenWriter = StubServer.start( "dead_write_server_at_commit.script", 9007 );
799+
800+
Logger logger = mock( Logger.class );
801+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
802+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
803+
Session session = driver.session() )
804+
{
805+
// counts how many times the transaction work is executed
AtomicInteger invocations = new AtomicInteger();
806+
assertThrows( ConnectionBrokenAtCommitException.class,
807+
() -> session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ) );
808+
809+
// the work must have run exactly once
assertEquals( 1, invocations.get() );
810+
}
811+
finally
812+
{
813+
// both stub servers are expected to exit with status 0
assertEquals( 0, router.exitStatus() );
814+
assertEquals( 0, brokenWriter.exitStatus() );
815+
}
816+
// NOTE(review): this only rules out a retry warning that carries a SessionExpiredException;
// presumably a retry logged with a different exception type would not be detected here - confirm.
verify( logger, never() ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
817+
}
818+
767819
@Test
768820
void shouldRetryReadTransactionUntilFailure() throws Exception
769821
{

driver/src/test/java/org/neo4j/driver/internal/handlers/RoutingResponseHandlerTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.neo4j.driver.internal.spi.ResponseHandler;
2828
import org.neo4j.driver.v1.AccessMode;
2929
import org.neo4j.driver.v1.exceptions.ClientException;
30+
import org.neo4j.driver.v1.exceptions.ConnectionBrokenAtCommitException;
3031
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3132
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3233
import org.neo4j.driver.v1.exceptions.TransientException;
@@ -65,6 +66,18 @@ void shouldHandleServiceUnavailableException()
6566
verify( errorHandler ).onConnectionFailure( LOCAL_DEFAULT );
6667
}
6768

69+
/**
 * Feeds a {@link ServiceUnavailableException} through a routing handler that wraps a mocked
 * {@link AbstractCommitTxResponseHandler} in WRITE mode, and checks that the connection failure is reported
 * to the error handler and that the delegate receives a {@link ConnectionBrokenAtCommitException}.
 */
@Test
70+
void shouldHandleServiceUnavailableExceptionSpeciallyForWriteOnCommitFailure()
71+
{
72+
ServiceUnavailableException error = new ServiceUnavailableException( "Hi" );
73+
RoutingErrorHandler errorHandler = mock( RoutingErrorHandler.class );
74+
75+
// the delegate is a commit handler mock, which is what selects the commit-specific translation
Throwable handledError = handle( mock( AbstractCommitTxResponseHandler.class ), error, errorHandler, AccessMode.WRITE );
76+
77+
verify( errorHandler ).onConnectionFailure( LOCAL_DEFAULT );
78+
assertThat( handledError, instanceOf( ConnectionBrokenAtCommitException.class ) );
79+
}
80+
6881
@Test
6982
void shouldHandleDatabaseUnavailableError()
7083
{
@@ -181,13 +194,19 @@ private static Throwable handle( Throwable error, RoutingErrorHandler errorHandl
181194
/**
 * Convenience overload that uses a freshly mocked plain {@link ResponseHandler} as the delegate
 * and forwards to the overload that accepts the delegate explicitly.
 *
 * @return the error that the delegate received.
 */
private static Throwable handle( Throwable error, RoutingErrorHandler errorHandler, AccessMode accessMode )
182195
{
183196
ResponseHandler responseHandler = mock( ResponseHandler.class );
197+
return handle( responseHandler, error, errorHandler, accessMode );
198+
}
199+
200+
/**
 * Wraps {@code responseHandler} in a {@link RoutingResponseHandler} for {@code LOCAL_DEFAULT}, reports
 * {@code error} to it and returns the error that was passed on to {@code responseHandler}.
 * <p>
 * {@code responseHandler} must be a Mockito mock, because the forwarded error is captured via {@code verify}.
 *
 * @return the (possibly translated) error received by {@code responseHandler}.
 */
private static Throwable handle( ResponseHandler responseHandler, Throwable error, RoutingErrorHandler errorHandler, AccessMode mode )
201+
{
184202
RoutingResponseHandler routingResponseHandler =
185-
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, accessMode, errorHandler );
203+
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, mode, errorHandler );
186204

187205
routingResponseHandler.onFailure( error );
188206

189207
// capture whatever error the routing handler forwarded to the delegate
ArgumentCaptor<Throwable> handledErrorCaptor = ArgumentCaptor.forClass( Throwable.class );
190208
verify( responseHandler ).onFailure( handledErrorCaptor.capture() );
191209
return handledErrorCaptor.getValue();
210+
192211
}
193212
}

driver/src/test/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1Test.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.neo4j.driver.internal.async.ChannelAttributes;
3939
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
4040
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
41+
import org.neo4j.driver.internal.handlers.CommitTxNoOpResponseHandler;
4142
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
4243
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
4344
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
@@ -192,7 +193,7 @@ void shouldCommitTransaction()
192193
CompletionStage<Bookmarks> stage = protocol.commitTransaction( connection );
193194

194195
verify( connection ).writeAndFlush(
195-
eq( new RunMessage( "COMMIT" ) ), eq( NoOpResponseHandler.INSTANCE ),
196+
eq( new RunMessage( "COMMIT" ) ), eq( CommitTxNoOpResponseHandler.INSTANCE ),
196197
eq( PullAllMessage.PULL_ALL ), any( CommitTxResponseHandler.class ) );
197198

198199
assertEquals( Bookmarks.from( bookmarkString ), await( stage ) );
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
!: AUTO RESET
2+
!: AUTO INIT
3+
4+
C: RUN "BEGIN" {}
5+
PULL_ALL
6+
RUN "CREATE (n {name:'Bob'})" {}
7+
PULL_ALL
8+
S: SUCCESS {}
9+
SUCCESS {}
10+
SUCCESS {"fields": []}
11+
SUCCESS {}
12+
C: RUN "COMMIT" {}
13+
S: <EXIT>

0 commit comments

Comments
 (0)