Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
Expand Down Expand Up @@ -170,6 +172,35 @@ public void verifyRepositoryAsync(VerifyRepositoryRequest verifyRepositoryReques
VerifyRepositoryResponse::fromXContent, listener, emptySet());
}

/**
* Cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public CleanupRepositoryResponse cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, emptySet());
}

/**
* Asynchronously cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void cleanupRepositoryAsync(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options,
ActionListener<CleanupRepositoryResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, listener, emptySet());
}

/**
* Creates a snapshot.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
Expand Down Expand Up @@ -94,6 +95,20 @@ static Request verifyRepository(VerifyRepositoryRequest verifyRepositoryRequest)
return request;
}

static Request cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_snapshot")
.addPathPart(cleanupRepositoryRequest.name())
.addPathPartAsIs("_cleanup")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withMasterTimeout(cleanupRepositoryRequest.masterNodeTimeout());
parameters.withTimeout(cleanupRepositoryRequest.timeout());
request.addParameters(parameters.asMap());
return request;
}

static Request createSnapshot(CreateSnapshotRequest createSnapshotRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPart("_snapshot")
.addPathPart(createSnapshotRequest.repository())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
Expand Down Expand Up @@ -135,6 +137,17 @@ public void testVerifyRepository() throws IOException {
assertThat(response.getNodes().size(), equalTo(1));
}

public void testCleanupRepository() throws IOException {
AcknowledgedResponse putRepositoryResponse = createTestRepository("test", FsRepository.TYPE, "{\"location\": \".\"}");
assertTrue(putRepositoryResponse.isAcknowledged());

CleanupRepositoryRequest request = new CleanupRepositoryRequest("test");
CleanupRepositoryResponse response = execute(request, highLevelClient().snapshot()::cleanupRepository,
highLevelClient().snapshot()::cleanupRepositoryAsync);
assertThat(response.result().bytes(), equalTo(0L));
assertThat(response.result().blobs(), equalTo(0L));
}

public void testCreateSnapshot() throws IOException {
String repository = "test_repository";
assertTrue(createTestRepository(repository, FsRepository.TYPE, "{\"location\": \".\"}").isAcknowledged());
Expand Down
36 changes: 36 additions & 0 deletions docs/reference/modules/snapshots.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,42 @@ POST /_snapshot/my_unverified_backup/_verify

It returns a list of nodes where repository was successfully verified or an error message if verification process failed.

[float]
===== Repository Cleanup
Repositories can over time accumulate data that is not referenced by any existing snapshot. This is a result of the data safety guarantees
the snapshot functionality provides in failure scenarios during snapshot creation and the decentralized nature of the snapshot creation
process. This unreferenced data does in no way negatively impact the performance or safety of a snapshot repository but leads to higher
than necessary storage use. In order to clean up this unreferenced data, users can call the cleanup endpoint for a repository which will
trigger a complete accounting of the repositories contents and subsequent deletion of all unreferenced data that was found.

[source,js]
-----------------------------------
POST /_snapshot/my_repository/_cleanup
-----------------------------------
// CONSOLE
// TEST[continued]

The response to a cleanup request looks as follows:

[source,js]
--------------------------------------------------
{
"results": {
"deleted_bytes": 20,
"deleted_blobs": 5
}
}
--------------------------------------------------
// TESTRESPONSE

Depending on the concrete repository implementation the numbers shown for bytes free as well as the number of blobs removed will either
be an approximation or an exact result. Any non-zero value for the number of blobs removed implies that unreferenced blobs were found and
subsequently cleaned up.

Please note that most of the cleanup operations executed by this endpoint are automatically executed when deleting any snapshot from a
repository. If you regularly delete snapshots, you will in most cases not get any or only minor space savings from using this functionality
and should lower your frequency of invoking it accordingly.

[float]
[[snapshots-take-snapshot]]
=== Snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() {
public DeleteResult delete() {
throw new UnsupportedOperationException("URL repository is read only");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -126,9 +127,9 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
try {
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -92,8 +92,9 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException
service.deleteBlob(clientName, container, blob);
}

public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
service.deleteBlobDirectory(clientName, container, path, executor);
public DeleteResult deleteBlobDirectory(String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
return service.deleteBlobDirectory(clientName, container, path, executor);
}

public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -73,7 +74,7 @@
import static java.util.Collections.emptyMap;

public class AzureStorageService {

private static final Logger logger = LogManager.getLogger(AzureStorageService.class);

public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
Expand Down Expand Up @@ -193,13 +194,15 @@ public void deleteBlob(String account, String container, String blob) throws URI
});
}

void deleteBlobDirectory(String account, String container, String path, Executor executor)
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong();
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
Expand All @@ -209,7 +212,17 @@ void deleteBlobDirectory(String account, String container, String path, Executor
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
deleteBlob(account, container, blobPath);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}

@Override
Expand All @@ -235,6 +248,7 @@ public void onAfter() {
exceptions.forEach(ex::addSuppressed);
throw ex;
}
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}

public InputStream getInputStream(String account, String container, String blob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;

import java.io.IOException;
Expand Down Expand Up @@ -77,8 +78,8 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() throws IOException {
blobStore.deleteDirectory(path().buildAsString());
public DeleteResult delete() throws IOException {
return blobStore.deleteDirectory(path().buildAsString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.internal.io.Streams;
Expand All @@ -55,6 +56,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -300,15 +302,24 @@ void deleteBlob(String blobName) throws IOException {
*
* @param pathStr Name of path to delete
*/
void deleteDirectory(String pathStr) throws IOException {
SocketAccess.doPrivilegedVoidIOException(() -> {
DeleteResult deleteDirectory(String pathStr) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Page<Blob> page = client().get(bucketName).list(BlobListOption.prefix(pathStr));
do {
final Collection<String> blobsToDelete = new ArrayList<>();
page.getValues().forEach(b -> blobsToDelete.add(b.getName()));
final AtomicLong blobsDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
page.getValues().forEach(b -> {
blobsToDelete.add(b.getName());
blobsDeleted.incrementAndGet();
bytesDeleted.addAndGet(b.getSize());
});
deleteBlobsIgnoringIfNotExists(blobsToDelete);
deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get());
page = page.getNextPage();
} while (page != null);
return deleteResult;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
Expand Down Expand Up @@ -69,9 +70,13 @@ public void deleteBlob(String blobName) throws IOException {
}
}

// TODO: See if we can get precise result reporting.
private static final DeleteResult DELETE_RESULT = new DeleteResult(1L, 0L);

@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
store.execute(fileContext -> fileContext.delete(path, true));
return DELETE_RESULT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.MockSecureSettings;
Expand All @@ -30,6 +31,7 @@
import java.util.Collection;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
Expand Down Expand Up @@ -58,4 +60,14 @@ protected void createRepository(String repoName) {
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}

// HDFS repository doesn't have precise cleanup stats so we only check whether or not any blobs were removed
@Override
protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) {
if (blobs > 0) {
assertThat(response.result().blobs(), greaterThan(0L));
} else {
assertThat(response.result().blobs(), equalTo(0L));
}
}
}
Loading