Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,9 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
writeIndexGen(updatedRepositoryData, repositoryStateId);

// delete the snapshot file
safeSnapshotBlobDelete(snapshot, snapshotId.getUUID());
deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID());
// delete the global metadata file
safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getUUID());
deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID());

// Now delete all indices
for (String index : indices) {
Expand Down Expand Up @@ -422,37 +422,27 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}
}

private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
if (snapshotInfo != null) {
// we know the version the snapshot was created with
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", snapshotInfo.snapshotId(), blobId), e);
}
} else {
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
// snapshot file could not be deleted, log the error
private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
if (snapshotInfo != null) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]",
snapshotInfo.snapshotId(), blobId), e);
} else {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e);
}
}
}

private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
if (snapshotInfo != null) {
// we know the version the snapshot was created with
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", snapshotInfo.snapshotId(), blobId), e);
}
} else {
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
// global metadata file could not be deleted, log the error
private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
if (snapshotInfo != null) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]",
snapshotInfo.snapshotId(), blobId), e);
} else {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e);
}
}
Expand Down Expand Up @@ -512,9 +502,7 @@ private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVer
// When we delete corrupted snapshots we might not know which version we are dealing with
// We can try detecting the version based on the metadata file format
assert ignoreIndexErrors;
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) {
snapshotVersion = Version.CURRENT;
} else {
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) == false) {
throw new SnapshotMissingException(metadata.name(), snapshotId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,6 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
logger.info("snapshot [{}] is done", updatedEntry.snapshot());
}
} else {
entries.add(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public ClusterState execute(ClusterState currentState) {
null);
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "a snapshot is already running");
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
Expand Down Expand Up @@ -363,6 +363,8 @@ private void beginSnapshot(final ClusterState clusterState,

repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true;

logger.info("snapshot [{}] started", snapshot.snapshot());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse();
Expand Down Expand Up @@ -947,35 +949,33 @@ void endSnapshot(SnapshotsInProgress.Entry entry) {
* @param failure failure reason or null if snapshot was successful
*/
private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
@Override
public void run() {
final Snapshot snapshot = entry.snapshot();
try {
final Repository repository = repositoriesService.repository(snapshot.getRepository());
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
ShardId shardId = shardStatus.key;
ShardSnapshotStatus status = shardStatus.value;
if (status.state().failed()) {
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
final Snapshot snapshot = entry.snapshot();
try {
final Repository repository = repositoriesService.repository(snapshot.getRepository());
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
ShardId shardId = shardStatus.key;
ShardSnapshotStatus status = shardStatus.value;
if (status.state().failed()) {
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
}
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
snapshot.getSnapshotId(),
entry.indices(),
entry.startTime(),
failure,
entry.shards().size(),
Collections.unmodifiableList(shardFailures),
entry.getRepositoryStateId(),
entry.includeGlobalState());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
}
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
snapshot.getSnapshotId(),
entry.indices(),
entry.startTime(),
failure,
entry.shards().size(),
Collections.unmodifiableList(shardFailures),
entry.getRepositoryStateId(),
entry.includeGlobalState());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,9 @@ class S3BlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
return SocketAccess.doPrivileged(() -> {
blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
return true;
});
} catch (AmazonS3Exception e) {
return false;
return SocketAccess.doPrivileged(() -> blobStore.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
} catch (Exception e) {
throw new BlobStoreException("failed to check if blob exists", e);
throw new BlobStoreException("Failed to check if blob [" + blobName +"] exists", e);
}
}

Expand All @@ -102,7 +97,7 @@ public InputStream readBlob(String blobName) throws IOException {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
throw new FileAlreadyExistsException("Blob [" + blobName + "] already exists, cannot overwrite");
}

SocketAccess.doPrivilegedIOException(() -> {
Expand All @@ -117,7 +112,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t

@Override
public void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
if (blobExists(blobName) == false) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.AbstractAmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
Expand All @@ -36,14 +37,12 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.Base64;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.Socket;
import java.security.DigestInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -88,6 +87,12 @@ public boolean doesBucketExist(String bucket) {
return true;
}

@Override
public boolean doesObjectExist(String bucketName, String objectName) throws AmazonServiceException, SdkClientException {
simulateS3SocketConnection();
return blobs.containsKey(objectName);
}

@Override
public ObjectMetadata getObjectMetadata(
GetObjectMetadataRequest getObjectMetadataRequest)
Expand Down