diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 7cf4d28d428f5..971f705b3958a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -584,6 +584,7 @@ private void sendBatch( final long maxSeqNoOfUpdatesOrDeletes, final RetentionLeases retentionLeases, final ActionListener listener) throws IOException { + assert ThreadPool.assertCurrentMethodIsNotCalledRecursively(); final List operations = nextBatch.get(); // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint if (operations.isEmpty() == false || firstBatch) { diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 87c7997333e4e..df97d2e548e57 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -786,4 +786,18 @@ public static boolean assertNotScheduleThread(String reason) { "Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]"; return true; } + + public static boolean assertCurrentMethodIsNotCalledRecursively() { + final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + assert stackTraceElements.length >= 3 : stackTraceElements.length; + assert stackTraceElements[0].getMethodName().equals("getStackTrace") : stackTraceElements[0]; + assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively") : stackTraceElements[1]; + final StackTraceElement testingMethod = stackTraceElements[2]; + for (int i = 3; i < stackTraceElements.length; i++) { + assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false + || stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false : + testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively"; + } + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 6f4bf360726e6..163bfd14647ce 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -31,14 +31,13 @@ import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -47,10 +46,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class IndexShardRetentionLeaseTests extends IndexShardTestCase { @@ -58,16 +53,12 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { @Override protected ThreadPool setUpThreadPool() { - final ThreadPool threadPool = mock(ThreadPool.class); - doAnswer(invocationOnMock -> currentTimeMillis.get()).when(threadPool).absoluteTimeInMillis(); - when(threadPool.executor(anyString())).thenReturn(mock(ExecutorService.class)); - when(threadPool.scheduler()).thenReturn(mock(ScheduledExecutorService.class)); - return threadPool; - } - - @Override - protected void tearDownThreadPool() { - + return new TestThreadPool(getClass().getName(), threadPoolSettings()) { + @Override + public long absoluteTimeInMillis() { + return currentTimeMillis.get(); + } + }; } public void testAddOrRenewRetentionLease() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f94a12be0547c..6b04abb356268 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2443,7 +2443,13 @@ public void indexTranslogOperations( maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, - ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); + ActionListener.wrap( + r -> { + assertFalse(replica.isSyncNeeded()); + listener.onResponse(r); + }, + listener::onFailure + )); } }, true, true); @@ -2604,8 +2610,12 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { // we're only checking that listeners are called when the engine is open, before there is no point @Override public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); - assertListenerCalled.accept(replica); + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, + ActionListener.wrap( + r -> { + assertListenerCalled.accept(replica); + listener.onResponse(r); + }, listener::onFailure)); } @Override @@ -2622,15 +2632,21 @@ public void indexTranslogOperations( maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, - ActionListener.map(listener, checkpoint -> { - assertListenerCalled.accept(replica); - return checkpoint; - })); + ActionListener.wrap( + r -> { + assertListenerCalled.accept(replica); + listener.onResponse(r); + }, listener::onFailure)); } @Override public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { - super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica))); + super.finalizeRecovery(globalCheckpoint, + ActionListener.wrap( + r -> { + assertListenerCalled.accept(replica); + listener.onResponse(r); + }, listener::onFailure)); } }, false, true); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index fb7b79f459720..ff70b05e99ffe 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -240,10 +240,10 @@ public void indexTranslogOperations(List operations, int tot RetentionLeases retentionLeases, ActionListener listener) { shippedOps.addAll(operations); checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); - maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get())); - } + listener.onResponse(checkpointOnTarget.get()); } }; - RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); + RecoverySourceHandler handler = new RecoverySourceHandler( + shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); @@ -274,14 +274,15 @@ public void testSendSnapshotStopOnError() throws Exception { public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, RetentionLeases retentionLeases, ActionListener listener) { if (randomBoolean()) { - maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED)); + listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED); } else { - maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index"))); + listener.onFailure(new RuntimeException("test - failed to index")); wasFailed.set(true); } } }; - RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); + RecoverySourceHandler handler = new RecoverySourceHandler( + shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); @@ -761,12 +762,4 @@ public void close() { } }; } - - private void maybeExecuteAsync(Runnable runnable) { - if (randomBoolean()) { - threadPool.generic().execute(runnable); - } else { - runnable.run(); - } - } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 92a61cda9e98d..649b94099cc74 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -20,9 +20,14 @@ package org.elasticsearch.threadpool; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.test.ESTestCase; +import java.util.concurrent.ExecutorService; + import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING; +import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively; import static org.hamcrest.CoreMatchers.equalTo; public class ThreadPoolTests extends ESTestCase { @@ -67,4 +72,35 @@ public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() { Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings)); assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage()); } + + int factorial(int n) { + assertCurrentMethodIsNotCalledRecursively(); + if (n <= 1) { + return 1; + } else { + return n * factorial(n - 1); + } + } + + int factorialForked(int n, ExecutorService executor) { + assertCurrentMethodIsNotCalledRecursively(); + if (n <= 1) { + return 1; + } + return n * FutureUtils.get(executor.submit(() -> factorialForked(n - 1, executor))); + } + + public void testAssertCurrentMethodIsNotCalledRecursively() { + expectThrows(AssertionError.class, () -> factorial(between(2, 10))); + assertThat(factorial(1), equalTo(1)); // is not called recursively + assertThat(expectThrows(AssertionError.class, () -> factorial(between(2, 10))).getMessage(), + equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorial is called recursively")); + TestThreadPool threadPool = new TestThreadPool("test"); + assertThat(factorialForked(1, threadPool.generic()), equalTo(1)); + assertThat(factorialForked(10, threadPool.generic()), equalTo(3628800)); + assertThat(expectThrows(AssertionError.class, + () -> factorialForked(between(2, 10), EsExecutors.newDirectExecutorService())).getMessage(), + equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorialForked is called recursively")); + terminate(threadPool); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 8d73a5ba4e467..0ba60ba872e9a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -67,6 +67,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.recovery.AsyncRecoveryTarget; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryResponse; @@ -629,8 +630,9 @@ protected final void recoverUnstartedReplica(final IndexShard replica, final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); - final RecoverySourceHandler recovery = new RecoverySourceHandler( - primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); + final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, + new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet()); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java new file mode 100644 index 0000000000000..e845ab41b8af8 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executor; + +/** + * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}. + */ +public class AsyncRecoveryTarget implements RecoveryTargetHandler { + private final RecoveryTargetHandler target; + private final Executor executor; + + public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { + this.executor = executor; + this.target = target; + } + + @Override + public void ensureClusterStateVersion(long clusterStateVersion) { + target.ensureClusterStateVersion(clusterStateVersion); + } + + @Override + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener)); + } + + @Override + public void finalizeRecovery(long globalCheckpoint, ActionListener listener) { + executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener)); + } + + @Override + public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) { + target.handoffPrimaryContext(primaryContext); + } + + @Override + public void indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary, + RetentionLeases retentionLeases, ActionListener listener) { + executor.execute(() -> target.indexTranslogOperations( + operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, listener)); + } + + @Override + public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, + List phase1ExistingFileSizes, int totalTranslogOps) { + target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps); + } + + @Override + public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { + target.cleanFiles(totalTranslogOps, sourceMetaData); + } + + @Override + public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, + boolean lastChunk, int totalTranslogOps, ActionListener listener) { + // TODO: remove this clone once we send file chunk async + final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef())); + executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener)); + } +}