Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -130,56 +128,26 @@ public void deleteBlob(String blobName) throws IOException {

@Override
public void delete() throws IOException {
PlainActionFuture<Void> result = PlainActionFuture.newFuture();
asyncDelete(result);
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during container delete", e);
}
}

private void asyncDelete(ActionListener<Void> listener) throws IOException {
final Collection<BlobContainer> childContainers = children().values();
if (childContainers.isEmpty() == false) {
final ActionListener<Void> childListener = new GroupedActionListener<>(
ActionListener.wrap(v -> asyncDeleteBlobsIgnoringIfNotExists(
new ArrayList<>(listBlobs().keySet()), listener), listener::onFailure), childContainers.size());
for (BlobContainer container : childContainers) {
threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).submit(new ActionRunnable<>(childListener) {
@Override
protected void doRun() throws Exception {
((AzureBlobContainer) container).asyncDelete(childListener);
}
});
}
} else {
asyncDeleteBlobsIgnoringIfNotExists(new ArrayList<>(listBlobs().keySet()), listener);
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
asyncDeleteBlobsIgnoringIfNotExists(blobNames, result);
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during bulk delete", e);
}
}

private void asyncDeleteBlobsIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> callback) {
if (blobNames.isEmpty()) {
callback.onResponse(null);
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(callback, v -> null), blobNames.size());
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.submit(new ActionRunnable<>(listener) {
executor.execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
Expand All @@ -188,6 +156,11 @@ protected void doRun() throws IOException {
});
}
}
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during bulk delete", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -91,6 +92,10 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException
service.deleteBlob(clientName, container, blob);
}

public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
service.deleteBlobDirectory(clientName, container, path, executor);
}

public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
return service.getInputStream(clientName, container, blob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
Expand All @@ -47,6 +48,7 @@
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -55,11 +57,16 @@
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -185,6 +192,50 @@ public void deleteBlob(String account, String container, String blob) throws URI
});
}

void deleteBlobDirectory(String account, String container, String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
deleteBlob(account, container, blobPath);
}

@Override
public void onFailure(Exception e) {
exceptions.add(e);
}

@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
});
}
});
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
result.actionGet();
if (exceptions.isEmpty() == false) {
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
exceptions.forEach(ex::addSuppressed);
throw ex;
}
}

public InputStream getInputStream(String account, String container, String blob)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
Expand Down