Skip to content

Commit 66f0c89

Browse files
Fix Transport Stopped Exception (#48930) (#49035)
When a node shuts down, `TransportService` first moves to the stopped state and only then closes its connections. If a request was sent in between, a plain `TransportException` was thrown, which replication actions did not retry. A (possibly wrapped) `NodeClosedException` is now thrown instead, which replication actions handle correctly. Other usages of the old message-based check were fixed as well. Relates #42612
1 parent fb685ad commit 66f0c89

File tree

9 files changed

+260
-32
lines changed

9 files changed

+260
-32
lines changed

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3030
import org.elasticsearch.index.Index;
3131
import org.elasticsearch.rest.RestStatus;
32-
import org.elasticsearch.transport.TransportException;
3332

3433
import java.io.IOException;
3534
import java.io.PrintWriter;
@@ -222,14 +221,6 @@ public static Throwable unwrap(Throwable t, Class<?>... clazzes) {
222221
return null;
223222
}
224223

225-
public static boolean isTransportStoppedForAction(final Throwable t, final String action) {
226-
final TransportException maybeTransport =
227-
(TransportException) ExceptionsHelper.unwrap(t, TransportException.class);
228-
return maybeTransport != null
229-
&& (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request")
230-
|| maybeTransport.getMessage().equals("transport stopped, action: " + action));
231-
}
232-
233224
/**
234225
* Throws the specified exception. If null is specified then <code>true</code> is returned.
235226
*/

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,7 @@ public String toString() {
213213

214214
private void onNoLongerPrimary(Exception failure) {
215215
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
216-
final boolean nodeIsClosing =
217-
cause instanceof NodeClosedException
218-
|| ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure");
216+
final boolean nodeIsClosing = cause instanceof NodeClosedException;
219217
final String message;
220218
if (nodeIsClosing) {
221219
message = String.format(Locale.ROOT,

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.shard.IndexShardClosedException;
4444
import org.elasticsearch.index.shard.ShardId;
4545
import org.elasticsearch.indices.IndicesService;
46+
import org.elasticsearch.node.NodeClosedException;
4647
import org.elasticsearch.threadpool.ThreadPool;
4748
import org.elasticsearch.transport.TransportService;
4849

@@ -113,8 +114,8 @@ public void backgroundSync(
113114
ActionListener.wrap(
114115
r -> {},
115116
e -> {
116-
if (ExceptionsHelper.isTransportStoppedForAction(e, ACTION_NAME + "[p]")) {
117-
// we are likely shutting down
117+
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
118+
// node shutting down
118119
return;
119120
}
120121
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4848
import org.elasticsearch.common.util.concurrent.ThreadContext;
4949
import org.elasticsearch.core.internal.io.IOUtils;
50+
import org.elasticsearch.node.NodeClosedException;
5051
import org.elasticsearch.tasks.Task;
5152
import org.elasticsearch.tasks.TaskCancelledException;
5253
import org.elasticsearch.tasks.TaskManager;
@@ -275,8 +276,8 @@ public void onFailure(Exception e) {
275276
}
276277
@Override
277278
public void doRun() {
278-
// cf. ExceptionsHelper#isTransportStoppedForAction
279-
TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
279+
TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
280+
holderToNotify.action(), new NodeClosedException(localNode));
280281
holderToNotify.handler().handleException(ex);
281282
}
282283
});
@@ -680,11 +681,8 @@ private <T extends TransportResponse> void sendRequestInternal(final Transport.C
680681
/*
681682
* If we are not started the exception handling will remove the request holder again and call the handler to notify the
682683
* caller. It will only notify if toStop hasn't done the work yet.
683-
*
684-
* Do not edit this exception message, it is currently relied upon in production code!
685684
*/
686-
// TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
687-
throw new TransportException("TransportService is closed stopped can't send request");
685+
throw new NodeClosedException(localNode);
688686
}
689687
if (timeoutHandler != null) {
690688
assert options.timeout() != null;

server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.node.NodeClosedException;
4545
import org.elasticsearch.test.ESTestCase;
4646
import org.elasticsearch.transport.SendRequestTransportException;
47-
import org.elasticsearch.transport.TransportException;
4847

4948
import java.util.ArrayList;
5049
import java.util.Collections;
@@ -205,12 +204,9 @@ public void testNoLongerPrimary() throws Exception {
205204
if (randomBoolean()) {
206205
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
207206
} else if (randomBoolean()) {
207+
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
208208
shardActionFailure = new SendRequestTransportException(
209-
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME,
210-
new TransportException("TransportService is closed stopped can't send request"));
211-
} else if (randomBoolean()) {
212-
shardActionFailure = new TransportException(
213-
"transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME);
209+
node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node));
214210
} else {
215211
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
216212
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.support.replication;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.ActionRequest;
24+
import org.elasticsearch.action.ActionResponse;
25+
import org.elasticsearch.action.ActionType;
26+
import org.elasticsearch.action.support.ActionFilters;
27+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
28+
import org.elasticsearch.cluster.metadata.IndexMetaData;
29+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
30+
import org.elasticsearch.cluster.service.ClusterService;
31+
import org.elasticsearch.common.inject.Inject;
32+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
33+
import org.elasticsearch.common.io.stream.StreamInput;
34+
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.util.concurrent.ThreadContext;
36+
import org.elasticsearch.index.shard.IndexShard;
37+
import org.elasticsearch.index.shard.ShardId;
38+
import org.elasticsearch.indices.IndicesService;
39+
import org.elasticsearch.plugins.ActionPlugin;
40+
import org.elasticsearch.plugins.NetworkPlugin;
41+
import org.elasticsearch.plugins.Plugin;
42+
import org.elasticsearch.plugins.PluginsService;
43+
import org.elasticsearch.test.ESIntegTestCase;
44+
import org.elasticsearch.test.InternalTestCluster;
45+
import org.elasticsearch.test.transport.MockTransportService;
46+
import org.elasticsearch.threadpool.ThreadPool;
47+
import org.elasticsearch.transport.Transport;
48+
import org.elasticsearch.transport.TransportInterceptor;
49+
import org.elasticsearch.transport.TransportRequest;
50+
import org.elasticsearch.transport.TransportRequestOptions;
51+
import org.elasticsearch.transport.TransportResponse;
52+
import org.elasticsearch.transport.TransportResponseHandler;
53+
import org.elasticsearch.transport.TransportService;
54+
import org.hamcrest.Matchers;
55+
56+
import java.io.IOException;
57+
import java.util.Arrays;
58+
import java.util.Collection;
59+
import java.util.List;
60+
import java.util.concurrent.CountDownLatch;
61+
import java.util.concurrent.TimeUnit;
62+
import java.util.concurrent.atomic.AtomicReference;
63+
64+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
65+
66+
67+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
68+
public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase {
69+
70+
@Override
71+
protected Collection<Class<? extends Plugin>> nodePlugins() {
72+
return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class);
73+
}
74+
75+
@Override
76+
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
77+
return Arrays.asList(TestPlugin.class);
78+
}
79+
80+
public static class Request extends ReplicationRequest<Request> {
81+
public Request(ShardId shardId) {
82+
super(shardId);
83+
}
84+
85+
public Request(StreamInput in) throws IOException {
86+
super(in);
87+
}
88+
89+
@Override
90+
public String toString() {
91+
return "test-request";
92+
}
93+
}
94+
95+
public static class Response extends ReplicationResponse {
96+
public Response() {
97+
}
98+
99+
public Response(StreamInput in) throws IOException {
100+
super(in);
101+
}
102+
}
103+
104+
public static class TestAction extends TransportReplicationAction<Request, Request, Response> {
105+
private static final String ACTION_NAME = "internal:test-replication-action";
106+
private static final ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);
107+
108+
@Inject
109+
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
110+
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
111+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
112+
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
113+
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
114+
}
115+
116+
@Override
117+
protected Response newResponseInstance(StreamInput in) throws IOException {
118+
return new Response(in);
119+
}
120+
121+
@Override
122+
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
123+
ActionListener<PrimaryResult<Request, Response>> listener) {
124+
listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
125+
}
126+
127+
@Override
128+
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
129+
return new ReplicaResult();
130+
}
131+
}
132+
133+
public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
134+
private CountDownLatch actionRunningLatch = new CountDownLatch(1);
135+
private CountDownLatch actionWaitLatch = new CountDownLatch(1);
136+
private volatile String testActionName;
137+
138+
public TestPlugin() {
139+
}
140+
141+
@Override
142+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
143+
return Arrays.asList(new ActionHandler<>(TestAction.TYPE, TestAction.class));
144+
}
145+
146+
@Override
147+
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
148+
ThreadContext threadContext) {
149+
return Arrays.asList(new TransportInterceptor() {
150+
@Override
151+
public AsyncSender interceptSender(AsyncSender sender) {
152+
return new AsyncSender() {
153+
@Override
154+
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
155+
TransportRequest request, TransportRequestOptions options,
156+
TransportResponseHandler<T> handler) {
157+
// only activated on primary
158+
if (action.equals(testActionName)) {
159+
actionRunningLatch.countDown();
160+
try {
161+
actionWaitLatch.await(10, TimeUnit.SECONDS);
162+
} catch (InterruptedException e) {
163+
throw new AssertionError(e);
164+
}
165+
}
166+
sender.sendRequest(connection, action, request, options, handler);
167+
}
168+
};
169+
}
170+
});
171+
}
172+
}
173+
174+
public void testRetryOnStoppedTransportService() throws Exception {
175+
internalCluster().startMasterOnlyNodes(2);
176+
String primary = internalCluster().startDataOnlyNode();
177+
assertAcked(prepareCreate("test")
178+
.setSettings(Settings.builder()
179+
.put(indexSettings())
180+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
181+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
182+
));
183+
184+
String replica = internalCluster().startDataOnlyNode();
185+
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
186+
ensureGreen("test");
187+
188+
TestPlugin primaryTestPlugin = getTestPlugin(primary);
189+
// this test only provoked an issue for the primary action, but for completeness, we pick the action randomly
190+
primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]");
191+
logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica);
192+
193+
AtomicReference<Object> response = new AtomicReference<>();
194+
CountDownLatch doneLatch = new CountDownLatch(1);
195+
client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)),
196+
ActionListener.runAfter(ActionListener.wrap(
197+
r -> assertTrue(response.compareAndSet(null, r)),
198+
e -> assertTrue(response.compareAndSet(null, e))),
199+
doneLatch::countDown));
200+
201+
assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS));
202+
203+
MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
204+
primary);
205+
// we pause node after TransportService has moved to stopped, but before closing connections, since if connections are closed
206+
// we would not hit the transport service closed case.
207+
primaryTransportService.addOnStopListener(() -> {
208+
primaryTestPlugin.actionWaitLatch.countDown();
209+
try {
210+
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
211+
} catch (InterruptedException e) {
212+
throw new AssertionError(e);
213+
}
214+
});
215+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
216+
217+
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
218+
if (response.get() instanceof Exception) {
219+
throw new AssertionError(response.get());
220+
}
221+
}
222+
223+
private TestPlugin getTestPlugin(String node) {
224+
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, node);
225+
List<TestPlugin> testPlugins = pluginsService.filterPlugins(TestPlugin.class);
226+
assertThat(testPlugins, Matchers.hasSize(1));
227+
return testPlugins.get(0);
228+
}
229+
}

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2929
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3030
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31+
import org.elasticsearch.cluster.node.DiscoveryNode;
3132
import org.elasticsearch.cluster.service.ClusterService;
3233
import org.elasticsearch.common.settings.Settings;
3334
import org.elasticsearch.core.internal.io.IOUtils;
@@ -38,11 +39,13 @@
3839
import org.elasticsearch.index.shard.IndexShardClosedException;
3940
import org.elasticsearch.index.shard.ShardId;
4041
import org.elasticsearch.indices.IndicesService;
42+
import org.elasticsearch.node.NodeClosedException;
4143
import org.elasticsearch.tasks.Task;
4244
import org.elasticsearch.test.ESTestCase;
4345
import org.elasticsearch.test.transport.CapturingTransport;
4446
import org.elasticsearch.threadpool.TestThreadPool;
4547
import org.elasticsearch.threadpool.ThreadPool;
48+
import org.elasticsearch.transport.SendRequestTransportException;
4649
import org.elasticsearch.transport.TransportException;
4750
import org.elasticsearch.transport.TransportService;
4851
import org.mockito.ArgumentCaptor;
@@ -212,10 +215,11 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
212215
final Exception e = randomFrom(
213216
new AlreadyClosedException("closed"),
214217
new IndexShardClosedException(indexShard.shardId()),
215-
new TransportException(randomFrom(
216-
"failed",
217-
"TransportService is closed stopped can't send request",
218-
"transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")),
218+
new TransportException("failed"),
219+
new SendRequestTransportException(null, randomFrom(
220+
"some-action",
221+
"indices:admin/seq_no/retention_lease_background_sync[p]"
222+
), new NodeClosedException((DiscoveryNode) null)),
219223
new RuntimeException("failed"));
220224
listener.onFailure(e);
221225
if (e.getMessage().equals("failed")) {

0 commit comments

Comments
 (0)