@@ -38,6 +38,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
@@ -83,6 +84,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -301,11 +303,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
// TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async
ActionListener.completeWith(listener, () -> {
final ShardId shardId = store.shardId();
final LinkedList<Closeable> toClose = new LinkedList<>();
final ActionListener<Void> restoreListener = ActionListener.runBefore(ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))),
() -> IOUtils.close(toClose));
try {
// TODO: Add timeouts to network calls / the restore process.
createEmptyStore(store);
ShardId shardId = store.shardId();

final Map<String, String> ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
@@ -348,21 +353,23 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
},
CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()),
Ccr.CCR_THREAD_POOL_NAME);

toClose.add(() -> {
logger.trace(
"{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
renewable.cancel();
});
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) {
restoreSession.restoreFiles(store);
final RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState);
toClose.addFirst(restoreSession); // Some tests depend on closing session before cancelling retention lease renewal
restoreSession.restoreFiles(store, ActionListener.wrap(v -> {
logger.trace("[{}] completed CCR restore", shardId);
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex());
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
} finally {
logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId,
retentionLeaseId);
renewable.cancel();
}
return null;
});
restoreListener.onResponse(null);
}, restoreListener::onFailure));
} catch (Exception e) {
restoreListener.onFailure(e);
}
}

private void createEmptyStore(Store store) {
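A minimal, self-contained sketch of the listener-wrapping pattern the new restoreShard path relies on: the caller's listener is wrapped so that every queued resource is closed before the listener completes, and any failure is wrapped once as a restore failure, which replaces the old try/catch/finally and lets the restore complete from a callback on another thread. The Listener interface and the closing(...) helper below are simplified stand-ins, not the actual Elasticsearch ActionListener.runBefore/delegateResponse or IOUtils API.

import java.io.Closeable;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;

public class RestoreListenerSketch {

    // Simplified stand-in for org.elasticsearch.action.ActionListener.
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    // Wrap a listener so the queued resources are closed before it is completed,
    // and failures are wrapped in a single "failed to restore" exception.
    static <T> Listener<T> closing(Listener<T> delegate, Deque<Closeable> toClose) {
        return new Listener<T>() {
            @Override
            public void onResponse(T value) {
                closeAll();
                delegate.onResponse(value);
            }

            @Override
            public void onFailure(Exception e) {
                closeAll();
                delegate.onFailure(new RuntimeException("failed to restore snapshot", e));
            }

            private void closeAll() {
                for (Closeable closeable : toClose) {
                    try {
                        closeable.close();
                    } catch (IOException inner) {
                        // best effort: keep closing the remaining resources
                    }
                }
            }
        };
    }

    public static void main(String[] args) {
        Deque<Closeable> toClose = new LinkedList<>();
        toClose.add(() -> System.out.println("cancel retention lease renewal"));
        // addFirst mirrors the diff above: the restore session must close before
        // the retention-lease renewal is cancelled.
        toClose.addFirst(() -> System.out.println("close restore session"));
        Listener<Void> restoreListener = closing(new Listener<Void>() {
            @Override
            public void onResponse(Void value) { System.out.println("restore completed"); }
            @Override
            public void onFailure(Exception e) { System.out.println(e.getMessage()); }
        }, toClose);
        restoreListener.onResponse(null); // closes the session, cancels renewal, then completes
    }
}

Closing resources in the wrapper rather than in a finally block is what allows restoreShard to return before the restore has finished.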
@@ -471,98 +478,90 @@ private static class RestoreSession extends FileRestoreContext implements Closea
this.throttleListener = throttleListener;
}

void restoreFiles(Store store) {
void restoreFiles(Store store, ActionListener<Void> listener) {
ArrayList<FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
restore(snapshotFiles, store, future);
future.actionGet();
restore(snapshotFiles, store, listener);
}

@Override
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {

final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
long offset = 0;

@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}

private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
});
long offset = 0;

@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}

private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}

@Override
public void close() {
multiFileWriter.close();
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
};
multiFileTransfer.start();
restoreFilesFuture.actionGet();
logger.trace("[{}] completed CCR restore", shardId);
return null;
});
throw e;
}

@Override
public void close() {
multiFileWriter.close();
}
};
multiFileTransfer.start();
}
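A small, self-contained sketch of the chunking logic in onNewFile(...)/nextChunkRequest(...) above: each request asks for at most the configured chunk size, the offset advances by the bytes requested, and the request that reaches the end of the file is flagged as the last chunk. FileChunk and planChunks(...) here are illustrative stand-ins, not the MultiFileTransfer API.

import java.util.ArrayList;
import java.util.List;

public class ChunkPlanSketch {

    static final class FileChunk {
        final long offset;
        final int bytesRequested;
        final boolean lastChunk;

        FileChunk(long offset, int bytesRequested, boolean lastChunk) {
            this.offset = offset;
            this.bytesRequested = bytesRequested;
            this.lastChunk = lastChunk;
        }
    }

    // Plan the sequence of chunk requests for a file of the given length.
    static List<FileChunk> planChunks(long fileLength, long chunkSize) {
        List<FileChunk> chunks = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            int bytesRequested = Math.toIntExact(Math.min(chunkSize, fileLength - offset));
            offset += bytesRequested;
            // The request that advances the offset to the file length is the last chunk.
            chunks.add(new FileChunk(offset - bytesRequested, bytesRequested, offset == fileLength));
        }
        return chunks;
    }

    public static void main(String[] args) {
        for (FileChunk chunk : planChunks(2_500_000, 1_048_576)) {
            System.out.println("offset=" + chunk.offset + " bytes=" + chunk.bytesRequested + " last=" + chunk.lastChunk);
        }
    }
}

On the write side the real code re-derives the last-chunk flag from the response (r.getOffset() + actualChunkSize >= md.length()), since the remote response carries the authoritative offset.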

@Override
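Finally, a sketch of the timeout wrapping applied to each chunk request (ListenerTimeouts.wrapWithTimeout above): whichever fires first, the remote response or the scheduled timeout, completes the delegate exactly once and the loser is ignored. The Listener interface, withTimeout(...) and the ScheduledExecutorService stand in for the Elasticsearch thread-pool plumbing and are assumptions of this sketch.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutListenerSketch {

    // Simplified stand-in for org.elasticsearch.action.ActionListener.
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    // Complete the delegate with whichever comes first: the response or the timeout.
    static <T> Listener<T> withTimeout(Listener<T> delegate, long timeoutMillis, ScheduledExecutorService scheduler) {
        AtomicBoolean done = new AtomicBoolean();
        scheduler.schedule(() -> {
            if (done.compareAndSet(false, true)) {
                delegate.onFailure(new Exception("timed out after " + timeoutMillis + "ms"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return new Listener<T>() {
            @Override
            public void onResponse(T value) {
                if (done.compareAndSet(false, true)) {
                    delegate.onResponse(value);
                }
            }

            @Override
            public void onFailure(Exception e) {
                if (done.compareAndSet(false, true)) {
                    delegate.onFailure(e);
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Listener<String> listener = withTimeout(new Listener<String>() {
            @Override
            public void onResponse(String value) { System.out.println("got " + value); }
            @Override
            public void onFailure(Exception e) { System.out.println(e.getMessage()); }
        }, 100, scheduler);
        Thread.sleep(200);            // no response arrives, so the timeout path fires once
        listener.onResponse("chunk"); // ignored: the listener is already completed
        scheduler.shutdown();
    }
}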