Skip to content

Commit 4271963

Browse files
Revert "Use Azure Bulk Deletes in Azure Repository (#53919)" (#54089) (#54111)
This reverts commit 23cccf0. Unfortunately SAS token auth still doesn't work with bulk deletes so we can't use them yet. Closes #54080
1 parent e380a5a commit 4271963

File tree

10 files changed

+132
-112
lines changed

10 files changed

+132
-112
lines changed

plugins/repository-azure/qa/microsoft-azure-storage/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,7 @@ testClusters.integTest {
8888
// in a hacky way to change the protocol and endpoint. We must fix that.
8989
setting 'azure.client.integration_test.endpoint_suffix',
9090
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
91+
String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
92+
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
9193
}
9294
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@
2323
import com.microsoft.azure.storage.StorageException;
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
26+
import org.elasticsearch.action.ActionListener;
27+
import org.elasticsearch.action.ActionRunnable;
28+
import org.elasticsearch.action.support.GroupedActionListener;
29+
import org.elasticsearch.action.support.PlainActionFuture;
2630
import org.elasticsearch.common.Nullable;
2731
import org.elasticsearch.common.blobstore.BlobContainer;
2832
import org.elasticsearch.common.blobstore.BlobMetaData;
2933
import org.elasticsearch.common.blobstore.BlobPath;
3034
import org.elasticsearch.common.blobstore.DeleteResult;
3135
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
36+
import org.elasticsearch.threadpool.ThreadPool;
3237

3338
import java.io.IOException;
3439
import java.io.InputStream;
@@ -37,18 +42,20 @@
3742
import java.nio.file.NoSuchFileException;
3843
import java.util.List;
3944
import java.util.Map;
40-
import java.util.stream.Collectors;
45+
import java.util.concurrent.ExecutorService;
4146

4247
public class AzureBlobContainer extends AbstractBlobContainer {
4348

4449
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
4550
private final AzureBlobStore blobStore;
51+
private final ThreadPool threadPool;
4652
private final String keyPath;
4753

48-
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
54+
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
4955
super(path);
5056
this.blobStore = blobStore;
5157
this.keyPath = path.buildAsString();
58+
this.threadPool = threadPool;
5259
}
5360

5461
private boolean blobExists(String blobName) {
@@ -105,17 +112,41 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
105112
@Override
106113
public DeleteResult delete() throws IOException {
107114
try {
108-
return blobStore.deleteBlobDirectory(keyPath);
115+
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
109116
} catch (URISyntaxException | StorageException e) {
110117
throw new IOException(e);
111118
}
112119
}
113120

114121
@Override
115122
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
123+
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
124+
if (blobNames.isEmpty()) {
125+
result.onResponse(null);
126+
} else {
127+
final GroupedActionListener<Void> listener =
128+
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
129+
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
130+
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
131+
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
132+
for (String blobName : blobNames) {
133+
executor.execute(ActionRunnable.run(listener, () -> {
134+
logger.trace("deleteBlob({})", blobName);
135+
try {
136+
blobStore.deleteBlob(buildKey(blobName));
137+
} catch (StorageException e) {
138+
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
139+
throw new IOException(e);
140+
}
141+
} catch (URISyntaxException e) {
142+
throw new IOException(e);
143+
}
144+
}));
145+
}
146+
}
116147
try {
117-
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
118-
} catch (URISyntaxException | StorageException e) {
148+
result.actionGet();
149+
} catch (Exception e) {
119150
throw new IOException("Exception during bulk delete", e);
120151
}
121152
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@
2828
import org.elasticsearch.common.blobstore.BlobStore;
2929
import org.elasticsearch.common.blobstore.DeleteResult;
3030
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
31+
import org.elasticsearch.threadpool.ThreadPool;
3132

3233
import java.io.IOException;
3334
import java.io.InputStream;
3435
import java.net.URISyntaxException;
35-
import java.util.Collection;
3636
import java.util.Collections;
3737
import java.util.Map;
38+
import java.util.concurrent.Executor;
3839
import java.util.function.Function;
3940
import java.util.stream.Collectors;
4041

@@ -43,15 +44,17 @@
4344
public class AzureBlobStore implements BlobStore {
4445

4546
private final AzureStorageService service;
47+
private final ThreadPool threadPool;
4648

4749
private final String clientName;
4850
private final String container;
4951
private final LocationMode locationMode;
5052

51-
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
53+
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
5254
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
5355
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
5456
this.service = service;
57+
this.threadPool = threadPool;
5558
// locationMode is set per repository, not per client
5659
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
5760
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@@ -77,7 +80,7 @@ public LocationMode getLocationMode() {
7780

7881
@Override
7982
public BlobContainer blobContainer(BlobPath path) {
80-
return new AzureBlobContainer(path, this);
83+
return new AzureBlobContainer(path, this, threadPool);
8184
}
8285

8386
@Override
@@ -88,12 +91,13 @@ public boolean blobExists(String blob) throws URISyntaxException, StorageExcepti
8891
return service.blobExists(clientName, container, blob);
8992
}
9093

91-
public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
92-
service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
94+
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
95+
service.deleteBlob(clientName, container, blob);
9396
}
9497

95-
public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
96-
return service.deleteBlobDirectory(clientName, container, path);
98+
public DeleteResult deleteBlobDirectory(String path, Executor executor)
99+
throws URISyntaxException, StorageException, IOException {
100+
return service.deleteBlobDirectory(clientName, container, path, executor);
97101
}
98102

99103
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
@@ -107,7 +111,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
107111

108112
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
109113
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
110-
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
114+
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
111115
}
112116

113117
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ protected BlobStore getBlobStore() {
115115

116116
@Override
117117
protected AzureBlobStore createBlobStore() {
118-
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
118+
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
119119

120120
logger.debug(() -> new ParameterizedMessage(
121121
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
import org.elasticsearch.common.settings.Setting;
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.settings.SettingsException;
26+
import org.elasticsearch.common.unit.TimeValue;
2627
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2728
import org.elasticsearch.env.Environment;
2829
import org.elasticsearch.plugins.Plugin;
2930
import org.elasticsearch.plugins.ReloadablePlugin;
3031
import org.elasticsearch.plugins.RepositoryPlugin;
3132
import org.elasticsearch.repositories.Repository;
33+
import org.elasticsearch.threadpool.ExecutorBuilder;
34+
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
35+
3236
import java.util.Arrays;
3337
import java.util.Collections;
3438
import java.util.List;
@@ -39,6 +43,8 @@
3943
*/
4044
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
4145

46+
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
47+
4248
// protected for testing
4349
final AzureStorageService azureStoreService;
4450

@@ -74,6 +80,15 @@ public List<Setting<?>> getSettings() {
7480
);
7581
}
7682

83+
@Override
84+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
85+
return Collections.singletonList(executorBuilder());
86+
}
87+
88+
public static ExecutorBuilder<?> executorBuilder() {
89+
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
90+
}
91+
7792
@Override
7893
public void reload(Settings settings) {
7994
// secure settings should be readable

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@
2020
package org.elasticsearch.repositories.azure;
2121

2222
import com.microsoft.azure.storage.AccessCondition;
23-
import com.microsoft.azure.storage.BatchException;
2423
import com.microsoft.azure.storage.CloudStorageAccount;
25-
import com.microsoft.azure.storage.Constants;
2624
import com.microsoft.azure.storage.OperationContext;
2725
import com.microsoft.azure.storage.RetryExponentialRetry;
2826
import com.microsoft.azure.storage.RetryPolicy;
2927
import com.microsoft.azure.storage.RetryPolicyFactory;
3028
import com.microsoft.azure.storage.StorageErrorCodeStrings;
3129
import com.microsoft.azure.storage.StorageException;
32-
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
3330
import com.microsoft.azure.storage.blob.BlobInputStream;
3431
import com.microsoft.azure.storage.blob.BlobListingDetails;
3532
import com.microsoft.azure.storage.blob.BlobProperties;
@@ -45,6 +42,7 @@
4542
import org.apache.logging.log4j.LogManager;
4643
import org.apache.logging.log4j.Logger;
4744
import org.apache.logging.log4j.message.ParameterizedMessage;
45+
import org.elasticsearch.action.support.PlainActionFuture;
4846
import org.elasticsearch.common.blobstore.BlobMetaData;
4947
import org.elasticsearch.common.blobstore.BlobPath;
5048
import org.elasticsearch.common.blobstore.DeleteResult;
@@ -55,6 +53,7 @@
5553
import org.elasticsearch.common.settings.SettingsException;
5654
import org.elasticsearch.common.unit.ByteSizeUnit;
5755
import org.elasticsearch.common.unit.ByteSizeValue;
56+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5857

5958
import java.io.IOException;
6059
import java.io.InputStream;
@@ -68,10 +67,9 @@
6867
import java.util.Collections;
6968
import java.util.EnumSet;
7069
import java.util.HashSet;
71-
import java.util.Iterator;
72-
import java.util.List;
7370
import java.util.Map;
7471
import java.util.Set;
72+
import java.util.concurrent.Executor;
7573
import java.util.concurrent.atomic.AtomicLong;
7674
import java.util.function.Supplier;
7775

@@ -190,61 +188,72 @@ public boolean blobExists(String account, String container, String blob) throws
190188
});
191189
}
192190

193-
public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
194-
throws URISyntaxException, StorageException {
195-
logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
191+
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
196192
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
197193
// Container name must be lower case.
198194
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
199-
final Iterator<String> blobIterator = blobs.iterator();
200-
int currentBatchSize = 0;
201-
while (blobIterator.hasNext()) {
202-
final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
203-
do {
204-
batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
205-
DeleteSnapshotsOption.NONE, null, null);
206-
++currentBatchSize;
207-
} while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
208-
currentBatchSize = 0;
209-
try {
210-
SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
211-
} catch (BatchException e) {
212-
for (StorageException ex : e.getExceptions().values()) {
213-
if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
214-
logger.error("Batch exceptions [{}]", e.getExceptions());
215-
throw e;
216-
}
217-
}
218-
}
219-
}
195+
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
196+
SocketAccess.doPrivilegedVoidException(() -> {
197+
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
198+
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
199+
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
200+
});
220201
}
221202

222-
DeleteResult deleteBlobDirectory(String account, String container, String path)
203+
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
223204
throws URISyntaxException, StorageException, IOException {
224205
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
225206
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
207+
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
208+
final AtomicLong outstanding = new AtomicLong(1L);
209+
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
226210
final AtomicLong blobsDeleted = new AtomicLong();
227211
final AtomicLong bytesDeleted = new AtomicLong();
228-
final List<String> blobsToDelete = new ArrayList<>();
229212
SocketAccess.doPrivilegedVoidException(() -> {
230-
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
213+
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
231214
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
232215
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
233216
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
234-
final long len;
235-
if (blobItem instanceof CloudBlob) {
236-
len = ((CloudBlob) blobItem).getProperties().getLength();
237-
} else {
238-
len = -1L;
239-
}
240-
blobsToDelete.add(blobPath);
241-
blobsDeleted.incrementAndGet();
242-
if (len >= 0) {
243-
bytesDeleted.addAndGet(len);
244-
}
217+
outstanding.incrementAndGet();
218+
executor.execute(new AbstractRunnable() {
219+
@Override
220+
protected void doRun() throws Exception {
221+
final long len;
222+
if (blobItem instanceof CloudBlob) {
223+
len = ((CloudBlob) blobItem).getProperties().getLength();
224+
} else {
225+
len = -1L;
226+
}
227+
deleteBlob(account, container, blobPath);
228+
blobsDeleted.incrementAndGet();
229+
if (len >= 0) {
230+
bytesDeleted.addAndGet(len);
231+
}
232+
}
233+
234+
@Override
235+
public void onFailure(Exception e) {
236+
exceptions.add(e);
237+
}
238+
239+
@Override
240+
public void onAfter() {
241+
if (outstanding.decrementAndGet() == 0) {
242+
result.onResponse(null);
243+
}
244+
}
245+
});
245246
}
246247
});
247-
deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
248+
if (outstanding.decrementAndGet() == 0) {
249+
result.onResponse(null);
250+
}
251+
result.actionGet();
252+
if (exceptions.isEmpty() == false) {
253+
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
254+
exceptions.forEach(ex::addSuppressed);
255+
throw ex;
256+
}
248257
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
249258
}
250259

0 commit comments

Comments
 (0)