Skip to content

Commit f40c6a6

Browse files
CR: different fix
1 parent 146f1d0 commit f40c6a6

File tree

3 files changed

+45
-44
lines changed

3 files changed

+45
-44
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 35 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -244,6 +244,12 @@ Runnable getGlobalCheckpointSyncer() {
244244
*/
245245
private final RefreshListeners refreshListeners;
246246

247+
/**
248+
* Prevents new refresh listeners from being registered. Used to prevent becoming blocked on operations waiting for refresh
249+
* during relocation.
250+
*/
251+
private final AtomicBoolean preventNewRefreshListeners = new AtomicBoolean(false);
252+
247253
private final AtomicLong lastSearcherAccess = new AtomicLong();
248254
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
249255

@@ -608,42 +614,44 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
608614
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
609615
throws IllegalIndexShardStateException, InterruptedException {
610616
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
617+
preventNewRefreshListeners.set(true);
611618
try {
612-
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES,
613-
() -> refresh("relocation requested"),
614-
() -> {
615-
// no shard operation permits are being held here, move state from started to relocated
616-
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
619+
if (refreshListeners.refreshNeeded()) {
620+
refresh("relocated");
621+
}
622+
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
623+
// no shard operation permits are being held here, move state from started to relocated
624+
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
617625
"in-flight operations in progress while moving shard state to relocated";
618-
/*
619-
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context
620-
* via a network operation. Doing this under the mutex can implicitly block the cluster state update thread
621-
* on network operations.
622-
*/
623-
verifyRelocatingState();
624-
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff();
626+
/*
627+
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
628+
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
629+
*/
630+
verifyRelocatingState();
631+
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff();
632+
try {
633+
consumer.accept(primaryContext);
634+
synchronized (mutex) {
635+
verifyRelocatingState();
636+
replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
637+
}
638+
} catch (final Exception e) {
625639
try {
626-
consumer.accept(primaryContext);
627-
synchronized (mutex) {
628-
verifyRelocatingState();
629-
// make changes to primaryMode and relocated flag only under mutex
630-
replicationTracker.completeRelocationHandoff();
631-
}
632-
} catch (final Exception e) {
633-
try {
634-
replicationTracker.abortRelocationHandoff();
635-
} catch (final Exception inner) {
636-
e.addSuppressed(inner);
637-
}
638-
throw e;
640+
replicationTracker.abortRelocationHandoff();
641+
} catch (final Exception inner) {
642+
e.addSuppressed(inner);
639643
}
640-
});
644+
throw e;
645+
}
646+
});
641647
} catch (TimeoutException e) {
642648
logger.warn("timed out waiting for relocation hand-off to complete");
643649
// This is really bad as ongoing replication operations are preventing this shard from completing relocation hand-off.
644650
// Fail primary relocation source and target shards.
645651
failShard("timed out waiting for relocation hand-off to complete", null);
646652
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
653+
} finally {
654+
preventNewRefreshListeners.set(false);
647655
}
648656
}
649657

@@ -2667,7 +2675,7 @@ public void onAfter() {
26672675
*/
26682676
private RefreshListeners buildRefreshListeners() {
26692677
return new RefreshListeners(
2670-
indexSettings::getMaxRefreshListeners,
2678+
() -> preventNewRefreshListeners.get() ? 0 : indexSettings.getMaxRefreshListeners(),
26712679
() -> refresh("too_many_listeners"),
26722680
threadPool.executor(ThreadPool.Names.LISTENER)::execute,
26732681
logger, threadPool.getThreadContext());

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,28 +91,21 @@ public void close() {
9191
* Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
9292
* operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed.
9393
*
94-
* @param timeout the maximum time to wait for the in-flight operations block
95-
* @param timeUnit the time unit of the {@code timeout} argument
96-
* @param onActiveOperations the action to run before trying to acquire the block if there are active operations
97-
* @param onBlocked the action to run once the block has been acquired
98-
* @param <E> the type of checked exception thrown by {@code onBlocked}
94+
* @param timeout the maximum time to wait for the in-flight operations block
95+
* @param timeUnit the time unit of the {@code timeout} argument
96+
* @param onBlocked the action to run once the block has been acquired
97+
* @param <E> the type of checked exception thrown by {@code onBlocked}
9998
* @throws InterruptedException if calling thread is interrupted
10099
* @throws TimeoutException if timed out waiting for in-flight operations to finish
101100
* @throws IndexShardClosedException if operation permit has been closed
102101
*/
103102
<E extends Exception> void blockOperations(
104103
final long timeout,
105104
final TimeUnit timeUnit,
106-
final CheckedRunnable<E> onActiveOperations,
107105
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
108106
delayOperations();
109-
try {
110-
if (getActiveOperationsCount() > 0) {
111-
onActiveOperations.run();
112-
}
113-
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
114-
onBlocked.run();
115-
}
107+
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
108+
onBlocked.run();
116109
} finally {
117110
releaseDelayedOperations();
118111
}
@@ -218,7 +211,7 @@ private void releaseDelayedOperations() {
218211
/**
219212
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
220213
* {@link ActionListener} will be called on the calling thread. During calls of
221-
* {@link #blockOperations(long, TimeUnit, CheckedRunnable, CheckedRunnable)}, permit acquisition can be delayed.
214+
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
222215
* The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
223216
* longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
224217
* directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure

server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public void testOperationsIfClosed() {
199199
public void testBlockIfClosed() {
200200
permits.close();
201201
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
202-
() -> {}, () -> { throw new IllegalArgumentException("fake error"); }));
202+
() -> { throw new IllegalArgumentException("fake error"); }));
203203
expectThrows(IndexShardClosedException.class,
204204
() -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
205205
randomInt(10), TimeUnit.MINUTES));
@@ -296,7 +296,7 @@ private Releasable blockAndWait() throws InterruptedException {
296296
IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
297297
threadPool.generic().execute(() -> {
298298
try {
299-
permits.blockOperations(1, TimeUnit.MINUTES, () -> {}, () -> {
299+
permits.blockOperations(1, TimeUnit.MINUTES, () -> {
300300
try {
301301
blockAcquired.countDown();
302302
releaseBlock.await();
@@ -572,7 +572,7 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException {
572572

573573
{
574574
final TimeoutException e =
575-
expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}, () -> {}));
575+
expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}));
576576
assertThat(e, hasToString(containsString("timeout while blocking operations")));
577577
}
578578

0 commit comments

Comments
 (0)