From 1ef4fb4c054dd24c7b1c0398927e10883eec5e1d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 2 May 2018 00:00:59 +0200 Subject: [PATCH 1/2] cancelling shouldn't leak permits --- .../recovery/RecoverySourceHandler.java | 47 ++++++++++++++----- .../recovery/RecoverySourceHandlerTests.java | 25 ++++++++++ 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 78f44ee723114..d0d6162fa6d63 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,10 +29,9 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -44,6 +43,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; @@ -67,6 +68,7 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import 
java.util.function.Supplier; @@ -142,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); + }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { final long startingSeqNo; @@ -196,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException { * all documents up to maxSeqNo in phase2. */ runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId()); + shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* @@ -227,17 +229,38 @@ private boolean isTargetSameHistory() { return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID()); } - private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) { + static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason, + IndexShard primary, CancellableThreads cancellableThreads) { cancellableThreads.execute(() -> { - final PlainActionFuture onAcquired = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); - try (Releasable ignored = onAcquired.actionGet()) { + CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + 
} + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + }; + primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); + try (Releasable ignored = FutureUtils.get(permit)) { // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() - if (shard.isPrimaryMode() == false) { - throw new IndexShardRelocatedException(shard.shardId()); + if (primary.isPrimaryMode() == false) { + throw new IndexShardRelocatedException(primary.shardId()); } runnable.run(); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + }); } }); } @@ -489,11 +512,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio * the permit then the state of the shard will be relocated and this recovery will fail. 
*/ runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), - shardId + " marking " + request.targetAllocationId() + " as in sync"); + shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads); final long globalCheckpoint = shard.getGlobalCheckpoint(); cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), - shardId + " updating " + request.targetAllocationId() + "'s global checkpoint"); + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index babf8518d4492..4c52ba219f4b3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -439,6 +440,30 @@ long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, assertFalse(phase2Called.get()); } + public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final IndexShard shard = mock(IndexShard.class); + final AtomicBoolean 
freed = new AtomicBoolean(true); + when(shard.isPrimaryMode()).thenReturn(true); + doAnswer(invocation -> { + freed.set(false); + ((ActionListener)invocation.getArguments()[0]).onResponse(() -> freed.set(true)); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + + Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); + cancelingThread.start(); + try { + RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads); + } catch (CancellableThreads.ExecutionCancelledException e) { + // expected. + } + cancelingThread.join(); + // we have to use assert busy as we may be interrupted while acquiring the permit, if so we want to check + // that the permit is released. + assertBusy(() -> assertTrue(freed.get())); + } + private Store newStore(Path path) throws IOException { return newStore(path, true); } From ae229383a7d612f8f5e1a5b970cf77b88ecc985c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 2 May 2018 13:46:29 +0200 Subject: [PATCH 2/2] add trace logging on exception --- .../indices/recovery/RecoverySourceHandler.java | 13 ++++++++----- .../recovery/RecoverySourceHandlerTests.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d0d6162fa6d63..4c543aeeb22d4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -144,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - }, shardId + " 
validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads); + }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { final long startingSeqNo; @@ -198,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException { * all documents up to maxSeqNo in phase2. */ runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads); + shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* @@ -230,7 +230,7 @@ private boolean isTargetSameHistory() { } static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason, - IndexShard primary, CancellableThreads cancellableThreads) { + IndexShard primary, CancellableThreads cancellableThreads, Logger logger) { cancellableThreads.execute(() -> { CompletableFuture permit = new CompletableFuture<>(); final ActionListener onAcquired = new ActionListener() { @@ -260,6 +260,9 @@ public void onFailure(Exception e) { if (r != null) { r.close(); } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } }); } }); @@ -512,11 +515,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio * the permit then the state of the shard will be relocated and this recovery will fail. 
*/ runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), - shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads); + shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger); final long globalCheckpoint = shard.getGlobalCheckpoint(); cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), - shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads); + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 4c52ba219f4b3..5ade55ef5340c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -454,7 +454,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); cancelingThread.start(); try { - RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads); + RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger); } catch (CancellableThreads.ExecutionCancelledException e) { // expected. }