Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public class AzureBlobStore implements BlobStore {

private final Stats stats = new Stats();

private final Consumer<String> getMetricsCollector;
private final Consumer<String> listMetricsCollector;
private final Consumer<HttpURLConnection> getMetricsCollector;
private final Consumer<HttpURLConnection> listMetricsCollector;
private final Consumer<HttpURLConnection> uploadMetricsCollector;

public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
Expand All @@ -101,15 +102,35 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service,
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
final Map<String, AzureStorageSettings> newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode);
this.service.refreshAndClearCache(newSettings);
this.getMetricsCollector = (requestMethod) -> {
if (requestMethod.equalsIgnoreCase("HEAD")) {
this.getMetricsCollector = (httpURLConnection) -> {
if (httpURLConnection.getRequestMethod().equals("HEAD")) {
stats.headOperations.incrementAndGet();
return;
}
assert httpURLConnection.getRequestMethod().equals("GET");

stats.getOperations.incrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi-related to this PR since you did it in the new collector, let's add assertions in spots like this one to make sure, we're actually seeing a GET here :) Can probably do the same for the list collector also.

};
this.listMetricsCollector = (requestMethod) -> stats.listOperations.incrementAndGet();
this.listMetricsCollector = (httpURLConnection) -> {
assert httpURLConnection.getRequestMethod().equals("GET");
stats.listOperations.incrementAndGet();
};
this.uploadMetricsCollector = (httpURLConnection -> {
assert httpURLConnection.getRequestMethod().equals("PUT");
String queryParams = httpURLConnection.getURL().getQuery();
if (queryParams != null && isBlockUpload(queryParams)) {
stats.putBlockOperations.incrementAndGet();
} else {
stats.putOperations.incrementAndGet();
}
});
}

private boolean isBlockUpload(String queryParams) {
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
return (queryParams.contains("comp=block") && queryParams.contains("blockid="))
|| queryParams.contains("comp=blocklist");
}

@Override
Expand Down Expand Up @@ -290,13 +311,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
final OperationContext operationContext = hookMetricCollector(client().v2().get(), uploadMetricsCollector);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
try {
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), client.v2().get()));
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), operationContext));
} catch (final StorageException se) {
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
Expand All @@ -311,14 +333,13 @@ private Tuple<CloudBlobClient, Supplier<OperationContext>> client() {
return service.client(clientName);
}

private OperationContext hookMetricCollector(OperationContext context, Consumer<String> metricCollector) {
private OperationContext hookMetricCollector(OperationContext context, Consumer<HttpURLConnection> metricCollector) {
context.getRequestCompletedEventHandler().addListener(new StorageEvent<>() {
@Override
public void eventOccurred(RequestCompletedEvent eventArg) {
int statusCode = eventArg.getRequestResult().getStatusCode();
HttpURLConnection httpURLConnection = (HttpURLConnection) eventArg.getConnectionObject();
if (statusCode < 300) {
metricCollector.accept(httpURLConnection.getRequestMethod());
metricCollector.accept((HttpURLConnection) eventArg.getConnectionObject());
}
}
});
Expand Down Expand Up @@ -357,10 +378,16 @@ private static class Stats {

private final AtomicLong headOperations = new AtomicLong();

private final AtomicLong putOperations = new AtomicLong();

private final AtomicLong putBlockOperations = new AtomicLong();

private Map<String, Long> toMap() {
return Map.of("GET", getOperations.get(),
"LIST", listOperations.get(),
"HEAD", headOperations.get());
"HEAD", headOperations.get(),
"PUT", putOperations.get(),
"PUT_BLOCK", putBlockOperations.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {

@Override
protected List<String> requestTypesTracked() {
return List.of("GET", "LIST", "HEAD");
return List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK");
}

@Override
Expand Down Expand Up @@ -186,7 +186,18 @@ protected void maybeTrack(String request, Headers headers) {
trackRequest("HEAD");
} else if (listPattern.test(request)) {
trackRequest("LIST");
} else if (isBlockUpload(request)) {
trackRequest("PUT_BLOCK");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PUT");
}
}

// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
private boolean isBlockUpload(String request) {
return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sam as in the production code, matching on comp=blocklist seems redundant when matching comp=block?

|| (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="));
}
}
}