From 8220d234ecf2c4106511f0fec23b4488b3b059b3 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 18 Jun 2019 18:27:37 +0200 Subject: [PATCH 1/3] Optimize Azure Directory Delete * Follow-up to #43281: * Optimizing the Azure directory delete operation: * As with GCS and S3, we can simply flat-list a prefix and delete blobs as we iterate, instead of listing the directories recursively. This should require fewer list RPC calls, and the logic becomes simpler. --- .../azure/AzureBlobContainer.java | 47 ++++----------- .../repositories/azure/AzureBlobStore.java | 5 ++ .../azure/AzureStorageService.java | 59 +++++++++++++++++++ 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index ef4cd45bfcc6f..85ca5ba124886 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -39,8 +39,6 @@ import java.net.HttpURLConnection; import java.net.URISyntaxException; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -130,51 +128,21 @@ public void deleteBlob(String blobName) throws IOException { @Override public void delete() throws IOException { - PlainActionFuture result = PlainActionFuture.newFuture(); - asyncDelete(result); try { - result.actionGet(); - } catch (Exception e) { - throw new IOException("Exception during container delete", e); - } - } - - private void asyncDelete(ActionListener listener) throws IOException { - final Collection childContainers = children().values(); - if (childContainers.isEmpty() == false) { - final ActionListener childListener = new 
GroupedActionListener<>( - ActionListener.wrap(v -> asyncDeleteBlobsIgnoringIfNotExists( - new ArrayList<>(listBlobs().keySet()), listener), listener::onFailure), childContainers.size()); - for (BlobContainer container : childContainers) { - threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).submit(new ActionRunnable<>(childListener) { - @Override - protected void doRun() throws Exception { - ((AzureBlobContainer) container).asyncDelete(childListener); - } - }); - } - } else { - asyncDeleteBlobsIgnoringIfNotExists(new ArrayList<>(listBlobs().keySet()), listener); + blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)::submit); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); } } @Override public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { final PlainActionFuture result = PlainActionFuture.newFuture(); - asyncDeleteBlobsIgnoringIfNotExists(blobNames, result); - try { - result.actionGet(); - } catch (Exception e) { - throw new IOException("Exception during bulk delete", e); - } - } - - private void asyncDeleteBlobsIgnoringIfNotExists(List blobNames, ActionListener callback) { if (blobNames.isEmpty()) { - callback.onResponse(null); + result.onResponse(null); } else { final GroupedActionListener listener = - new GroupedActionListener<>(ActionListener.map(callback, v -> null), blobNames.size()); + new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size()); final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. 
@@ -188,6 +156,11 @@ protected void doRun() throws IOException { }); } } + try { + result.actionGet(); + } catch (Exception e) { + throw new IOException("Exception during bulk delete", e); + } } @Override diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 5d3f6c8570374..a7d9bb93a5125 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -36,6 +36,7 @@ import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -91,6 +92,10 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException service.deleteBlob(clientName, container, blob); } + public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException { + service.deleteBlobDirectory(clientName, container, path, executor); + } + public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { return service.getInputStream(clientName, container, blob); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index dabb0fd17cad1..850fb9d3faded 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -39,6 +39,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import 
org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; @@ -47,6 +50,7 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import java.io.IOException; import java.io.InputStream; @@ -55,11 +59,16 @@ import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static java.util.Collections.emptyMap; @@ -185,6 +194,56 @@ public void deleteBlob(String account, String container, String blob) throws URI }); } + void deleteBlobDirectory(String account, String container, String path, Executor executor) + throws URISyntaxException, StorageException, IOException { + final Tuple> client = client(account); + final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); + final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); + final AtomicLong outstanding = new AtomicLong(0); + final SetOnce> resReference = new SetOnce<>(); + SocketAccess.doPrivilegedVoidException(() -> { + for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { + // uri.getPath is of the form /container/keyPath.* and we want to strip off the 
/container/ + // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / + final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); + outstanding.incrementAndGet(); + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + deleteBlob(account, container, blobPath); + } + + @Override + public void onFailure(Exception e) { + exceptions.add(e); + } + + @Override + public void onAfter() { + if (outstanding.decrementAndGet() == 0) { + ActionListener resListener = resReference.get(); + if (resListener != null) { + resListener.onResponse(null); + } + } + } + }); + } + }); + if (outstanding.get() > 0) { + final PlainActionFuture result = PlainActionFuture.newFuture(); + resReference.set(result); + if (outstanding.get() > 0) { + result.actionGet(); + } + } + if (exceptions.isEmpty() == false) { + final IOException ex = new IOException("Deleting directory [" + path + "] failed"); + exceptions.forEach(ex::addSuppressed); + throw ex; + } + } + public InputStream getInputStream(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(account); From d797ae3f42d76fb70c78876e19dcf0f0a0db9494 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Jul 2019 11:15:08 +0200 Subject: [PATCH 2/3] CR: don't submit --- .../elasticsearch/repositories/azure/AzureBlobContainer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 85ca5ba124886..12113542dee44 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -129,7 +129,7 @@ 
public void deleteBlob(String blobName) throws IOException { @Override public void delete() throws IOException { try { - blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)::submit); + blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); } catch (URISyntaxException | StorageException e) { throw new IOException(e); } @@ -147,7 +147,7 @@ public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOExce // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. for (String blobName : blobNames) { - executor.submit(new ActionRunnable<>(listener) { + executor.execute(new ActionRunnable<>(listener) { @Override protected void doRun() throws IOException { deleteBlobIgnoringIfNotExists(blobName); From 84b250e11f81069d0563de21a8a1dc3d78288352 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 1 Jul 2019 12:12:08 +0200 Subject: [PATCH 3/3] CR: simpler loop --- .../azure/AzureStorageService.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 850fb9d3faded..f4ee7b9dbcad9 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -39,8 +39,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.action.ActionListener; import 
org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; @@ -199,8 +197,8 @@ void deleteBlobDirectory(String account, String container, String path, Executor final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); - final AtomicLong outstanding = new AtomicLong(0); - final SetOnce> resReference = new SetOnce<>(); + final AtomicLong outstanding = new AtomicLong(1L); + final PlainActionFuture result = PlainActionFuture.newFuture(); SocketAccess.doPrivilegedVoidException(() -> { for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ @@ -221,22 +219,16 @@ public void onFailure(Exception e) { @Override public void onAfter() { if (outstanding.decrementAndGet() == 0) { - ActionListener resListener = resReference.get(); - if (resListener != null) { - resListener.onResponse(null); - } + result.onResponse(null); } } }); } }); - if (outstanding.get() > 0) { - final PlainActionFuture result = PlainActionFuture.newFuture(); - resReference.set(result); - if (outstanding.get() > 0) { - result.actionGet(); - } + if (outstanding.decrementAndGet() == 0) { + result.onResponse(null); } + result.actionGet(); if (exceptions.isEmpty() == false) { final IOException ex = new IOException("Deleting directory [" + path + "] failed"); exceptions.forEach(ex::addSuppressed);