Skip to content

Commit 146f1d0

Browse files
RELOCATION: Fix Indef. Block when Wait on Refresh
* Fixes the issue reproduced in the added tests: * When there are open index requests on a shard that are waiting for a refresh, relocating that shard becomes blocked until that refresh happens (which could be never, as in the test scenario). * Fixed by: * Before trying to acquire all permits for relocation, refresh if there are outstanding operations
1 parent 9c2980a commit 146f1d0

File tree

4 files changed

+138
-33
lines changed

4 files changed

+138
-33
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -609,31 +609,35 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
609609
throws IllegalIndexShardStateException, InterruptedException {
610610
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
611611
try {
612-
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
613-
// no shard operation permits are being held here, move state from started to relocated
614-
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
612+
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES,
613+
() -> refresh("relocation requested"),
614+
() -> {
615+
// no shard operation permits are being held here, move state from started to relocated
616+
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
615617
"in-flight operations in progress while moving shard state to relocated";
616-
/*
617-
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
618-
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
619-
*/
620-
verifyRelocatingState();
621-
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff();
622-
try {
623-
consumer.accept(primaryContext);
624-
synchronized (mutex) {
625-
verifyRelocatingState();
626-
replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
627-
}
628-
} catch (final Exception e) {
618+
/*
619+
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context
620+
* via a network operation. Doing this under the mutex can implicitly block the cluster state update thread
621+
* on network operations.
622+
*/
623+
verifyRelocatingState();
624+
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff();
629625
try {
630-
replicationTracker.abortRelocationHandoff();
631-
} catch (final Exception inner) {
632-
e.addSuppressed(inner);
626+
consumer.accept(primaryContext);
627+
synchronized (mutex) {
628+
verifyRelocatingState();
629+
// make changes to primaryMode and relocated flag only under mutex
630+
replicationTracker.completeRelocationHandoff();
631+
}
632+
} catch (final Exception e) {
633+
try {
634+
replicationTracker.abortRelocationHandoff();
635+
} catch (final Exception inner) {
636+
e.addSuppressed(inner);
637+
}
638+
throw e;
633639
}
634-
throw e;
635-
}
636-
});
640+
});
637641
} catch (TimeoutException e) {
638642
logger.warn("timed out waiting for relocation hand-off to complete");
639643
// This is really bad as ongoing replication operations are preventing this shard from completing relocation hand-off.

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,28 @@ public void close() {
9191
* Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
9292
* operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed.
9393
*
94-
* @param timeout the maximum time to wait for the in-flight operations block
95-
* @param timeUnit the time unit of the {@code timeout} argument
96-
* @param onBlocked the action to run once the block has been acquired
97-
* @param <E> the type of checked exception thrown by {@code onBlocked}
94+
* @param timeout the maximum time to wait for the in-flight operations block
95+
* @param timeUnit the time unit of the {@code timeout} argument
96+
* @param onActiveOperations the action to run before trying to acquire the block if there are active operations
97+
* @param onBlocked the action to run once the block has been acquired
98+
* @param <E> the type of checked exception thrown by {@code onBlocked}
9899
* @throws InterruptedException if calling thread is interrupted
99100
* @throws TimeoutException if timed out waiting for in-flight operations to finish
100101
* @throws IndexShardClosedException if operation permit has been closed
101102
*/
102103
<E extends Exception> void blockOperations(
103104
final long timeout,
104105
final TimeUnit timeUnit,
106+
final CheckedRunnable<E> onActiveOperations,
105107
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
106108
delayOperations();
107-
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
108-
onBlocked.run();
109+
try {
110+
if (getActiveOperationsCount() > 0) {
111+
onActiveOperations.run();
112+
}
113+
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
114+
onBlocked.run();
115+
}
109116
} finally {
110117
releaseDelayedOperations();
111118
}
@@ -211,7 +218,7 @@ private void releaseDelayedOperations() {
211218
/**
212219
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
213220
* {@link ActionListener} will be called on the calling thread. During calls of
214-
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
221+
* {@link #blockOperations(long, TimeUnit, CheckedRunnable, CheckedRunnable)}, permit acquisition can be delayed.
215222
* The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
216223
* longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
217224
* directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure
@@ -295,7 +302,7 @@ private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) thr
295302
/**
296303
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
297304
*
298-
* @return the active operation count, or zero when all permits ar eheld
305+
* @return the active operation count, or zero when all permits are held
299306
*/
300307
int getActiveOperationsCount() {
301308
int availablePermits = semaphore.availablePermits();

server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public void testOperationsIfClosed() {
199199
public void testBlockIfClosed() {
200200
permits.close();
201201
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
202-
() -> { throw new IllegalArgumentException("fake error"); }));
202+
() -> {}, () -> { throw new IllegalArgumentException("fake error"); }));
203203
expectThrows(IndexShardClosedException.class,
204204
() -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
205205
randomInt(10), TimeUnit.MINUTES));
@@ -296,7 +296,7 @@ private Releasable blockAndWait() throws InterruptedException {
296296
IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
297297
threadPool.generic().execute(() -> {
298298
try {
299-
permits.blockOperations(1, TimeUnit.MINUTES, () -> {
299+
permits.blockOperations(1, TimeUnit.MINUTES, () -> {}, () -> {
300300
try {
301301
blockAcquired.countDown();
302302
releaseBlock.await();
@@ -572,7 +572,7 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException {
572572

573573
{
574574
final TimeoutException e =
575-
expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}));
575+
expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}, () -> {}));
576576
assertThat(e, hasToString(containsString("timeout while blocking operations")));
577577
}
578578

server/src/test/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import com.carrotsearch.hppc.procedures.IntProcedure;
2424
import org.apache.lucene.index.IndexFileNames;
2525
import org.apache.lucene.util.English;
26+
import org.elasticsearch.action.ActionFuture;
2627
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
28+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
2729
import org.elasticsearch.action.index.IndexRequestBuilder;
2830
import org.elasticsearch.action.search.SearchResponse;
31+
import org.elasticsearch.action.support.WriteRequest;
2932
import org.elasticsearch.client.Client;
3033
import org.elasticsearch.cluster.ClusterState;
3134
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -506,6 +509,97 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr
506509

507510
}
508511

512+
public void testRelocateWhileWaitingForRefresh() {
513+
logger.info("--> starting [node1] ...");
514+
final String node1 = internalCluster().startNode();
515+
516+
logger.info("--> creating test index ...");
517+
prepareCreate("test", Settings.builder()
518+
.put("index.number_of_shards", 1)
519+
.put("index.number_of_replicas", 0)
520+
.put("index.refresh_interval", -1) // we want to control refreshes
521+
).get();
522+
523+
logger.info("--> index 10 docs");
524+
for (int i = 0; i < 10; i++) {
525+
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
526+
}
527+
logger.info("--> flush so we have an actual index");
528+
client().admin().indices().prepareFlush().execute().actionGet();
529+
logger.info("--> index more docs so we have something in the translog");
530+
for (int i = 10; i < 20; i++) {
531+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
532+
.setSource("field", "value" + i).execute();
533+
}
534+
535+
logger.info("--> start another node");
536+
final String node2 = internalCluster().startNode();
537+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
538+
.setWaitForNodes("2").execute().actionGet();
539+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
540+
541+
logger.info("--> relocate the shard from node1 to node2");
542+
client().admin().cluster().prepareReroute()
543+
.add(new MoveAllocationCommand("test", 0, node1, node2))
544+
.execute().actionGet();
545+
546+
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
547+
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
548+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
549+
550+
logger.info("--> verifying count");
551+
client().admin().indices().prepareRefresh().execute().actionGet();
552+
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
553+
}
554+
555+
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
556+
logger.info("--> starting [node1] ...");
557+
final String node1 = internalCluster().startNode();
558+
559+
logger.info("--> creating test index ...");
560+
prepareCreate("test", Settings.builder()
561+
.put("index.number_of_shards", 1)
562+
.put("index.number_of_replicas", 0)
563+
.put("index.refresh_interval", -1) // we want to control refreshes
564+
).get();
565+
566+
logger.info("--> index 10 docs");
567+
for (int i = 0; i < 10; i++) {
568+
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
569+
}
570+
logger.info("--> flush so we have an actual index");
571+
client().admin().indices().prepareFlush().execute().actionGet();
572+
logger.info("--> index more docs so we have something in the translog");
573+
for (int i = 10; i < 20; i++) {
574+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
575+
.setSource("field", "value" + i).execute();
576+
}
577+
578+
logger.info("--> start another node");
579+
final String node2 = internalCluster().startNode();
580+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
581+
.setWaitForNodes("2").execute().actionGet();
582+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
583+
584+
logger.info("--> relocate the shard from node1 to node2");
585+
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin().cluster().prepareReroute()
586+
.add(new MoveAllocationCommand("test", 0, node1, node2))
587+
.execute();
588+
logger.info("--> index 100 docs while relocating");
589+
for (int i = 20; i < 120; i++) {
590+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
591+
.setSource("field", "value" + i).execute();
592+
}
593+
relocationListener.actionGet();
594+
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
595+
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
596+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
597+
598+
logger.info("--> verifying count");
599+
client().admin().indices().prepareRefresh().execute().actionGet();
600+
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
601+
}
602+
509603
class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
510604

511605
private final CountDownLatch corruptionCount;

0 commit comments

Comments
 (0)