Skip to content
Closed
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_USE_UPN)
private boolean useUpn;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;

private Map<String, String> storageAccountKeys;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
Expand Down Expand Up @@ -471,6 +475,15 @@ public boolean isUpnUsed() {
return this.useUpn;
}

/**
* Whether {@code AbfsClient} should track and send latency info back to storage servers.
*
* @return a boolean indicating whether latency should be tracked.
*/
public boolean shouldTrackLatency() {
return this.trackLatency;
}

public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType == AuthType.OAuth) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token";
/** Key for oauth AAD refresh token endpoint: {@value}. */
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";

public static String accountProperty(String property, String account) {
return property + "." + account;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_HTTPS = true;

public static final boolean DEFAULT_USE_UPN = false;
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.contracts.services;

import org.apache.hadoop.classification.InterfaceStability;

/**
* The AbfsPerfLoggable contract.
*/
@InterfaceStability.Evolving
public interface AbfsPerfLoggable {
/**
* Gets the string to log to the Abfs Logging API.
*
* @return the string that will be logged.
*/
String getLogString();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ public class AbfsClient implements Closeable {
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;

private final AccessTokenProvider tokenProvider;


public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider) {
final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
Expand All @@ -88,6 +90,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent

this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.tokenProvider = tokenProvider;
this.abfsPerfTracker = abfsPerfTracker;
}

@Override
Expand All @@ -101,6 +104,10 @@ public String getFileSystem() {
return filesystem;
}

protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}

ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.List;
import java.util.UUID;

Expand All @@ -40,12 +42,13 @@

import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;

/**
* Represents an HTTP operation.
*/
public class AbfsHttpOperation {
public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);

private static final int CONNECT_TIMEOUT = 30 * 1000;
Expand Down Expand Up @@ -161,6 +164,47 @@ public String toString() {
return sb.toString();
}

// Returns a trace message for the ABFS API logging service to consume
public String getLogString() {
String urlStr = null;

try {
urlStr = URLEncoder.encode(url.toString(), "UTF-8");
} catch(UnsupportedEncodingException e) {
urlStr = "https%3A%2F%2Ffailed%2Fto%2Fencode%2Furl";
}

final StringBuilder sb = new StringBuilder();
sb.append("s=")
.append(statusCode)
.append(" e=")
.append(storageErrorCode)
.append(" ci=")
.append(clientRequestId)
.append(" ri=")
.append(requestId);

if (isTraceEnabled) {
sb.append(" ct=")
.append(connectionTimeMs)
.append(" st=")
.append(sendRequestTimeMs)
.append(" rt=")
.append(recvResponseTimeMs);
}

sb.append(" bs=")
.append(bytesSent)
.append(" br=")
.append(bytesReceived)
.append(" m=")
.append(method)
.append(" u=")
.append(urlStr);

return sb.toString();
}

/**
* Initializes a new HTTP request and opens the connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,10 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
final AbfsRestOperation op;
try {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,16 @@ private synchronized void writeCurrentBufferToService() throws IOException {
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
client.append(path, offset, bytes, 0,
bytesLength);
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
return null;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AbfsRestOperation op = client.append(path, offset, bytes, 0,
bytesLength);
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return null;
}
}
});

Expand Down Expand Up @@ -334,8 +340,11 @@ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {

private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException {
try {
client.flush(path, offset, retainUncommitedData, isClose);
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.time.Instant;

import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;

/**
* {@code AbfsPerfInfo} holds information on ADLS Gen 2 API performance observed by {@code AbfsClient}. Every
* Abfs request keeps adding its information (success/failure, latency etc) to its {@code AbfsPerfInfo}'s object
* as and when it becomes available. When the request is over, the performance information is recorded while
* the {@code AutoCloseable} {@code AbfsPerfInfo} object is "closed".
*/
public final class AbfsPerfInfo implements AutoCloseable {

// the tracker which will be extracting perf info out of this object.
private AbfsPerfTracker abfsPerfTracker;

// the caller name.
private String callerName;

// the callee name.
private String calleeName;

// time when this tracking started.
private Instant trackingStart;

// time when this tracking ended.
private Instant trackingEnd;

// whether the tracked request was successful.
private boolean success;

// time when the aggregate operation (to which this request belongs) started.
private Instant aggregateStart;

// number of requests in the aggregate operation (to which this request belongs).
private long aggregateCount;

// result of the request.
private AbfsPerfLoggable res;

public AbfsPerfInfo(AbfsPerfTracker abfsPerfTracker, String callerName, String calleeName) {
this.callerName = callerName;
this.calleeName = calleeName;
this.abfsPerfTracker = abfsPerfTracker;
this.success = false;
this.trackingStart = abfsPerfTracker.getLatencyInstant();
}

public AbfsPerfInfo registerSuccess(boolean success) {
this.success = success;
return this;
}

public AbfsPerfInfo registerResult(AbfsPerfLoggable res) {
this.res = res;
return this;
}

public AbfsPerfInfo registerAggregates(Instant aggregateStart, long aggregateCount) {
this.aggregateStart = aggregateStart;
this.aggregateCount = aggregateCount;
return this;
}

public AbfsPerfInfo finishTracking() {
if (this.trackingEnd == null) {
this.trackingEnd = abfsPerfTracker.getLatencyInstant();
}

return this;
}

public AbfsPerfInfo registerCallee(String calleeName) {
this.calleeName = calleeName;
return this;
}

@Override
public void close() {
abfsPerfTracker.trackInfo(this.finishTracking());
}

public String getCallerName() {
return callerName;
};

public String getCalleeName() {
return calleeName;
}

public Instant getTrackingStart() {
return trackingStart;
}

public Instant getTrackingEnd() {
return trackingEnd;
}

public boolean getSuccess() {
return success;
}

public Instant getAggregateStart() {
return aggregateStart;
}

public long getAggregateCount() {
return aggregateCount;
}

public AbfsPerfLoggable getResult() {
return res;
}
}
Loading