Skip to content

Commit 42a15c9

Browse files
authored
Track PUT/PUT_BLOCK operations on AzureBlobStore. (#57121)
Backport of #56936
1 parent c5f61fe commit 42a15c9

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ public class AzureBlobStore implements BlobStore {
9090

9191
private final Stats stats = new Stats();
9292

93-
private final Consumer<String> getMetricsCollector;
94-
private final Consumer<String> listMetricsCollector;
93+
private final Consumer<HttpURLConnection> getMetricsCollector;
94+
private final Consumer<HttpURLConnection> listMetricsCollector;
95+
private final Consumer<HttpURLConnection> uploadMetricsCollector;
9596

9697
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) {
9798
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
@@ -103,15 +104,35 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service,
103104
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
104105
final Map<String, AzureStorageSettings> newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode);
105106
this.service.refreshAndClearCache(newSettings);
106-
this.getMetricsCollector = (requestMethod) -> {
107-
if (requestMethod.equalsIgnoreCase("HEAD")) {
107+
this.getMetricsCollector = (httpURLConnection) -> {
108+
if (httpURLConnection.getRequestMethod().equals("HEAD")) {
108109
stats.headOperations.incrementAndGet();
109110
return;
110111
}
112+
assert httpURLConnection.getRequestMethod().equals("GET");
111113

112114
stats.getOperations.incrementAndGet();
113115
};
114-
this.listMetricsCollector = (requestMethod) -> stats.listOperations.incrementAndGet();
116+
this.listMetricsCollector = (httpURLConnection) -> {
117+
assert httpURLConnection.getRequestMethod().equals("GET");
118+
stats.listOperations.incrementAndGet();
119+
};
120+
this.uploadMetricsCollector = (httpURLConnection -> {
121+
assert httpURLConnection.getRequestMethod().equals("PUT");
122+
String queryParams = httpURLConnection.getURL().getQuery();
123+
if (queryParams != null && isBlockUpload(queryParams)) {
124+
stats.putBlockOperations.incrementAndGet();
125+
} else {
126+
stats.putOperations.incrementAndGet();
127+
}
128+
});
129+
}
130+
131+
private boolean isBlockUpload(String queryParams) {
132+
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
133+
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
134+
return (queryParams.contains("comp=block") && queryParams.contains("blockid="))
135+
|| queryParams.contains("comp=blocklist");
115136
}
116137

117138
@Override
@@ -292,13 +313,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
292313
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
293314
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
294315
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
316+
final OperationContext operationContext = hookMetricCollector(client().v2().get(), uploadMetricsCollector);
295317
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
296318
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
297319
try {
298320
final AccessCondition accessCondition =
299321
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
300322
SocketAccess.doPrivilegedVoidException(() ->
301-
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), client.v2().get()));
323+
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), operationContext));
302324
} catch (final StorageException se) {
303325
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
304326
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
@@ -313,14 +335,13 @@ private Tuple<CloudBlobClient, Supplier<OperationContext>> client() {
313335
return service.client(clientName);
314336
}
315337

316-
private OperationContext hookMetricCollector(OperationContext context, Consumer<String> metricCollector) {
338+
private OperationContext hookMetricCollector(OperationContext context, Consumer<HttpURLConnection> metricCollector) {
317339
context.getRequestCompletedEventHandler().addListener(new StorageEvent<RequestCompletedEvent>() {
318340
@Override
319341
public void eventOccurred(RequestCompletedEvent eventArg) {
320342
int statusCode = eventArg.getRequestResult().getStatusCode();
321-
HttpURLConnection httpURLConnection = (HttpURLConnection) eventArg.getConnectionObject();
322343
if (statusCode < 300) {
323-
metricCollector.accept(httpURLConnection.getRequestMethod());
344+
metricCollector.accept((HttpURLConnection) eventArg.getConnectionObject());
324345
}
325346
}
326347
});
@@ -359,11 +380,17 @@ private static class Stats {
359380

360381
private final AtomicLong headOperations = new AtomicLong();
361382

383+
private final AtomicLong putOperations = new AtomicLong();
384+
385+
private final AtomicLong putBlockOperations = new AtomicLong();
386+
362387
private Map<String, Long> toMap() {
363388
return org.elasticsearch.common.collect.Map.of(
364389
"GET", getOperations.get(),
365390
"LIST", listOperations.get(),
366-
"HEAD", headOperations.get()
391+
"HEAD", headOperations.get(),
392+
"PUT", putOperations.get(),
393+
"PUT_BLOCK", putBlockOperations.get()
367394
);
368395
}
369396
}

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
7878

7979
@Override
8080
protected List<String> requestTypesTracked() {
81-
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD");
81+
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK");
8282
}
8383

8484
@Override
@@ -185,7 +185,18 @@ protected void maybeTrack(String request, Headers headers) {
185185
trackRequest("HEAD");
186186
} else if (listPattern.matcher(request).matches()) {
187187
trackRequest("LIST");
188+
} else if (isBlockUpload(request)) {
189+
trackRequest("PUT_BLOCK");
190+
} else if (Regex.simpleMatch("PUT /*/*", request)) {
191+
trackRequest("PUT");
188192
}
189193
}
194+
195+
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
196+
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
197+
private boolean isBlockUpload(String request) {
198+
return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request)
199+
|| (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="));
200+
}
190201
}
191202
}

0 commit comments

Comments
 (0)