Skip to content

Commit 3b739b9

Browse files
authored
Avoid NPE on shard changes action (#32630)
If a leader index is deleted while there is an active follower, the follower will send shard changes requests bound for the leader index. Today this will result in a null pointer exception because there will not be an index routing table for the index. A null pointer exception looks like a bug to a user so this commit addresses this by throwing an index not found exception instead.
1 parent 1a39f1d commit 3b739b9

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,9 +259,9 @@ protected boolean resolveIndex(Request request) {
259259

260260
@Override
261261
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
262-
return state.routingTable()
263-
.index(request.concreteIndex())
264-
.shard(request.request().getShard().id())
262+
return state
263+
.routingTable()
264+
.shardRoutingTable(request.concreteIndex(), request.request().getShard().id())
265265
.activeInitializingShardsRandomIt();
266266
}
267267

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,40 @@
55
*/
66
package org.elasticsearch.xpack.ccr.action;
77

8+
import org.elasticsearch.action.ActionListener;
89
import org.elasticsearch.cluster.routing.ShardRouting;
910
import org.elasticsearch.cluster.routing.ShardRoutingState;
1011
import org.elasticsearch.cluster.routing.TestShardRouting;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.xcontent.XContentType;
14+
import org.elasticsearch.index.Index;
15+
import org.elasticsearch.index.IndexNotFoundException;
1316
import org.elasticsearch.index.IndexService;
1417
import org.elasticsearch.index.shard.IndexShard;
1518
import org.elasticsearch.index.shard.IndexShardNotStartedException;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.index.shard.ShardNotFoundException;
1621
import org.elasticsearch.index.translog.Translog;
1722
import org.elasticsearch.test.ESSingleNodeTestCase;
1823
import org.mockito.Mockito;
1924

2025
import java.util.Arrays;
2126
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicReference;
2229
import java.util.stream.Collectors;
2330
import java.util.stream.LongStream;
2431

2532
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.instanceOf;
2634

2735
public class ShardChangesActionTests extends ESSingleNodeTestCase {
2836

37+
@Override
38+
protected boolean resetNodeAfterTest() {
39+
return true;
40+
}
41+
2942
public void testGetOperations() throws Exception {
3043
final Settings settings = Settings.builder()
3144
.put("index.number_of_shards", 1)
@@ -119,4 +132,52 @@ public void testGetOperationsAlwaysReturnAtLeastOneOp() throws Exception {
119132
assertThat(operations[0].seqNo(), equalTo(0L));
120133
}
121134

135+
public void testIndexNotFound() throws InterruptedException {
136+
final CountDownLatch latch = new CountDownLatch(1);
137+
final AtomicReference<Exception> reference = new AtomicReference<>();
138+
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
139+
transportAction.execute(
140+
new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)),
141+
new ActionListener<ShardChangesAction.Response>() {
142+
@Override
143+
public void onResponse(final ShardChangesAction.Response response) {
144+
fail();
145+
}
146+
147+
@Override
148+
public void onFailure(final Exception e) {
149+
reference.set(e);
150+
latch.countDown();
151+
}
152+
});
153+
latch.await();
154+
assertNotNull(reference.get());
155+
assertThat(reference.get(), instanceOf(IndexNotFoundException.class));
156+
}
157+
158+
public void testShardNotFound() throws InterruptedException {
159+
final int numberOfShards = randomIntBetween(1, 5);
160+
final IndexService indexService = createIndex("index", Settings.builder().put("index.number_of_shards", numberOfShards).build());
161+
final CountDownLatch latch = new CountDownLatch(1);
162+
final AtomicReference<Exception> reference = new AtomicReference<>();
163+
final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class);
164+
transportAction.execute(
165+
new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)),
166+
new ActionListener<ShardChangesAction.Response>() {
167+
@Override
168+
public void onResponse(final ShardChangesAction.Response response) {
169+
fail();
170+
}
171+
172+
@Override
173+
public void onFailure(final Exception e) {
174+
reference.set(e);
175+
latch.countDown();
176+
}
177+
});
178+
latch.await();
179+
assertNotNull(reference.get());
180+
assertThat(reference.get(), instanceOf(ShardNotFoundException.class));
181+
}
182+
122183
}

0 commit comments

Comments
 (0)