Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;

import java.util.ArrayList;
Expand Down Expand Up @@ -226,6 +227,37 @@ public void onFailure(Exception e) {
};
}

/**
* Wraps a given listener and returns a new listener which executes the provided {@code runBefore}
* callback before the listener is notified via either {@code #onResponse} or {@code #onFailure}.
* If the callback throws an exception then it will be passed to the listener's {@code #onFailure} and its {@code #onResponse} will
* not be executed.
*/
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
runBefore.run();
} catch (Exception ex) {
delegate.onFailure(ex);
return;
}
delegate.onResponse(response);
}

@Override
public void onFailure(Exception e) {
try {
runBefore.run();
} catch (Exception ex) {
e.addSuppressed(ex);
}
delegate.onFailure(e);
}
};
}

/**
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,11 @@ public boolean isReadOnly() {
return in.isReadOnly();
}


@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
}

@Override
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
Expand Down Expand Up @@ -191,27 +191,6 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
*/
boolean isReadOnly();

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param indexShard the shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
*/
@Deprecated
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
Expand All @@ -226,9 +205,10 @@ default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);

/**
* Restores snapshot of the shard.
Expand Down
Loading