SnapshotException.java
@@ -66,7 +66,7 @@ public SnapshotException(final String repositoryName, final String snapshotName,
}

public SnapshotException(final String repositoryName, final String snapshotName, final String msg, final Throwable cause) {
super("[" + repositoryName + ":" + snapshotName + "]" + msg, cause);
super("[" + repositoryName + ":" + snapshotName + "] " + msg, cause);
this.repositoryName = repositoryName;
this.snapshotName = snapshotName;
}
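The change above adds a space after the closing bracket of the exception message prefix; the SnapshotMissingException hunk below drops the leading space from its message accordingly. A minimal, self-contained sketch of the resulting message format (the class name and the repository and snapshot names here are placeholders, not part of the change):

public class SnapshotMessageExample {
    public static void main(String[] args) {
        String repositoryName = "my_repo";   // placeholder
        String snapshotName = "snap_1";      // placeholder
        String msg = "is missing";           // message passed by the subclass, no leading space
        // Mirrors super("[" + repositoryName + ":" + snapshotName + "] " + msg, cause)
        String rendered = "[" + repositoryName + ":" + snapshotName + "] " + msg;
        System.out.println(rendered);        // prints: [my_repo:snap_1] is missing
    }
}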
SnapshotMissingException.java
@@ -30,15 +30,15 @@
public class SnapshotMissingException extends SnapshotException {

public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId, final Throwable cause) {
super(repositoryName, snapshotId, " is missing", cause);
super(repositoryName, snapshotId, "is missing", cause);
}

public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId) {
super(repositoryName, snapshotId, " is missing");
super(repositoryName, snapshotId, "is missing");
}

public SnapshotMissingException(final String repositoryName, final String snapshotName) {
super(repositoryName, snapshotName, " is missing");
super(repositoryName, snapshotName, "is missing");
}

public SnapshotMissingException(StreamInput in) throws IOException {
SnapshotShardsService.java
@@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@@ -66,7 +67,6 @@
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
@@ -85,6 +85,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
import static org.elasticsearch.transport.EmptyTransportResponseHandler.INSTANCE_SAME;

/**
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
@@ -151,7 +152,6 @@ protected void doStop() {
} finally {
shutdownLock.unlock();
}

}

@Override
@@ -162,14 +162,16 @@ protected void doClose() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);

if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) {
SnapshotsInProgress previousSnapshots = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE);
if ((previousSnapshots == null && currentSnapshots != null)
|| (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) {
processIndexShardSnapshots(event);
}
String masterNodeId = event.state().nodes().getMasterNodeId();
if (masterNodeId != null && masterNodeId.equals(event.previousState().nodes().getMasterNodeId()) == false) {

String previousMasterNodeId = event.previousState().nodes().getMasterNodeId();
String currentMasterNodeId = event.state().nodes().getMasterNodeId();
if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) {
syncShardStatsOnNewMaster(event);
}

@@ -240,7 +242,6 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = new HashMap<>();
// Now go through all snapshots and update existing or create missing
final String localNodeId = event.state().nodes().getLocalNodeId();
final DiscoveryNode masterNode = event.state().nodes().getMasterNode();
final Map<Snapshot, Map<String, IndexId>> snapshotIndices = new HashMap<>();
if (snapshotsInProgress != null) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
@@ -286,17 +287,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
snapshotStatus.abort();
break;
case FINALIZE:
logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish", entry.snapshot(), shard.key);
logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
"letting it finish", entry.snapshot(), shard.key);
break;
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode);
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " +
"updating status on the master", entry.snapshot(), shard.key);
notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId);
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure()), masterNode);
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
"updating status on the master", entry.snapshot(), shard.key);
notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotStatus.failure());
break;
default:
throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage());
@@ -325,34 +327,46 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
if (newSnapshots.isEmpty() == false) {
Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
for (final Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry : newSnapshots.entrySet()) {
Map<String, IndexId> indicesMap = snapshotIndices.get(entry.getKey());
final Snapshot snapshot = entry.getKey();
final Map<String, IndexId> indicesMap = snapshotIndices.get(snapshot);
assert indicesMap != null;

for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
final ShardId shardId = shardEntry.getKey();
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {
@Override
public void doRun() {
snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
updateIndexShardSnapshotStatus(entry.getKey(), shardId,
new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode);
}
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e);
updateIndexShardSnapshotStatus(entry.getKey(), shardId,
new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode);
}
final SetOnce<Exception> failure = new SetOnce<>();

});
} catch (Exception e) {
updateIndexShardSnapshotStatus(entry.getKey(), shardId,
new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode);
}
@Override
public void doRun() {
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
}

@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
failure.set(e);
}

@Override
public void onRejection(Exception e) {
failure.set(e);
}

@Override
public void onAfter() {
final Exception exception = failure.get();
if (exception != null) {
notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception));
} else {
notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId);
}
}
});
}
}
}
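The block above replaces the direct status-update calls with an AbstractRunnable that records at most one failure in a Lucene SetOnce and notifies the master exactly once from onAfter, covering normal completion, thrown exceptions, and executor rejection. A minimal stand-alone sketch of that control flow, using plain JDK types in place of AbstractRunnable and SetOnce (class, method, and message names here are illustrative, not the Elasticsearch API):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class ShardSnapshotTaskSketch {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for SetOnce<Exception>: only the first recorded failure is kept.
        AtomicReference<Exception> failure = new AtomicReference<>();

        Runnable task = () -> {
            try {
                doSnapshotWork();                        // doRun(): the actual shard snapshot
            } catch (Exception e) {
                failure.compareAndSet(null, e);          // onFailure(): remember the first failure
            } finally {
                // onAfter(): send exactly one notification, success or failure
                Exception e = failure.get();
                if (e != null) {
                    System.out.println("notify master: shard snapshot failed: " + e.getMessage());
                } else {
                    System.out.println("notify master: shard snapshot succeeded");
                }
            }
        };

        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // onRejection(): the task never ran, so report the failure to the master here
            System.out.println("notify master: shard snapshot failed: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }

    private static void doSnapshotWork() throws Exception {
        // Placeholder for the real work, e.g. repository.snapshotShard(...)
    }
}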
@@ -365,34 +379,36 @@ public void onFailure(Exception e) {
* @param snapshotStatus snapshot status
*/
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
ShardId shardId = indexShard.shardId();
if (!indexShard.routingEntry().primary()) {
final ShardId shardId = indexShard.shardId();
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
if (indexShard.routingEntry().relocating()) {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
}
if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) {

final IndexShardState indexShardState = indexShard.state();
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
// shard has just been created, or still recovering
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}

final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
StringBuilder details = new StringBuilder();
details.append(" index : version [").append(snapshotStatus.indexVersion());
details.append("], number_of_files [").append(snapshotStatus.numberOfFiles());
details.append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository,
TimeValue.timeValueMillis(snapshotStatus.time()), sb);
TimeValue.timeValueMillis(snapshotStatus.time()), details);
}
}
} catch (SnapshotFailedEngineException e) {
throw e;
} catch (IndexShardSnapshotFailedException e) {
} catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) {
throw e;
} catch (Exception e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e);
@@ -407,8 +423,8 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
if (snapshotsInProgress == null) {
return;
}

final String localNodeId = event.state().nodes().getLocalNodeId();
final DiscoveryNode masterNode = event.state().nodes().getMasterNode();
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
Map<ShardId, IndexShardSnapshotStatus> localShards = currentSnapshotShards(snapshot.snapshot());
@@ -422,15 +438,15 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
// Master knows about the shard and thinks it has not completed
if (localShardStatus.stage() == Stage.DONE) {
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode);
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
"updating status on the master", snapshot.snapshot(), shardId);
notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId);

} else if (localShard.getValue().stage() == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure()), masterNode);

logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " +
"updating status on master", snapshot.snapshot(), shardId);
notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, localShardStatus.failure());
}
}
}
@@ -450,7 +466,6 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
}
}


/**
* Internal request that is used to send changes in snapshot status to master
*/
@@ -510,13 +525,21 @@ public String toString() {
}
}

/**
* Updates the shard status
*/
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
/** Notify the master node that the given shard has been successfully snapshotted **/
void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String localNodeId) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS));
}

/** Notify the master node that the given shard failed to be snapshotted **/
void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String localNodeId, final String failure) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure));
}

/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
try {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, INSTANCE_SAME);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
}