Skip to content

Commit 2f0a149

Browse files
mehakmeet authored and Mehakmeet Singh committed
HADOOP-16910 : ABFS Streams to update FileSystem.Statistics counters on IO.
Contributed by Mehakmeet Singh
1 parent cbe71ea commit 2f0a149

File tree

7 files changed

+275
-21
lines changed

7 files changed

+275
-21
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
188188
Path qualifiedPath = makeQualified(f);
189189

190190
try {
191-
OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
191+
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
192192
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
193193
return new FSDataOutputStream(outputStream, statistics);
194194
} catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
250250
Path qualifiedPath = makeQualified(f);
251251

252252
try {
253-
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
253+
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
254254
return new FSDataOutputStream(outputStream, statistics);
255255
} catch(AzureBlobFileSystemException ex) {
256256
checkException(f, ex);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import com.google.common.annotations.VisibleForTesting;
5252
import com.google.common.base.Preconditions;
5353
import com.google.common.base.Strings;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
5456

5557
import org.apache.hadoop.classification.InterfaceAudience;
5658
import org.apache.hadoop.classification.InterfaceStability;
@@ -97,8 +99,6 @@
9799
import org.apache.hadoop.io.IOUtils;
98100
import org.apache.hadoop.security.UserGroupInformation;
99101
import org.apache.http.client.utils.URIBuilder;
100-
import org.slf4j.Logger;
101-
import org.slf4j.LoggerFactory;
102102

103103
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
104104
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
@@ -391,8 +391,10 @@ public void deleteFilesystem() throws AzureBlobFileSystemException {
391391
}
392392
}
393393

394-
public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
395-
final FsPermission umask) throws AzureBlobFileSystemException {
394+
public OutputStream createFile(final Path path,
395+
final FileSystem.Statistics statistics,
396+
final boolean overwrite, final FsPermission permission,
397+
final FsPermission umask) throws AzureBlobFileSystemException {
396398
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
397399
boolean isNamespaceEnabled = getIsNamespaceEnabled();
398400
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -409,12 +411,13 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
409411
perfInfo.registerResult(op.getResult()).registerSuccess(true);
410412

411413
return new AbfsOutputStream(
412-
client,
413-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
414-
0,
415-
abfsConfiguration.getWriteBufferSize(),
416-
abfsConfiguration.isFlushEnabled(),
417-
abfsConfiguration.isOutputStreamFlushDisabled());
414+
client,
415+
statistics,
416+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
417+
0,
418+
abfsConfiguration.getWriteBufferSize(),
419+
abfsConfiguration.isFlushEnabled(),
420+
abfsConfiguration.isOutputStreamFlushDisabled());
418421
}
419422
}
420423

@@ -468,7 +471,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
468471
}
469472
}
470473

471-
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
474+
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
472475
AzureBlobFileSystemException {
473476
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
474477
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -495,12 +498,13 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
495498
perfInfo.registerSuccess(true);
496499

497500
return new AbfsOutputStream(
498-
client,
499-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
500-
offset,
501-
abfsConfiguration.getWriteBufferSize(),
502-
abfsConfiguration.isFlushEnabled(),
503-
abfsConfiguration.isOutputStreamFlushDisabled());
501+
client,
502+
statistics,
503+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
504+
offset,
505+
abfsConfiguration.getWriteBufferSize(),
506+
abfsConfiguration.isFlushEnabled(),
507+
abfsConfiguration.isOutputStreamFlushDisabled());
504508
}
505509
}
506510

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
101101
int currentLen = len;
102102
int lastReadBytes;
103103
int totalReadBytes = 0;
104+
incrementReadOps();
104105
do {
105106
lastReadBytes = readOneBlock(b, currentOff, currentLen);
106107
if (lastReadBytes > 0) {
@@ -201,6 +202,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
201202
// try reading from buffers first
202203
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
203204
if (receivedBytes > 0) {
205+
incrementReadOps();
204206
return receivedBytes;
205207
}
206208

@@ -236,6 +238,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
236238
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
237239
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
238240
perfInfo.registerResult(op.getResult()).registerSuccess(true);
241+
incrementReadOps();
239242
} catch (AzureBlobFileSystemException ex) {
240243
if (ex instanceof AbfsRestOperationException) {
241244
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -252,6 +255,15 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
252255
return (int) bytesRead;
253256
}
254257

258+
/**
259+
* Increment Read Operations.
260+
*/
261+
private void incrementReadOps() {
262+
if (statistics != null) {
263+
statistics.incrementReadOps(1);
264+
}
265+
}
266+
255267
/**
256268
* Seek to given position in stream.
257269
* @param n position to seek to

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
4040
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
4141
import org.apache.hadoop.io.ElasticByteBufferPool;
42+
import org.apache.hadoop.fs.FileSystem.Statistics;
4243
import org.apache.hadoop.fs.FSExceptionMessages;
4344
import org.apache.hadoop.fs.StreamCapabilities;
4445
import org.apache.hadoop.fs.Syncable;
@@ -78,14 +79,18 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
7879
private final ElasticByteBufferPool byteBufferPool
7980
= new ElasticByteBufferPool();
8081

82+
private final Statistics statistics;
83+
8184
public AbfsOutputStream(
8285
final AbfsClient client,
86+
final Statistics statistics,
8387
final String path,
8488
final long position,
8589
final int bufferSize,
8690
final boolean supportFlush,
8791
final boolean disableOutputStreamFlush) {
8892
this.client = client;
93+
this.statistics = statistics;
8994
this.path = path;
9095
this.position = position;
9196
this.closed = false;
@@ -181,6 +186,16 @@ public synchronized void write(final byte[] data, final int off, final int lengt
181186

182187
writableBytes = bufferSize - bufferIndex;
183188
}
189+
incrementWriteOps();
190+
}
191+
192+
/**
193+
* Increment Write Operations.
194+
*/
195+
private void incrementWriteOps() {
196+
if (statistics != null) {
197+
statistics.incrementWriteOps(1);
198+
}
184199
}
185200

186201
/**

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717
*/
1818
package org.apache.hadoop.fs.azurebfs;
1919

20+
import java.io.IOException;
21+
2022
import org.junit.Assert;
2123
import org.junit.Before;
2224
import org.junit.BeforeClass;
2325
import org.junit.Rule;
2426
import org.junit.rules.TestName;
2527
import org.junit.rules.Timeout;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.hadoop.fs.FSDataInputStream;
32+
import org.apache.hadoop.fs.Path;
2633

2734
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
2835

@@ -31,6 +38,9 @@
3138
* This class does not attempt to bind to Azure.
3239
*/
3340
public class AbstractAbfsTestWithTimeout extends Assert {
41+
private static final Logger LOG =
42+
LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
43+
3444
/**
3545
* The name of the current method.
3646
*/
@@ -67,4 +77,53 @@ public void nameThread() {
6777
protected int getTestTimeoutMillis() {
6878
return TEST_TIMEOUT;
6979
}
80+
81+
/**
82+
* Describe a test in the logs.
83+
*
84+
* @param text text to print
85+
* @param args arguments to format in the printing
86+
*/
87+
protected void describe(String text, Object... args) {
88+
LOG.info("\n\n{}: {}\n",
89+
methodName.getMethodName(),
90+
String.format(text, args));
91+
}
92+
93+
/**
94+
* Validate Contents written on a file in Abfs.
95+
*
96+
* @param fs AzureBlobFileSystem
97+
* @param path Path of the file
98+
* @param originalByteArray original byte array
99+
* @return if content is validated true else, false
100+
* @throws IOException
101+
*/
102+
protected boolean validateContent(AzureBlobFileSystem fs, Path path,
103+
byte[] originalByteArray)
104+
throws IOException {
105+
int pos = 0;
106+
int lenOfOriginalByteArray = originalByteArray.length;
107+
108+
try (FSDataInputStream in = fs.open(path)) {
109+
byte valueOfContentAtPos = (byte) in.read();
110+
111+
while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
112+
if (originalByteArray[pos] != valueOfContentAtPos) {
113+
assertEquals("Mismatch in content validation at position {}", pos,
114+
originalByteArray[pos], valueOfContentAtPos);
115+
return false;
116+
}
117+
valueOfContentAtPos = (byte) in.read();
118+
pos++;
119+
}
120+
if (valueOfContentAtPos != -1) {
121+
assertEquals("Expected end of file", -1, valueOfContentAtPos);
122+
return false;
123+
}
124+
return true;
125+
}
126+
127+
}
128+
70129
}

0 commit comments

Comments
 (0)