Skip to content

Commit d20109c

Browse files
authored
HADOOP-17058. ABFS: Support for AppendBlob in Hadoop ABFS Driver
- Contributed by Ishani Ahuja
1 parent f86f15c commit d20109c

21 files changed

+714
-42
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public class AbfsConfiguration{
153153
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
154154
private String azureAtomicDirs;
155155

156+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
157+
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
158+
private String azureAppendBlobDirs;
159+
156160
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
157161
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
158162
private boolean createRemoteFileSystemDuringInitialization;
@@ -544,6 +548,10 @@ public String getAzureAtomicRenameDirs() {
544548
return this.azureAtomicDirs;
545549
}
546550

551+
public String getAppendBlobDirs() {
552+
return this.azureAppendBlobDirs;
553+
}
554+
547555
public boolean getCreateRemoteFileSystemDuringInitialization() {
548556
// we do not support creating the filesystem when AuthType is SAS
549557
return this.createRemoteFileSystemDuringInitialization

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.hadoop.fs.Path;
6363
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
6464
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
65+
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
6566
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
6667
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
6768
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -145,6 +146,11 @@ public class AzureBlobFileSystemStore implements Closeable {
145146
private final IdentityTransformerInterface identityTransformer;
146147
private final AbfsPerfTracker abfsPerfTracker;
147148

149+
/**
150+
* The set of directories where we should store files as append blobs.
151+
*/
152+
private Set<String> appendBlobDirSet;
153+
148154
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
149155
Configuration configuration,
150156
AbfsCounters abfsCounters) throws IOException {
@@ -196,6 +202,23 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
196202
throw new IOException(e);
197203
}
198204
LOG.trace("IdentityTransformer init complete");
205+
206+
// Extract the directories that should contain append blobs
207+
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
208+
if (appendBlobDirs.trim().isEmpty()) {
209+
this.appendBlobDirSet = new HashSet<String>();
210+
} else {
211+
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
212+
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
213+
}
214+
}
215+
216+
/**
217+
* Checks if the given key in Azure Storage should be stored as an append
218+
* blob instead of a block blob.
219+
*/
220+
public boolean isAppendBlobKey(String key) {
221+
return isKeyForDirectorySet(key, appendBlobDirSet);
199222
}
200223

201224
/**
@@ -431,27 +454,37 @@ public OutputStream createFile(final Path path,
431454
isNamespaceEnabled);
432455

433456
String relativePath = getRelativePath(path);
457+
boolean isAppendBlob = false;
458+
if (isAppendBlobKey(path.toString())) {
459+
isAppendBlob = true;
460+
}
434461

435462
final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
436463
isNamespaceEnabled ? getOctalNotation(permission) : null,
437-
isNamespaceEnabled ? getOctalNotation(umask) : null);
464+
isNamespaceEnabled ? getOctalNotation(umask) : null,
465+
isAppendBlob);
438466
perfInfo.registerResult(op.getResult()).registerSuccess(true);
439467

440468
return new AbfsOutputStream(
441469
client,
442470
statistics,
443471
relativePath,
444472
0,
445-
populateAbfsOutputStreamContext());
473+
populateAbfsOutputStreamContext(isAppendBlob));
446474
}
447475
}
448476

449-
private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
477+
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
478+
int bufferSize = abfsConfiguration.getWriteBufferSize();
479+
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
480+
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
481+
}
450482
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
451-
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
483+
.withWriteBufferSize(bufferSize)
452484
.enableFlush(abfsConfiguration.isFlushEnabled())
453485
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
454486
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
487+
.withAppendBlob(isAppendBlob)
455488
.build();
456489
}
457490

@@ -468,7 +501,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
468501

469502
final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
470503
isNamespaceEnabled ? getOctalNotation(permission) : null,
471-
isNamespaceEnabled ? getOctalNotation(umask) : null);
504+
isNamespaceEnabled ? getOctalNotation(umask) : null, false);
472505
perfInfo.registerResult(op.getResult()).registerSuccess(true);
473506
}
474507
}
@@ -544,12 +577,17 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
544577

545578
perfInfo.registerSuccess(true);
546579

580+
boolean isAppendBlob = false;
581+
if (isAppendBlobKey(path.toString())) {
582+
isAppendBlob = true;
583+
}
584+
547585
return new AbfsOutputStream(
548586
client,
549587
statistics,
550588
relativePath,
551589
offset,
552-
populateAbfsOutputStreamContext());
590+
populateAbfsOutputStreamContext(isAppendBlob));
553591
}
554592
}
555593

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class AbfsHttpConstants {
4040
public static final String CHECK_ACCESS = "checkAccess";
4141
public static final String GET_STATUS = "getStatus";
4242
public static final String DEFAULT_TIMEOUT = "90";
43+
public static final String APPEND_BLOB_TYPE = "appendblob";
4344
public static final String TOKEN_VERSION = "2";
4445

4546
public static final String JAVA_VENDOR = "java.vendor";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public final class ConfigurationKeys {
5959
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
6060
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
6161
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
62+
/** Provides a config to specify comma-separated path prefixes under which append-blob-based files are created.
63+
* Default is empty. **/
64+
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
6265
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
6366
/** Provides a config control to enable or disable ABFS Flush operations -
6467
* HFlush and HSync. Default is true. **/

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public final class FileSystemConfigurations {
4747

4848
// Default upload and download buffer size
4949
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
50+
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
5051
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
5152
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
5253
public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB
@@ -61,6 +62,7 @@ public final class FileSystemConfigurations {
6162
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
6263

6364
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
65+
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
6466

6567
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
6668
public static final boolean DEFAULT_ENABLE_FLUSH = true;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public final class HttpQueryParams {
3838
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
3939
public static final String QUERY_PARAM_CLOSE = "close";
4040
public static final String QUERY_PARAM_UPN = "upn";
41+
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
4142

4243
private HttpQueryParams() {}
4344
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
272272
}
273273

274274
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
275-
final String permission, final String umask) throws AzureBlobFileSystemException {
275+
final String permission, final String umask,
276+
final boolean isAppendBlob) throws AzureBlobFileSystemException {
276277
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
277278
if (!overwrite) {
278279
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -288,6 +289,9 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
288289

289290
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
290291
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
292+
if (isAppendBlob) {
293+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
294+
}
291295

292296
String operation = isFile
293297
? SASTokenProvider.CREATE_FILE_OPERATION
@@ -380,7 +384,7 @@ public AbfsRestOperation renameIdempotencyCheckOp(
380384
}
381385

382386
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
383-
final int length, final String cachedSasToken) throws AzureBlobFileSystemException {
387+
final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
384388
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
385389
// JDK7 does not support PATCH, so to work around the issue we will use
386390
// PUT and specify the real method in the X-Http-Method-Override header.
@@ -401,10 +405,46 @@ public AbfsRestOperation append(final String path, final long position, final by
401405
HTTP_METHOD_PUT,
402406
url,
403407
requestHeaders, buffer, offset, length, sasTokenForReuse);
404-
op.execute();
408+
try {
409+
op.execute();
410+
} catch (AzureBlobFileSystemException e) {
411+
if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
412+
final AbfsRestOperation successOp = new AbfsRestOperation(
413+
AbfsRestOperationType.Append,
414+
this,
415+
HTTP_METHOD_PUT,
416+
url,
417+
requestHeaders, buffer, offset, length, sasTokenForReuse);
418+
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
419+
return successOp;
420+
}
421+
throw e;
422+
}
423+
405424
return op;
406425
}
407426

427+
// For AppendBlob it's possible that the append succeeded in the backend but the request failed.
428+
// However, a retry would then fail with an InvalidQueryParameterValue error
429+
// (as the current offset would no longer be acceptable).
430+
// Hence, when a retried appendblob append call fails with a bad request,
431+
// we treat it as successful if the file length shows the data was already appended.
432+
public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
433+
final long length) throws AzureBlobFileSystemException {
434+
if ((op.isARetriedRequest())
435+
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) {
436+
final AbfsRestOperation destStatusOp = getPathStatus(path, false);
437+
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
438+
String fileLength = destStatusOp.getResult().getResponseHeader(
439+
HttpHeaderConfigurations.CONTENT_LENGTH);
440+
if (length <= Long.parseLong(fileLength)) {
441+
return true;
442+
}
443+
}
444+
}
445+
return false;
446+
}
447+
408448
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
409449
boolean isClose, final String cachedSasToken)
410450
throws AzureBlobFileSystemException {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
6969
private String storageErrorMessage = "";
7070
private String clientRequestId = "";
7171
private String requestId = "";
72+
private String expectedAppendPos = "";
7273
private ListResultSchema listResultSchema = null;
7374

7475
// metrics
@@ -126,6 +127,10 @@ public String getClientRequestId() {
126127
return clientRequestId;
127128
}
128129

130+
public String getExpectedAppendPos() {
131+
return expectedAppendPos;
132+
}
133+
129134
public String getRequestId() {
130135
return requestId;
131136
}
@@ -154,6 +159,8 @@ public String toString() {
154159
sb.append(statusCode);
155160
sb.append(",");
156161
sb.append(storageErrorCode);
162+
sb.append(",");
163+
sb.append(expectedAppendPos);
157164
sb.append(",cid=");
158165
sb.append(clientRequestId);
159166
sb.append(",rid=");
@@ -449,6 +456,9 @@ private void processStorageErrorResponse() {
449456
case "message":
450457
storageErrorMessage = fieldValue;
451458
break;
459+
case "ExpectedAppendPos":
460+
expectedAppendPos = fieldValue;
461+
break;
452462
default:
453463
break;
454464
}

0 commit comments

Comments
 (0)