diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 78f44ee723114..4c543aeeb22d4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,10 +29,9 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -44,6 +43,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; @@ -67,6 +68,7 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -142,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); + }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { final long startingSeqNo; @@ -196,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException { * all documents up to maxSeqNo in phase2. */ runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId()); + shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* @@ -227,17 +229,41 @@ private boolean isTargetSameHistory() { return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID()); } - private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) { + static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason, + IndexShard primary, CancellableThreads cancellableThreads, Logger logger) { cancellableThreads.execute(() -> { - final PlainActionFuture onAcquired = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); - try (Releasable ignored = onAcquired.actionGet()) { + CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + } + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + }; + primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); + try (Releasable ignored = FutureUtils.get(permit)) { // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() - if (shard.isPrimaryMode() == false) { - throw new IndexShardRelocatedException(shard.shardId()); + if (primary.isPrimaryMode() == false) { + throw new IndexShardRelocatedException(primary.shardId()); } runnable.run(); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } + }); } }); } @@ -489,11 +515,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio * the permit then the state of the shard will be relocated and this recovery will fail. */ runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), - shardId + " marking " + request.targetAllocationId() + " as in sync"); + shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger); final long globalCheckpoint = shard.getGlobalCheckpoint(); cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), - shardId + " updating " + request.targetAllocationId() + "'s global checkpoint"); + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index babf8518d4492..5ade55ef5340c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -439,6 +440,30 @@ long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, assertFalse(phase2Called.get()); } + public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final IndexShard shard = mock(IndexShard.class); + final AtomicBoolean freed = new AtomicBoolean(true); + when(shard.isPrimaryMode()).thenReturn(true); + doAnswer(invocation -> { + freed.set(false); + ((ActionListener)invocation.getArguments()[0]).onResponse(() -> freed.set(true)); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + + Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); + cancelingThread.start(); + try { + RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger); + } catch (CancellableThreads.ExecutionCancelledException e) { + // expected. + } + cancelingThread.join(); + // we have to use assert busy as we may be interrupted while acquiring the permit, if so we want to check + // that the permit is released. + assertBusy(() -> assertTrue(freed.get())); + } + private Store newStore(Path path) throws IOException { return newStore(path, true); }