Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ private void sendBatch(
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -786,4 +786,18 @@ public static boolean assertNotScheduleThread(String reason) {
"Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
return true;
}

public static boolean assertCurrentMethodIsNotCalledRecursively() {
final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
assert stackTraceElements.length >= 3 : stackTraceElements.length;
assert stackTraceElements[0].getMethodName().equals("getStackTrace") : stackTraceElements[0];
assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively") : stackTraceElements[1];
final StackTraceElement testingMethod = stackTraceElements[2];
for (int i = 3; i < stackTraceElements.length; i++) {
assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false
|| stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false :
testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively";
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -47,27 +46,19 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class IndexShardRetentionLeaseTests extends IndexShardTestCase {

private final AtomicLong currentTimeMillis = new AtomicLong();

@Override
protected ThreadPool setUpThreadPool() {
final ThreadPool threadPool = mock(ThreadPool.class);
doAnswer(invocationOnMock -> currentTimeMillis.get()).when(threadPool).absoluteTimeInMillis();
when(threadPool.executor(anyString())).thenReturn(mock(ExecutorService.class));
when(threadPool.scheduler()).thenReturn(mock(ScheduledExecutorService.class));
return threadPool;
}

@Override
protected void tearDownThreadPool() {

return new TestThreadPool(getClass().getName(), threadPoolSettings()) {
@Override
public long absoluteTimeInMillis() {
return currentTimeMillis.get();
}
};
}

public void testAddOrRenewRetentionLease() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,13 @@ public void indexTranslogOperations(
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
ActionListener.wrap(
r -> {
assertFalse(replica.isSyncNeeded());
listener.onResponse(r);
},
listener::onFailure
));
}
}, true, true);

Expand Down Expand Up @@ -2604,8 +2610,12 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
assertListenerCalled.accept(replica);
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}

@Override
Expand All @@ -2622,15 +2632,21 @@ public void indexTranslogOperations(
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.map(listener, checkpoint -> {
assertListenerCalled.accept(replica);
return checkpoint;
}));
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}

@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
super.finalizeRecovery(globalCheckpoint,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}
}, false, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
RetentionLeases retentionLeases, ActionListener<Long> listener) {
shippedOps.addAll(operations);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
}
listener.onResponse(checkpointOnTarget.get()); }
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler handler = new RecoverySourceHandler(
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
Expand Down Expand Up @@ -274,14 +274,15 @@ public void testSendSnapshotStopOnError() throws Exception {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
if (randomBoolean()) {
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
} else {
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
listener.onFailure(new RuntimeException("test - failed to index"));
wasFailed.set(true);
}
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler handler = new RecoverySourceHandler(
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
Expand Down Expand Up @@ -761,12 +762,4 @@ public void close() {
}
};
}

private void maybeExecuteAsync(Runnable runnable) {
if (randomBoolean()) {
threadPool.generic().execute(runnable);
} else {
runnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.ExecutorService;

import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
import static org.hamcrest.CoreMatchers.equalTo;

public class ThreadPoolTests extends ESTestCase {
Expand Down Expand Up @@ -67,4 +72,35 @@ public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() {
Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
}

int factorial(int n) {
assertCurrentMethodIsNotCalledRecursively();
if (n <= 1) {
return 1;
} else {
return n * factorial(n - 1);
}
}

int factorialForked(int n, ExecutorService executor) {
assertCurrentMethodIsNotCalledRecursively();
if (n <= 1) {
return 1;
}
return n * FutureUtils.get(executor.submit(() -> factorialForked(n - 1, executor)));
}

public void testAssertCurrentMethodIsNotCalledRecursively() {
expectThrows(AssertionError.class, () -> factorial(between(2, 10)));
assertThat(factorial(1), equalTo(1)); // is not called recursively
assertThat(expectThrows(AssertionError.class, () -> factorial(between(2, 10))).getMessage(),
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorial is called recursively"));
TestThreadPool threadPool = new TestThreadPool("test");
assertThat(factorialForked(1, threadPool.generic()), equalTo(1));
assertThat(factorialForked(10, threadPool.generic()), equalTo(3628800));
assertThat(expectThrows(AssertionError.class,
() -> factorialForked(between(2, 10), EsExecutors.newDirectExecutorService())).getMessage(),
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorialForked is called recursively"));
terminate(threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.recovery.AsyncRecoveryTarget;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryResponse;
Expand Down Expand Up @@ -629,8 +630,9 @@ protected final void recoverUnstartedReplica(final IndexShard replica,

final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
final RecoverySourceHandler recovery = new RecoverySourceHandler(
primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;

/**
* Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}.
*/
public class AsyncRecoveryTarget implements RecoveryTargetHandler {
private final RecoveryTargetHandler target;
private final Executor executor;

public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) {
this.executor = executor;
this.target = target;
}

@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
target.ensureClusterStateVersion(clusterStateVersion);
}

@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener));
}

@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener));
}

@Override
public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {
target.handoffPrimaryContext(primaryContext);
}

@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary,
RetentionLeases retentionLeases, ActionListener<Long> listener) {
executor.execute(() -> target.indexTranslogOperations(
operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, listener));
}

@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
}

@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, sourceMetaData);
}

@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
// TODO: remove this clone once we send file chunk async
final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef()));
executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener));
}
}