Skip to content

Commit 296ab3e

Browse files
jasontedorbleskes
authored andcommitted
Fix transport exceptions for shard state actions
This commit fixes an issue in the handling of TransportExceptions in ShardStateAction. There were two cases not being handled correctly. - when the local node is shutting down, handlers will be notified with a TransportException with a message starting "transport stopped" - when the remote node disconnects, handlers will be notified with a NodeDisconnectedException In both of these cases, the cause of the exception will be null and this was incorrectly being handled. The first case can passed to the listener like any other critical non-channel failure, and the second case can be handled by modifying the logic for detecting master channel exceptions. There was a third case of NodeNotConnectedException that was not being treated as a master channel exception but should be. This commit adds an integration test that simulates the handling of a shard failure request during a network partition. By isolating the master from the cluster while a shard failed request is in flight, this test simulates that we wait until a new master is elected and then retry sending that shard failed request to the newly elected master. This commit adds methods to CapturingTransport to separate local and remote transport exceptions. The motivation for this change is that local transport exceptions are delivered to listeners (usually, but not always) wrapped in SendRequestTransportException while remote transport exceptions are delivered to listeners wrapped in RemoteTransportException. By making this distinction clear in the CapturingTransport, this makes it less likely that tests will make incorrect assumptions about the exceptions coming out of the transport layer to listeners. Closes #16057
1 parent fc4e8fd commit 296ab3e

File tree

8 files changed

+155
-34
lines changed

8 files changed

+155
-34
lines changed

core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@
4747
import org.elasticsearch.discovery.Discovery;
4848
import org.elasticsearch.node.NodeClosedException;
4949
import org.elasticsearch.threadpool.ThreadPool;
50+
import org.elasticsearch.transport.ConnectTransportException;
5051
import org.elasticsearch.transport.EmptyTransportResponseHandler;
5152
import org.elasticsearch.transport.NodeDisconnectedException;
53+
import org.elasticsearch.transport.NodeNotConnectedException;
5254
import org.elasticsearch.transport.TransportChannel;
5355
import org.elasticsearch.transport.TransportException;
5456
import org.elasticsearch.transport.TransportRequest;
@@ -58,11 +60,8 @@
5860

5961
import java.io.IOException;
6062
import java.util.ArrayList;
61-
import java.util.Arrays;
62-
import java.util.HashSet;
6363
import java.util.List;
6464
import java.util.Locale;
65-
import java.util.Set;
6665

6766
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
6867

@@ -111,8 +110,7 @@ public void handleResponse(TransportResponse.Empty response) {
111110

112111
@Override
113112
public void handleException(TransportException exp) {
114-
assert exp.getCause() != null : exp;
115-
if (isMasterChannelException(exp.getCause())) {
113+
if (isMasterChannelException(exp)) {
116114
waitForNewMasterAndRetry(observer, shardRoutingEntry, listener);
117115
} else {
118116
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), masterNode, shardRoutingEntry);
@@ -123,14 +121,14 @@ public void handleException(TransportException exp) {
123121
}
124122
}
125123

126-
private static Set<Class<?>> MASTER_CHANNEL_EXCEPTIONS =
127-
new HashSet<>(Arrays.asList(
128-
NotMasterException.class,
129-
NodeDisconnectedException.class,
130-
Discovery.FailedToCommitClusterStateException.class
131-
));
132-
private static boolean isMasterChannelException(Throwable cause) {
133-
return MASTER_CHANNEL_EXCEPTIONS.contains(cause.getClass());
124+
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
125+
NotMasterException.class,
126+
ConnectTransportException.class,
127+
Discovery.FailedToCommitClusterStateException.class
128+
};
129+
130+
private static boolean isMasterChannelException(TransportException exp) {
131+
return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null;
134132
}
135133

136134
// visible for testing
@@ -399,4 +397,5 @@ default void onSuccess() {
399397
default void onShardFailedFailure(final Exception e) {
400398
}
401399
}
400+
402401
}

core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public void testResultAggregation() throws ExecutionException, InterruptedExcept
410410
// simulate node failure
411411
totalShards += map.get(entry.getKey()).size();
412412
totalFailedShards += map.get(entry.getKey()).size();
413-
transport.handleResponse(requestId, new Exception());
413+
transport.handleRemoteError(requestId, new Exception());
414414
} else {
415415
List<ShardRouting> shards = map.get(entry.getKey());
416416
List<TransportBroadcastByNodeAction.EmptyResult> shardResults = new ArrayList<>();

core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,14 @@ public void testDelegateToFailingMaster() throws ExecutionException, Interrupted
297297
assertThat(capturedRequest.action, equalTo("testAction"));
298298

299299
if (failsWithConnectTransportException) {
300-
transport.handleResponse(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
300+
transport.handleRemoteError(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
301301
assertFalse(listener.isDone());
302302
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
303303
assertTrue(listener.isDone());
304304
listener.get();
305305
} else {
306306
Throwable t = new Throwable();
307-
transport.handleResponse(capturedRequest.requestId, t);
307+
transport.handleRemoteError(capturedRequest.requestId, t);
308308
assertTrue(listener.isDone());
309309
try {
310310
listener.get();

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ action.new ReplicationPhase(request,
546546
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
547547
}
548548
logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
549-
transport.handleResponse(capturedRequest.requestId, t);
549+
transport.handleRemoteError(capturedRequest.requestId, t);
550550
if (criticalFailure) {
551551
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
552552
assertEquals(1, shardFailedRequests.length);
@@ -565,7 +565,7 @@ action.new ReplicationPhase(request,
565565
for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) {
566566
// force a new cluster state to simulate a new master having been elected
567567
clusterService.setState(ClusterState.builder(clusterService.state()));
568-
transport.handleResponse(currentRequest.requestId, new NotMasterException("shard-failed-test"));
568+
transport.handleRemoteError(currentRequest.requestId, new NotMasterException("shard-failed-test"));
569569
CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear();
570570
assertEquals(1, retryRequests.length);
571571
currentRequest = retryRequests[0];
@@ -662,7 +662,7 @@ public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedEx
662662
CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear();
663663
assertThat(replicationRequests.length, equalTo(1));
664664
// try with failure response
665-
transport.handleResponse(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
665+
transport.handleRemoteError(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
666666
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
667667
assertEquals(1, shardFailedRequests.length);
668668
transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE);

core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void testFailureWithoutRetry() throws Exception {
198198
long requestId = transport.capturedRequests()[0].requestId;
199199
transport.clear();
200200
// this should not trigger retry or anything and the listener should report exception immediately
201-
transport.handleResponse(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
201+
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
202202

203203
try {
204204
// result should return immediately
@@ -240,7 +240,7 @@ public void testSuccessAfterRetryWithExcpetionFromTransport() throws Exception {
240240
long requestId = transport.capturedRequests()[0].requestId;
241241
transport.clear();
242242
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
243-
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
243+
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
244244
// trigger cluster state observer
245245
clusterService.setState(ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED));
246246
assertThat(transport.capturedRequests().length, equalTo(1));
@@ -258,7 +258,7 @@ public void testRetryOfAnAlreadyTimedOutRequest() throws Exception {
258258
long requestId = transport.capturedRequests()[0].requestId;
259259
transport.clear();
260260
DiscoveryNode node = clusterService.state().getNodes().getLocalNode();
261-
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
261+
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
262262

263263
// wait until the timeout was triggered and we actually tried to send for the second time
264264
assertBusy(new Runnable() {
@@ -270,7 +270,7 @@ public void run() {
270270

271271
// let it fail the second time too
272272
requestId = transport.capturedRequests()[0].requestId;
273-
transport.handleResponse(requestId, new ConnectTransportException(node, "test exception"));
273+
transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception"));
274274
try {
275275
// result should return immediately
276276
assertTrue(listener.isDone());
@@ -313,4 +313,4 @@ protected boolean resolveRequest(ClusterState state, Request request, ActionList
313313
}
314314
}
315315
}
316-
}
316+
}

core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import org.elasticsearch.test.transport.CapturingTransport;
3838
import org.elasticsearch.threadpool.ThreadPool;
3939
import org.elasticsearch.transport.NodeDisconnectedException;
40+
import org.elasticsearch.transport.NodeNotConnectedException;
41+
import org.elasticsearch.transport.RemoteTransportException;
42+
import org.elasticsearch.transport.SendRequestTransportException;
4043
import org.elasticsearch.transport.TransportException;
4144
import org.elasticsearch.transport.TransportResponse;
4245
import org.elasticsearch.transport.TransportService;
@@ -216,11 +219,17 @@ public void testMasterChannelException() throws InterruptedException {
216219
AtomicReference<Exception> exception = new AtomicReference<>();
217220

218221
LongConsumer retryLoop = requestId -> {
219-
List<Exception> possibleExceptions = new ArrayList<>();
220-
possibleExceptions.add(new NotMasterException("simulated"));
221-
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
222-
possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated"));
223-
transport.handleResponse(requestId, randomFrom(possibleExceptions));
222+
if (randomBoolean()) {
223+
transport.handleRemoteError(
224+
requestId,
225+
randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated")));
226+
} else {
227+
if (randomBoolean()) {
228+
transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated"));
229+
} else {
230+
transport.handleError(requestId, new NodeDisconnectedException(null, ShardStateAction.SHARD_FAILED_ACTION_NAME));
231+
}
232+
}
224233
};
225234

226235
final int numberOfRetries = randomIntBetween(1, 256);
@@ -279,7 +288,7 @@ public void onShardFailedFailure(Exception e) {
279288
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
280289
assertThat(capturedRequests.length, equalTo(1));
281290
assertFalse(failure.get());
282-
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated"));
291+
transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated"));
283292

284293
assertTrue(failure.get());
285294
}

core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.discovery;
2121

22+
import org.apache.lucene.index.CorruptIndexException;
2223
import org.apache.lucene.util.LuceneTestCase;
2324
import org.elasticsearch.ElasticsearchException;
2425
import org.elasticsearch.action.get.GetResponse;
@@ -30,12 +31,15 @@
3031
import org.elasticsearch.cluster.ClusterState;
3132
import org.elasticsearch.cluster.ClusterStateListener;
3233
import org.elasticsearch.cluster.ClusterStateUpdateTask;
34+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3335
import org.elasticsearch.cluster.block.ClusterBlock;
3436
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3537
import org.elasticsearch.cluster.metadata.IndexMetaData;
3638
import org.elasticsearch.cluster.node.DiscoveryNode;
3739
import org.elasticsearch.cluster.node.DiscoveryNodes;
3840
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
41+
import org.elasticsearch.cluster.routing.ShardRouting;
42+
import org.elasticsearch.cluster.routing.ShardRoutingState;
3943
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
4044
import org.elasticsearch.common.Nullable;
4145
import org.elasticsearch.common.Priority;
@@ -96,6 +100,7 @@
96100
import java.util.concurrent.atomic.AtomicBoolean;
97101
import java.util.concurrent.atomic.AtomicInteger;
98102
import java.util.concurrent.atomic.AtomicReference;
103+
import java.util.stream.Collectors;
99104

100105
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
101106
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -883,6 +888,71 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans
883888
internalCluster().stopRandomNonMasterNode();
884889
}
885890

891+
// simulate handling of sending shard failure during an isolation
892+
public void testSendingShardFailure() throws Exception {
893+
List<String> nodes = startCluster(3, 2);
894+
String masterNode = internalCluster().getMasterName();
895+
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
896+
String nonMasterNode = randomFrom(nonMasterNodes);
897+
assertAcked(prepareCreate("test")
898+
.setSettings(Settings.builder()
899+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
900+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
901+
));
902+
ensureGreen();
903+
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
904+
905+
// fail a random shard
906+
ShardRouting failedShard =
907+
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
908+
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
909+
String indexUUID = clusterService().state().metaData().index("test").getIndexUUID();
910+
CountDownLatch latch = new CountDownLatch(1);
911+
AtomicBoolean success = new AtomicBoolean();
912+
913+
String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
914+
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
915+
networkPartition.startDisrupting();
916+
917+
service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
918+
@Override
919+
public void onSuccess() {
920+
success.set(true);
921+
latch.countDown();
922+
}
923+
924+
@Override
925+
public void onShardFailedFailure(Exception e) {
926+
success.set(false);
927+
latch.countDown();
928+
assert false;
929+
}
930+
});
931+
932+
if (isolatedNode.equals(nonMasterNode)) {
933+
assertNoMaster(nonMasterNode);
934+
} else {
935+
ensureStableCluster(2, nonMasterNode);
936+
}
937+
938+
// heal the partition
939+
networkPartition.removeAndEnsureHealthy(internalCluster());
940+
941+
// the cluster should stabilize
942+
ensureStableCluster(3);
943+
944+
latch.await();
945+
946+
// the listener should be notified
947+
assertTrue(success.get());
948+
949+
// the failed shard should be gone
950+
List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
951+
for (ShardRouting shard : shards) {
952+
assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
953+
}
954+
}
955+
886956
public void testClusterFormingWithASlowNode() throws Exception {
887957
configureUnicastCluster(3, null, 2);
888958

0 commit comments

Comments
 (0)