From 7aa23088bbdf5e3a021e7dbcd5f6db95f3cce5d6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 8 Dec 2017 16:11:16 +0100 Subject: [PATCH 1/2] Use AmazonS3.doesObjectExist() method --- .../blobstore/BlobStoreRepository.java | 42 +++++--------- .../snapshots/SnapshotShardsService.java | 1 - .../snapshots/SnapshotsService.java | 56 +++++++++---------- .../repositories/s3/S3BlobContainer.java | 13 ++--- 4 files changed, 47 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a66a5a51d1023..2230c106d7cfb 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -423,36 +423,26 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { } private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { - if (snapshotInfo != null) { - // we know the version the snapshot was created with - try { - snapshotFormat.delete(snapshotsBlobContainer, blobId); - } catch (IOException e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", snapshotInfo.snapshotId(), blobId), e); - } - } else { - try { - snapshotFormat.delete(snapshotsBlobContainer, blobId); - } catch (IOException e) { - // snapshot file could not be deleted, log the error + try { + snapshotFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + if (snapshotInfo != null) { + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]", + snapshotInfo.snapshotId(), blobId), e); + } else { logger.warn((Supplier) () -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e); } } } private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { - if (snapshotInfo != null) { - // we know the version the snapshot was created with - try { - globalMetaDataFormat.delete(snapshotsBlobContainer, blobId); - } catch (IOException e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", snapshotInfo.snapshotId(), blobId), e); - } - } else { - try { - globalMetaDataFormat.delete(snapshotsBlobContainer, blobId); - } catch (IOException e) { - // global metadata file could not be deleted, log the error + try { + globalMetaDataFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + if (snapshotInfo != null) { + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]", + snapshotInfo.snapshotId(), blobId), e); + } else { logger.warn((Supplier) () -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e); } } @@ -512,9 +502,7 @@ private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVer // When we delete corrupted snapshots we might not know which version we are dealing with // We can try detecting the version based on the metadata file format assert ignoreIndexErrors; - if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) { - snapshotVersion = Version.CURRENT; - } else { + if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) == false) { throw new SnapshotMissingException(metadata.name(), snapshotId); } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 15f70e8b2c6fc..d3e0aac5e2a90 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -581,7 +581,6 @@ public ClusterTasksResult execute(Cluster entries.add(updatedEntry); // Finalize snapshot in the repository snapshotsService.endSnapshot(updatedEntry); - logger.info("snapshot [{}] is done", updatedEntry.snapshot()); } } else { entries.add(entry); diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5092a58adaa3d..7a5fdaa705270 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -263,7 +263,7 @@ public ClusterState execute(ClusterState currentState) { null); snapshots = new SnapshotsInProgress(newSnapshot); } else { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "a snapshot is already running"); + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); } return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } @@ -363,6 +363,8 @@ private void beginSnapshot(final ClusterState clusterState, repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); snapshotCreated = true; + + logger.info("snapshot [{}] started", snapshot.snapshot()); if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(); @@ -947,35 +949,33 @@ void endSnapshot(SnapshotsInProgress.Entry entry) { * @param failure failure reason or null if snapshot was successful */ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() { - @Override - public void run() { - final Snapshot snapshot = entry.snapshot(); - try { - final Repository repository = repositoriesService.repository(snapshot.getRepository()); - logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); - ArrayList shardFailures = new ArrayList<>(); - for (ObjectObjectCursor shardStatus : entry.shards()) { - ShardId shardId = shardStatus.key; - ShardSnapshotStatus status = shardStatus.value; - if (status.state().failed()) { - shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); - } + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + final Snapshot snapshot = entry.snapshot(); + try { + final Repository repository = repositoriesService.repository(snapshot.getRepository()); + logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); + ArrayList shardFailures = new ArrayList<>(); + for (ObjectObjectCursor shardStatus : entry.shards()) { + ShardId shardId = shardStatus.key; + ShardSnapshotStatus status = shardStatus.value; + if (status.state().failed()) { + shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); } - SnapshotInfo snapshotInfo = repository.finalizeSnapshot( - snapshot.getSnapshotId(), - entry.indices(), - entry.startTime(), - failure, - entry.shards().size(), - Collections.unmodifiableList(shardFailures), - entry.getRepositoryStateId(), - entry.includeGlobalState()); - removeSnapshotFromClusterState(snapshot, snapshotInfo, null); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e); - removeSnapshotFromClusterState(snapshot, null, e); } + SnapshotInfo snapshotInfo = repository.finalizeSnapshot( + snapshot.getSnapshotId(), + entry.indices(), + entry.startTime(), + failure, + entry.shards().size(), + Collections.unmodifiableList(shardFailures), + entry.getRepositoryStateId(), + entry.includeGlobalState()); + removeSnapshotFromClusterState(snapshot, snapshotInfo, null); + logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e); + removeSnapshotFromClusterState(snapshot, null, e); } }); } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index bb1130db42d9a..401ef0933a847 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -73,14 +73,9 @@ class S3BlobContainer extends AbstractBlobContainer { @Override public boolean blobExists(String blobName) { try { - return SocketAccess.doPrivileged(() -> { - blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName)); - return true; - }); - } catch (AmazonS3Exception e) { - return false; + return SocketAccess.doPrivileged(() -> blobStore.client().doesObjectExist(blobStore.bucket(), buildKey(blobName))); } catch (Exception e) { - throw new BlobStoreException("failed to check if blob exists", e); + throw new BlobStoreException("Failed to check if blob [" + blobName +"] exists", e); } } @@ -102,7 +97,7 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); + throw new FileAlreadyExistsException("Blob [" + blobName + "] already exists, cannot overwrite"); } SocketAccess.doPrivilegedIOException(() -> { @@ -117,7 +112,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t @Override public void deleteBlob(String blobName) throws IOException { - if (!blobExists(blobName)) { + if (blobExists(blobName) == false) { throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } From 00e76a1fe7d34a7ec1b11dcea67b5be1f72b7e95 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 11 Dec 2017 10:06:02 +0100 Subject: [PATCH 2/2] Apply feedback and fix test --- .../repositories/blobstore/BlobStoreRepository.java | 8 ++++---- .../org/elasticsearch/repositories/s3/MockAmazonS3.java | 9 +++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2230c106d7cfb..9afbb52878207 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -368,9 +368,9 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { writeIndexGen(updatedRepositoryData, repositoryStateId); // delete the snapshot file - safeSnapshotBlobDelete(snapshot, snapshotId.getUUID()); + deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID()); // delete the global metadata file - safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getUUID()); + deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID()); // Now delete all indices for (String index : indices) { @@ -422,7 +422,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { } } - private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { + private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { try { snapshotFormat.delete(snapshotsBlobContainer, blobId); } catch (IOException e) { @@ -435,7 +435,7 @@ private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final Strin } } - private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { + private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { try { globalMetaDataFormat.delete(snapshotsBlobContainer, blobId); } catch (IOException e) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java index af7638b4111d3..a090fdd5281fd 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java @@ -21,6 +21,7 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.AbstractAmazonS3; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CopyObjectRequest; @@ -36,14 +37,12 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.amazonaws.util.Base64; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.Socket; -import java.security.DigestInputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -88,6 +87,12 @@ public boolean doesBucketExist(String bucket) { return true; } + @Override + public boolean doesObjectExist(String bucketName, String objectName) throws AmazonServiceException, SdkClientException { + simulateS3SocketConnection(); + return blobs.containsKey(objectName); + } + @Override public ObjectMetadata getObjectMetadata( GetObjectMetadataRequest getObjectMetadataRequest)