diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java
new file mode 100644
index 0000000000000..09366a38a9957
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.Assertions;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.seqno.LocalCheckpointTracker;
+import org.elasticsearch.index.store.StoreFileMetaData;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
+
+/**
+ * File chunks are sent/requested sequentially by at most one thread at any time. However, to reduce recovery time, especially over
+ * secure/compressed or high-latency connections, the sender/requestor does not wait for a response before processing the next file chunk.
+ *
+ * The sender/requestor can send up to {@code maxConcurrentFileChunks} file chunk requests without waiting for responses. Since the recovery
+ * target can receive file chunks out of order, it has to buffer those file chunks in memory and only flush to disk when there's no gap.
+ * To ensure the recovery target never buffers more than {@code maxConcurrentFileChunks} file chunks, we allow the sender/requestor to send
+ * only up to {@code maxConcurrentFileChunks} file chunk requests from the last flushed (and acknowledged) file chunk. We leverage the local
+ * checkpoint tracker for this purpose. We generate a new sequence number and assign it to each file chunk request before sending; then mark
+ * that sequence number as processed when we receive a response for the corresponding file chunk request. With the local checkpoint tracker,
+ * we know that the last acknowledged and flushed file chunk is the one whose {@code requestSeqId} equals the local checkpoint, because the
+ * recovery target can flush all file chunks up to the local checkpoint.
+ *
+ * When the number of un-replied file chunk requests reaches the limit (i.e. the gap between the max_seq_no and the local checkpoint is
+ * no longer smaller than {@code maxConcurrentFileChunks}), the sending/requesting thread stops. Sending is resumed by one of the
+ * networking threads that receive/handle the responses of the currently pending file chunk requests, and this continues
+ * until all chunk requests have been sent and responded to.
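+ *
+ * For example (illustrative numbers only), with {@code maxConcurrentFileChunks = 5}: the sender issues requests with seqIds 0..4, at which
+ * point the max_seq_no is 4, the local checkpoint is still -1, the gap is 5, and the sending thread stops. Once the responses for
+ * seqIds 0..2 have been marked as processed, the local checkpoint advances to 2, the gap shrinks to 2, and the networking thread handling
+ * the last of those responses sends up to three more requests (seqIds 5..7) before yielding again.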
+ */
+abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
+ private Status status = Status.PROCESSING;
+ private final Logger logger;
+ private final ActionListener<Void> listener;
+ private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
+ private final AsyncIOProcessor<FileChunkResponseItem> processor;
+ private final int maxConcurrentFileChunks;
+ private StoreFileMetaData currentFile = null;
+ private final Iterator<StoreFileMetaData> remainingFiles;
+ private Tuple<StoreFileMetaData, Request> readAheadRequest = null;
+
+ protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
+ int maxConcurrentFileChunks, List<StoreFileMetaData> files) {
+ this.logger = logger;
+ this.maxConcurrentFileChunks = maxConcurrentFileChunks;
+ this.listener = listener;
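+ // Responses are funneled through an AsyncIOProcessor: whichever thread adds a response may end up draining the queue,
+ // so handleItems is never executed concurrently and no dedicated worker thread is required.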
+ this.processor = new AsyncIOProcessor<>(logger, maxConcurrentFileChunks, threadContext) {
+ @Override
+ protected void write(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
+ handleItems(items);
+ }
+ };
+ this.remainingFiles = files.iterator();
+ }
+
+ public final void start() {
+ addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
+ }
+
+ private void addItem(long requestSeqId, StoreFileMetaData md, Exception failure) {
+ processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; });
+ }
+
+ private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
+ if (status != Status.PROCESSING) {
+ assert status == Status.FAILED : "must not receive any response after the transfer was completed";
+ // These exceptions are ignored since we already recorded the first failure; log them for debugging purposes.
+ items.stream().filter(item -> item.v1().failure != null).forEach(item ->
+ logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure));
+ return;
+ }
+ try {
+ for (Tuple<FileChunkResponseItem, Consumer<Exception>> item : items) {
+ final FileChunkResponseItem resp = item.v1();
+ if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
+ continue; // not an actual item
+ }
+ requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
+ if (resp.failure != null) {
+ handleError(resp.md, resp.failure);
+ throw resp.failure;
+ }
+ }
+ while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) {
+ final Tuple<StoreFileMetaData, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
+ readAheadRequest = null;
+ if (request == null) {
+ assert currentFile == null && remainingFiles.hasNext() == false;
+ if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
+ onCompleted(null);
+ }
+ return;
+ }
+ final long requestSeqId = requestSeqIdTracker.generateSeqNo();
+ sendChunkRequest(request.v2(), ActionListener.wrap(
+ r -> addItem(requestSeqId, request.v1(), null),
+ e -> addItem(requestSeqId, request.v1(), e)));
+ }
+ // While we are waiting for the responses, we can prepare the next request in advance
+ // so we can send it immediately when the responses arrive to reduce the transfer time.
+ if (readAheadRequest == null) {
+ readAheadRequest = getNextRequest();
+ }
+ } catch (Exception e) {
+ onCompleted(e);
+ }
+ }
+
+ private void onCompleted(Exception failure) {
+ if (Assertions.ENABLED && status != Status.PROCESSING) {
+ throw new AssertionError("invalid status: expected [" + Status.PROCESSING + "] actual [" + status + "]", failure);
+ }
+ status = failure == null ? Status.SUCCESS : Status.FAILED;
+ try {
+ IOUtils.close(failure, this);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ return;
+ }
+ listener.onResponse(null);
+ }
+
+ private Tuple<StoreFileMetaData, Request> getNextRequest() throws Exception {
+ try {
+ if (currentFile == null) {
+ if (remainingFiles.hasNext()) {
+ currentFile = remainingFiles.next();
+ onNewFile(currentFile);
+ } else {
+ return null;
+ }
+ }
+ final StoreFileMetaData md = currentFile;
+ final Request request = nextChunkRequest(md);
+ if (request.lastChunk()) {
+ currentFile = null;
+ }
+ return Tuple.tuple(md, request);
+ } catch (Exception e) {
+ handleError(currentFile, e);
+ throw e;
+ }
+ }
+
+ /**
+ * This method is called when the transfer starts sending/requesting a new file. Subclasses should override
+ * it to reset the file offset, or to close the previous file and open the new one, as needed.
+ */
+ protected abstract void onNewFile(StoreFileMetaData md) throws IOException;
+
+ protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException;
+
+ protected abstract void sendChunkRequest(Request request, ActionListener<Void> listener);
+
+ protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception;
+
+ private static class FileChunkResponseItem {
+ final long requestSeqId;
+ final StoreFileMetaData md;
+ final Exception failure;
+
+ FileChunkResponseItem(long requestSeqId, StoreFileMetaData md, Exception failure) {
+ this.requestSeqId = requestSeqId;
+ this.md = md;
+ this.failure = failure;
+ }
+ }
+
+ protected interface ChunkRequest {
+ /**
+ * @return {@code true} if this chunk request is the last chunk of the current file
+ */
+ boolean lastChunk();
+ }
+
+ private enum Status {
+ PROCESSING,
+ SUCCESS,
+ FAILED
+ }
+}
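For intuition, the window that MultiFileTransfer maintains with the LocalCheckpointTracker boils down to simple counter arithmetic. The toy model below is plain Java, not Elasticsearch code, and is simplified to in-order responses (which the real tracker does not require); it shows how the `maxSeqNo - processedCheckpoint < maxConcurrentFileChunks` check bounds the number of in-flight chunk requests:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of MultiFileTransfer's flow control: every chunk request is assigned the next
// sequence number, and a response "processes" the oldest outstanding one. The in-flight
// count is maxSeqNo - processedCheckpoint, capped at maxConcurrentFileChunks.
class ChunkWindowModel {
    private final int maxConcurrentFileChunks;
    private long maxSeqNo = -1;            // corresponds to SequenceNumbers.NO_OPS_PERFORMED
    private long processedCheckpoint = -1;
    private final Queue<Long> outstanding = new ArrayDeque<>();

    ChunkWindowModel(int maxConcurrentFileChunks) {
        this.maxConcurrentFileChunks = maxConcurrentFileChunks;
    }

    boolean canSend() {
        return maxSeqNo - processedCheckpoint < maxConcurrentFileChunks;
    }

    long sendChunk() {                     // analogous to generateSeqNo() + sendChunkRequest()
        long seqId = ++maxSeqNo;
        outstanding.add(seqId);
        return seqId;
    }

    void receiveResponse() {               // analogous to markSeqNoAsProcessed() for the oldest request
        processedCheckpoint = outstanding.remove();
    }

    public static void main(String[] args) {
        ChunkWindowModel window = new ChunkWindowModel(5);
        while (window.canSend()) {
            window.sendChunk();            // seqIds 0..4 are sent, then the window is full
        }
        System.out.println("in flight: " + window.outstanding.size());  // 5
        window.receiveResponse();          // checkpoint advances to 0
        System.out.println("can send again: " + window.canSend());      // true
    }
}
```

Handling out-of-order responses is what the real implementation gains by using the LocalCheckpointTracker instead of a plain counter: the checkpoint only advances once every lower seqId has been marked as processed, which is exactly the property the recovery target relies on to bound its chunk buffer.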
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index f53e8edecd9e6..686ccc799eef0 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -176,7 +176,7 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
final RemoteRecoveryTargetHandler recoveryTarget =
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
- handler = new RecoverySourceHandler(shard, recoveryTarget, request,
+ handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
return handler;
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 3ae9598124b05..e1353ecb52fac 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -37,7 +37,7 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
@@ -49,7 +49,6 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
-import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@@ -61,11 +60,12 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.Transports;
import java.io.Closeable;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -73,13 +73,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
-import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
-
/**
* RecoverySourceHandler handles the three phases of shard recovery, which is
* everything relating to copying the segment files as well as sending translog
@@ -102,12 +99,15 @@ public class RecoverySourceHandler {
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;
+ private final ThreadPool threadPool;
private final CancellableThreads cancellableThreads = new CancellableThreads();
+ private final List<Closeable> resources = new CopyOnWriteArrayList<>();
- public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
- final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) {
+ public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool,
+ StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
this.shard = shard;
this.recoveryTarget = recoveryTarget;
+ this.threadPool = threadPool;
this.request = request;
this.shardId = this.request.shardId().id();
this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName());
@@ -123,7 +123,6 @@ public StartRecoveryRequest getRequest() {
* performs the recovery from the local engine to the target
*/
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
- final List<Closeable> resources = new CopyOnWriteArrayList<>();
final Closeable releaseResources = () -> IOUtils.close(resources);
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
try {
@@ -404,15 +403,17 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
final StepListener<Void> sendFileInfoStep = new StepListener<>();
+ final StepListener<Void> sendFilesStep = new StepListener<>();
final StepListener<Void> cleanFilesStep = new StepListener<>();
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
- sendFileInfoStep.whenComplete(r -> {
- sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
- cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep);
- }, listener::onFailure);
+ sendFileInfoStep.whenComplete(r ->
+ sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
+
+ sendFilesStep.whenComplete(r ->
+ cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure);
final long totalSize = totalSizeInBytes;
final long existingTotalSize = existingTotalSizeInBytes;
@@ -571,6 +572,7 @@ private void sendBatch(
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
+ assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send translog]");
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
@@ -669,54 +671,80 @@ public String toString() {
'}';
}
- void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception {
+ private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+ final StoreFileMetaData md;
+ final BytesReference content;
+ final long position;
+ final boolean lastChunk;
+
+ FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
+ this.md = md;
+ this.content = content;
+ this.position = position;
+ this.lastChunk = lastChunk;
+ }
+
+ @Override
+ public boolean lastChunk() {
+ return lastChunk;
+ }
+ }
+
+ void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps, ActionListener<Void> listener) {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
- final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
- final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
- final byte[] buffer = new byte[chunkSizeInBytes];
- for (final StoreFileMetaData md : files) {
- if (error.get() != null) {
- break;
- }
- try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
- InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
- long position = 0;
- int bytesRead;
- while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
- final BytesArray content = new BytesArray(buffer, 0, bytesRead);
- final boolean lastChunk = position + content.length() == md.length();
- final long requestSeqId = requestSeqIdTracker.generateSeqNo();
- cancellableThreads.execute(
- () -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - maxConcurrentFileChunks));
+
+ final MultiFileTransfer<FileChunk> multiFileSender =
+ new MultiFileTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
+
+ final byte[] buffer = new byte[chunkSizeInBytes];
+ InputStreamIndexInput currentInput = null;
+ long offset = 0;
+
+ @Override
+ protected void onNewFile(StoreFileMetaData md) throws IOException {
+ offset = 0;
+ IOUtils.close(currentInput, () -> currentInput = null);
+ final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
+ currentInput = new InputStreamIndexInput(indexInput, md.length()) {
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop
+ }
+ };
+ }
+
+ @Override
+ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {
+ assert Transports.assertNotTransportThread("read file chunk");
cancellableThreads.checkForCancel();
- if (error.get() != null) {
- break;
+ final int bytesRead = currentInput.read(buffer);
+ if (bytesRead == -1) {
+ throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
}
- final long requestFilePosition = position;
- cancellableThreads.executeIO(() ->
- recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(),
- ActionListener.wrap(
- r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId),
- e -> {
- error.compareAndSet(null, Tuple.tuple(md, e));
- requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
- }
- )));
- position += content.length();
+ final boolean lastChunk = offset + bytesRead == md.length();
+ final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk);
+ offset += bytesRead;
+ return chunk;
}
- } catch (Exception e) {
- error.compareAndSet(null, Tuple.tuple(md, e));
- break;
- }
- }
- // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
- // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
- if (error.get() == null) {
- cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
- }
- if (error.get() != null) {
- handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
- }
+
+ @Override
+ protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
+ cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
+ request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
+ }
+
+ @Override
+ protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
+ handleErrorOnSendFiles(store, e, new StoreFileMetaData[]{md});
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(currentInput, () -> currentInput = null);
+ }
+ };
+ resources.add(multiFileSender);
+ multiFileSender.start();
}
private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
@@ -740,6 +768,7 @@ private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntS
private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
+ assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[handle error on send/clean files]");
if (corruptIndexException != null) {
Exception localException = null;
for (StoreFileMetaData md : mds) {
@@ -763,9 +792,8 @@ private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[
shardId, request.targetNode(), mds), corruptIndexException);
throw remoteException;
}
- } else {
- throw e;
}
+ throw e;
}
protected void failEngine(IOException cause) {
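Related to the sendFiles change above, phase1 now chains its three stages (receive file info, send files, clean files) through StepListeners instead of calling sendFiles synchronously. As a rough illustration of that shape using only the JDK (CompletableFuture standing in for the Elasticsearch StepListener; class and step names here are made up), each continuation is registered up front and every step completes the next one from its callback:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of chaining asynchronous recovery steps so no thread blocks between them.
// Each continuation is registered before the previous step completes, mirroring
// sendFileInfoStep.whenComplete(...) / sendFilesStep.whenComplete(...) in phase1.
public class Phase1ChainSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture<Void> sendFileInfoStep = new CompletableFuture<>();
        CompletableFuture<Void> sendFilesStep = new CompletableFuture<>();
        CompletableFuture<Void> cleanFilesStep = new CompletableFuture<>();

        sendFileInfoStep.thenRunAsync(() -> {
            System.out.println("sending file chunks ...");
            sendFilesStep.complete(null);     // completed by the last chunk response in the real code
        }, executor);
        sendFilesStep.thenRunAsync(() -> {
            System.out.println("cleaning up files on the target ...");
            cleanFilesStep.complete(null);
        }, executor);

        // in the real code this is completed by the receiveFileInfo response handler
        executor.execute(() -> sendFileInfoStep.complete(null));

        cleanFilesStep.get();                 // the real code notifies a listener instead of blocking
        executor.shutdown();
    }
}
```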
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index bb5457c1a3dca..f6f8baedd9d13 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -181,7 +181,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
- ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE));
+ ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 215bf475a0c9b..fdef75bb1df8e 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -32,11 +32,11 @@
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefIterator;
+import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -77,6 +77,7 @@
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
+import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@@ -93,10 +94,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntSupplier;
import java.util.zip.CRC32;
@@ -105,7 +107,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyObject;
@@ -121,10 +123,19 @@ public class RecoverySourceHandlerTests extends ESTestCase {
private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private ThreadPool threadPool;
+ private Executor recoveryExecutor;
@Before
public void setUpThreadPool() {
- threadPool = new TestThreadPool(getTestName());
+ if (randomBoolean()) {
+ threadPool = new TestThreadPool(getTestName());
+ recoveryExecutor = threadPool.generic();
+ } else {
+ // verify that both sending and receiving files can be completed with a single thread
+ threadPool = new TestThreadPool(getTestName(),
+ new FixedExecutorBuilder(Settings.EMPTY, "recovery_executor", between(1, 16), between(16, 128), "recovery_executor"));
+ recoveryExecutor = threadPool.executor("recovery_executor");
+ }
}
@After
@@ -133,9 +144,7 @@ public void tearDownThreadPool() {
}
public void testSendFiles() throws Throwable {
- Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
- put("indices.recovery.concurrent_small_file_streams", 1).build();
- final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
+ final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
Store store = newStore(createTempDir());
Directory dir = store.directory();
@@ -156,38 +165,22 @@ public void testSendFiles() throws Throwable {
metas.add(md);
}
Store targetStore = newStore(createTempDir());
+ MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
- IndexOutputOutputStream out;
@Override
public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
int totalTranslogOps, ActionListener<Void> listener) {
- try {
- if (position == 0) {
- out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
- @Override
- public void close() throws IOException {
- super.close();
- targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
- }
- };
- }
- final BytesRefIterator iterator = content.iterator();
- BytesRef scratch;
- while ((scratch = iterator.next()) != null) {
- out.write(scratch.bytes, scratch.offset, scratch.length);
- }
- if (lastChunk) {
- out.close();
- }
- listener.onResponse(null);
- } catch (Exception e) {
- listener.onFailure(e);
- }
+ ActionListener.completeWith(listener, () -> {
+ multiFileWriter.writeFileChunk(md, position, content, lastChunk);
+ return null;
+ });
}
};
- RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
- Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
- handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
+ RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor),
+ threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5));
+ PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
+ handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture);
+ sendFilesFuture.actionGet();
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null);
Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
assertEquals(metas.size(), recoveryDiff.identical.size());
@@ -195,7 +188,7 @@ public void close() throws IOException {
assertEquals(0, recoveryDiff.missing.size());
IndexReader reader = DirectoryReader.open(targetStore.directory());
assertEquals(numDocs, reader.maxDoc());
- IOUtils.close(reader, store, targetStore);
+ IOUtils.close(reader, store, multiFileWriter, targetStore);
}
public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
@@ -241,10 +234,11 @@ public void indexTranslogOperations(List operations, int tot
RetentionLeases retentionLeases, long mappingVersion, ActionListener<Long> listener) {
shippedOps.addAll(operations);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
- listener.onResponse(checkpointOnTarget.get()); }
+ listener.onResponse(checkpointOnTarget.get());
+ }
};
- RecoverySourceHandler handler = new RecoverySourceHandler(
- shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
+ RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
+ threadPool, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future);
@@ -283,8 +277,8 @@ public void indexTranslogOperations(List operations, int tot
}
}
};
- RecoverySourceHandler handler = new RecoverySourceHandler(
- shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
+ RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
+ threadPool, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
@@ -343,52 +337,36 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
(p.getFileName().toString().equals("write.lock") ||
p.getFileName().toString().startsWith("extra")) == false));
Store targetStore = newStore(createTempDir(), false);
+ MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
- IndexOutputOutputStream out;
@Override
public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk,
int totalTranslogOps, ActionListener<Void> listener) {
- try {
- if (position == 0) {
- out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) {
- @Override
- public void close() throws IOException {
- super.close();
- targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it
- }
- };
- }
- final BytesRefIterator iterator = content.iterator();
- BytesRef scratch;
- while ((scratch = iterator.next()) != null) {
- out.write(scratch.bytes, scratch.offset, scratch.length);
- }
- if (lastChunk) {
- out.close();
- }
- listener.onResponse(null);
- } catch (Exception e) {
- IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e));
- }
+ ActionListener.completeWith(listener, () -> {
+ multiFileWriter.writeFileChunk(md, position, content, lastChunk);
+ return null;
+ });
}
};
- RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
- Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) {
+ RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
+ request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
-
- try {
- handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
- fail("corrupted index");
- } catch (IOException ex) {
- assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
- }
+ SetOnce<Exception> sendFilesError = new SetOnce<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0,
+ new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), latch));
+ latch.await();
+ assertThat(sendFilesError.get(), instanceOf(IOException.class));
+ assertNotNull(ExceptionsHelper.unwrapCorruption(sendFilesError.get()));
assertTrue(failedEngine.get());
- IOUtils.close(store, targetStore);
+ // ensure all chunk requests have been completed; otherwise some files on the target are left open.
+ IOUtils.close(() -> terminate(threadPool), () -> threadPool = null);
+ IOUtils.close(store, multiFileWriter, targetStore);
}
@@ -427,28 +405,24 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
}
}
};
- RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request,
- Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) {
+ RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool,
+ request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
- try {
- handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0);
- fail("exception index");
- } catch (RuntimeException ex) {
- final IOException unwrappedCorruption = ExceptionsHelper.unwrapCorruption(ex);
- if (throwCorruptedIndexException) {
- assertNotNull(unwrappedCorruption);
- assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]");
- } else {
- assertNull(unwrappedCorruption);
- assertEquals(ex.getMessage(), "boom");
- }
- } catch (CorruptIndexException ex) {
- fail("not expected here");
+ PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
+ handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture);
+ Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet);
+ final IOException unwrappedCorruption = ExceptionsHelper.unwrapCorruption(ex);
+ if (throwCorruptedIndexException) {
+ assertNotNull(unwrappedCorruption);
+ assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]");
+ } else {
+ assertNull(unwrappedCorruption);
+ assertEquals(ex.getMessage(), "boom");
}
assertFalse(failedEngine.get());
IOUtils.close(store);
@@ -472,6 +446,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
final RecoverySourceHandler handler = new RecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
+ threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8)) {
@@ -550,19 +525,13 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
};
final int maxConcurrentChunks = between(1, 8);
final int chunkSize = between(1, 32);
- final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(),
+ final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(),
chunkSize, maxConcurrentChunks);
Store store = newStore(createTempDir(), false);
List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
- Thread sender = new Thread(() -> {
- try {
- handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
- } catch (Exception ex) {
- throw new AssertionError(ex);
- }
- });
- sender.start();
+ PlainActionFuture<Void> sendFilesFuture = new PlainActionFuture<>();
+ handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture);
assertBusy(() -> {
assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
assertThat(unrepliedChunks, hasSize(sentChunks.get()));
@@ -594,13 +563,11 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks));
});
}
- sender.join();
+ sendFilesFuture.actionGet();
store.close();
}
public void testSendFileChunksStopOnError() throws Exception {
- final IndexShard shard = mock(IndexShard.class);
- when(shard.state()).thenReturn(IndexShardState.STARTED);
final List<FileChunkResponse> unrepliedChunks = new CopyOnWriteArrayList<>();
final AtomicInteger sentChunks = new AtomicInteger();
final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@@ -616,23 +583,23 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
};
final int maxConcurrentChunks = between(1, 4);
final int chunkSize = between(1, 16);
- final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(),
- chunkSize, maxConcurrentChunks);
+ final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor),
+ threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks);
Store store = newStore(createTempDir(), false);
List<StoreFileMetaData> files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20));
int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum();
- AtomicReference<Exception> error = new AtomicReference<>();
- Thread sender = new Thread(() -> {
- try {
- handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0);
- } catch (Exception ex) {
- error.set(ex);
- }
- });
- sender.start();
+ SetOnce<Exception> sendFilesError = new SetOnce<>();
+ CountDownLatch sendFilesLatch = new CountDownLatch(1);
+ handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0,
+ new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), sendFilesLatch));
assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))));
List<FileChunkResponse> failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks);
- failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception")));
+ CountDownLatch replyLatch = new CountDownLatch(failedChunks.size());
+ failedChunks.forEach(c -> {
+ c.listener.onFailure(new IllegalStateException("test chunk exception"));
+ replyLatch.countDown();
+ });
+ replyLatch.await();
unrepliedChunks.removeAll(failedChunks);
unrepliedChunks.forEach(c -> {
if (randomBoolean()) {
@@ -641,12 +608,10 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
c.listener.onResponse(null);
}
});
- assertBusy(() -> {
- assertThat(error.get(), notNullValue());
- assertThat(error.get().getMessage(), containsString("test chunk exception"));
- });
+ sendFilesLatch.await();
+ assertThat(sendFilesError.get(), instanceOf(IllegalStateException.class));
+ assertThat(sendFilesError.get().getMessage(), containsString("test chunk exception"));
assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)));
- sender.join();
store.close();
}
@@ -654,7 +619,7 @@ public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
RecoverySourceHandler handler = new RecoverySourceHandler(
- shard, new TestRecoveryTargetHandler(), getStartRecoveryRequest(), between(1, 16), between(1, 4));
+ shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4));
String syncId = UUIDs.randomBase64UUID();
int numDocs = between(0, 1000);
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 47a8f73ef62c9..e36f5e3999076 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -635,7 +635,7 @@ protected final void recoverUnstartedReplica(final IndexShard replica,
final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
- new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
+ new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool,
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable);
diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
index afd2aa4e85888..1f4198d00a433 100644
--- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
@@ -83,7 +83,6 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
- // TODO: remove this clone once we send file chunk async
final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef()));
executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener));
}