Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public void deleteBlob(String blobName) throws IOException {
throw new UnsupportedOperationException("URL repository is read only");
}

@Override
public void delete() {
    // Container deletion is unsupported here: the URL repository is read-only by design,
    // matching the other mutating operations in this class.
    throw new UnsupportedOperationException("URL repository is read only");
}

/**
* This operation is not supported by URLBlobContainer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
Expand All @@ -38,6 +39,7 @@
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -127,31 +129,67 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
public void delete() throws IOException {
PlainActionFuture<Void> result = PlainActionFuture.newFuture();
asyncDelete(result);
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during container delete", e);
}
final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.submit(new ActionRunnable<>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
listener.onResponse(null);
}
});
}

private void asyncDelete(ActionListener<Void> listener) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why recurse by level here and not list all sublevels at once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming you're talking about flat listing all the blobs and then just deleting them one by one, like I did for GCS and S3?
Mainly just to keep the changes size down a little for now as that didn't get us much here: The gain of being more effective when listing here is kinda small on Azure since we delete one by one (so the time that goes into listing requests is a lot less relevant overall than it would be if we deleted 100 or 1k blobs at a time). I can do a bit of a bigger refactoring though and move the parallelization logic into the blob store. Let me look into that :)

final Collection<BlobContainer> childContainers = children().values();
if (childContainers.isEmpty() == false) {
final ActionListener<Void> childListener = new GroupedActionListener<>(
ActionListener.wrap(v -> asyncDeleteBlobsIgnoringIfNotExists(
new ArrayList<>(listBlobs().keySet()), listener), listener::onFailure), childContainers.size());
for (BlobContainer container : childContainers) {
threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).submit(new ActionRunnable<>(childListener) {
@Override
protected void doRun() throws Exception {
((AzureBlobContainer) container).asyncDelete(childListener);
}
});
}
} else {
asyncDeleteBlobsIgnoringIfNotExists(new ArrayList<>(listBlobs().keySet()), listener);
}
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
    // Bridge the asynchronous bulk-delete implementation to this blocking API by waiting on a future.
    final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
    asyncDeleteBlobsIgnoringIfNotExists(blobNames, result);
    try {
        // Blocks until every individual delete has completed (or one has failed).
        result.actionGet();
    } catch (Exception e) {
        // Wrap any failure (including unchecked listener failures) in IOException to honor this method's contract.
        throw new IOException("Exception during bulk delete", e);
    }
}

/**
 * Deletes the given blobs, ignoring blobs that do not exist, and notifies {@code callback}
 * once all per-blob deletes have completed or any one of them has failed.
 *
 * @param blobNames names of the blobs to delete
 * @param callback  listener resolved with {@code null} on success, or the first failure
 */
private void asyncDeleteBlobsIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> callback) {
    if (blobNames.isEmpty()) {
        // Nothing to delete: complete immediately so callers waiting on the listener are not left hanging.
        callback.onResponse(null);
    } else {
        // Collapse the blobNames.size() individual responses into a single callback invocation.
        final GroupedActionListener<Void> listener =
            new GroupedActionListener<>(ActionListener.map(callback, v -> null), blobNames.size());
        final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
        // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
        // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
        for (String blobName : blobNames) {
            executor.submit(new ActionRunnable<>(listener) {
                @Override
                protected void doRun() throws IOException {
                    deleteBlobIgnoringIfNotExists(blobName);
                    listener.onResponse(null);
                }
            });
        }
    }
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
logger.trace("listBlobsByPrefix({})", prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public void deleteBlob(String blobName) throws IOException {
blobStore.deleteBlob(buildKey(blobName));
}

@Override
public void delete() throws IOException {
    // Delegates to the blob store, using this container's path string as the prefix of blobs to remove.
    blobStore.deleteDirectory(path().buildAsString());
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.repositories.gcs;

import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
Expand Down Expand Up @@ -306,6 +307,23 @@ void deleteBlob(String blobName) throws IOException {
}
}

/**
 * Deletes the given path and all its children.
 *
 * @param pathStr Name of path to delete
 * @throws IOException if listing or deleting the blobs fails
 */
void deleteDirectory(String pathStr) throws IOException {
    SocketAccess.doPrivilegedVoidIOException(() -> {
        // Page through every blob whose name starts with the prefix, bulk-deleting one page
        // at a time so we never hold the full listing in memory.
        Page<Blob> page = client().get(bucketName).list(BlobListOption.prefix(pathStr));
        do {
            final Collection<String> blobsToDelete = new ArrayList<>();
            page.getValues().forEach(b -> blobsToDelete.add(b.getName()));
            deleteBlobsIgnoringIfNotExists(blobsToDelete);
            page = page.getNextPage();
        } while (page != null);
    });
}

/**
* Deletes multiple blobs from the specific bucket using a batch request
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public void deleteBlob(String blobName) throws IOException {
}
}

@Override
public void delete() throws IOException {
    // The second argument (true) requests a recursive delete, removing this
    // container's path and everything beneath it in one filesystem call.
    store.execute(fileContext -> fileContext.delete(path, true));
}

@Override
public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream does buffering internally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -129,12 +130,53 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void delete() throws IOException {
    // Deletes every object under this container's key prefix by paging through the
    // listing and bulk-deleting each page as it is retrieved.
    try (AmazonS3Reference clientReference = blobStore.clientReference()) {
        ObjectListing prevListing = null;
        while (true) {
            ObjectListing list;
            if (prevListing != null) {
                // Continue a paginated listing from where the previous page left off.
                final ObjectListing finalPrevListing = prevListing;
                list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
            } else {
                // First iteration: list everything under this container's prefix.
                final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
                listObjectsRequest.setBucketName(blobStore.bucket());
                listObjectsRequest.setPrefix(keyPath);
                list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
            }
            // The listed keys are already absolute, hence relative = false below.
            final List<String> blobsToDelete =
                list.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
            if (list.isTruncated()) {
                doDeleteBlobs(blobsToDelete, false);
                prevListing = list;
            } else {
                // Last page: also delete the prefix key itself — presumably in case it exists
                // as a zero-length "directory" marker object (TODO confirm).
                final List<String> lastBlobsToDelete = new ArrayList<>(blobsToDelete);
                lastBlobsToDelete.add(keyPath);
                doDeleteBlobs(lastBlobsToDelete, false);
                break;
            }
        }
    } catch (final AmazonClientException e) {
        throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
    }
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
    // relative = true: the given names are container-relative and must be resolved against this container's key path.
    doDeleteBlobs(blobNames, true);
}

private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
final Set<String> outstanding;
if (relative) {
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
} else {
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,11 @@ protected void assertChildren(BlobPath path, Collection<String> children) throws
// to become consistent.
assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES);
}

// Overrides the base assertion to tolerate S3's eventual consistency, mirroring assertChildren above.
@Override
protected void assertDeleted(BlobPath path, String name) throws Exception {
    // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
    // to become consistent.
    assertBusy(() -> super.assertDeleted(path, name), 10L, TimeUnit.MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;

/**
 * Deletes this container and all its contents from the repository.
 *
 * @throws IOException on failure
 */
void delete() throws IOException;

/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.common.blobstore;

import org.elasticsearch.common.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -68,6 +70,20 @@ public String buildAsString() {
return p + SEPARATOR;
}

/**
 * Computes the path one level above this one.
 *
 * @return the parent {@link BlobPath}, or {@code null} if this is the root path
 */
@Nullable
public BlobPath parent() {
    // The root path (no components) has no parent; otherwise drop the last component.
    return paths.isEmpty() ? null : new BlobPath(List.copyOf(paths.subList(0, paths.size() - 1)));
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
}
}

@Override
public void delete() throws IOException {
    // IOUtils.rm removes the directory tree rooted at this container's filesystem path.
    IOUtils.rm(path);
}

@Override
public boolean blobExists(String blobName) {
return Files.exists(path.resolve(blobName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public void deleteBlob(String blobName) throws IOException {
delegate.deleteBlob(blobName);
}

@Override
public void delete() throws IOException {
    // Plain pass-through to the wrapped container, consistent with the other delegating methods here.
    delegate.delete();
}

@Override
public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
delegate.deleteBlobIgnoringIfNotExists(blobName);
Expand Down
Loading