Skip to content

Commit 3b5c9a9

Browse files
authored
HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
Contributed by Mehakmeet Singh.
1 parent 04abd0e commit 3b5c9a9

File tree

7 files changed

+732
-0
lines changed

7 files changed

+732
-0
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
8787
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
8888
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
89+
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
8990
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
9091
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
9192
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
@@ -511,6 +512,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
511512
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
512513
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
513514
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
515+
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
514516
.build();
515517
}
516518

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
6868
// of valid bytes in buffer)
6969
private boolean closed = false;
7070

71+
/** Stream statistics. */
72+
private final AbfsInputStreamStatistics streamStatistics;
73+
7174
public AbfsInputStream(
7275
final AbfsClient client,
7376
final Statistics statistics,
@@ -86,6 +89,7 @@ public AbfsInputStream(
8689
this.readAheadEnabled = true;
8790
this.cachedSasToken = new CachedSASToken(
8891
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
92+
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
8993
}
9094

9195
public String getPath() {
@@ -105,10 +109,21 @@ public int read() throws IOException {
105109

106110
@Override
107111
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
112+
// check if buffer is null before logging the length
113+
if (b != null) {
114+
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
115+
off, len);
116+
} else {
117+
LOG.debug("read requested b = null offset = {} len = {}", off, len);
118+
}
119+
108120
int currentOff = off;
109121
int currentLen = len;
110122
int lastReadBytes;
111123
int totalReadBytes = 0;
124+
if (streamStatistics != null) {
125+
streamStatistics.readOperationStarted(off, len);
126+
}
112127
incrementReadOps();
113128
do {
114129
lastReadBytes = readOneBlock(b, currentOff, currentLen);
@@ -130,6 +145,8 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
130145
}
131146

132147
Preconditions.checkNotNull(b);
148+
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
149+
off, len);
133150

134151
if (len == 0) {
135152
return 0;
@@ -155,6 +172,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
155172
bCursor = 0;
156173
limit = 0;
157174
if (buffer == null) {
175+
LOG.debug("created new buffer size {}", bufferSize);
158176
buffer = new byte[bufferSize];
159177
}
160178

@@ -183,6 +201,11 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
183201
if (statistics != null) {
184202
statistics.incrementBytesRead(bytesToRead);
185203
}
204+
if (streamStatistics != null) {
205+
// Bytes read from the local buffer.
206+
streamStatistics.bytesReadFromBuffer(bytesToRead);
207+
streamStatistics.bytesRead(bytesToRead);
208+
}
186209
return bytesToRead;
187210
}
188211

@@ -200,8 +223,11 @@ private int readInternal(final long position, final byte[] b, final int offset,
200223
int numReadAheads = this.readAheadQueueDepth;
201224
long nextSize;
202225
long nextOffset = position;
226+
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
203227
while (numReadAheads > 0 && nextOffset < contentLength) {
204228
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
229+
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
230+
nextOffset, nextSize);
205231
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
206232
nextOffset = nextOffset + nextSize;
207233
numReadAheads--;
@@ -211,13 +237,15 @@ private int readInternal(final long position, final byte[] b, final int offset,
211237
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
212238
if (receivedBytes > 0) {
213239
incrementReadOps();
240+
LOG.debug("Received data from read ahead, not doing remote read");
214241
return receivedBytes;
215242
}
216243

217244
// got nothing from read-ahead, do our own read now
218245
receivedBytes = readRemote(position, b, offset, length);
219246
return receivedBytes;
220247
} else {
248+
LOG.debug("read ahead disabled, reading remote");
221249
return readRemote(position, b, offset, length);
222250
}
223251
}
@@ -247,6 +275,11 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
247275
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
248276
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
249277
cachedSasToken.update(op.getSasToken());
278+
if (streamStatistics != null) {
279+
streamStatistics.remoteReadOperation();
280+
}
281+
LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
282+
+ "offset = {} length = {}", position, b.length, offset, length);
250283
perfInfo.registerResult(op.getResult()).registerSuccess(true);
251284
incrementReadOps();
252285
} catch (AzureBlobFileSystemException ex) {
@@ -262,6 +295,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
262295
if (bytesRead > Integer.MAX_VALUE) {
263296
throw new IOException("Unexpected Content-Length");
264297
}
298+
LOG.debug("HTTP request read bytes = {}", bytesRead);
265299
return (int) bytesRead;
266300
}
267301

@@ -282,6 +316,7 @@ private void incrementReadOps() {
282316
*/
283317
@Override
284318
public synchronized void seek(long n) throws IOException {
319+
LOG.debug("requested seek to position {}", n);
285320
if (closed) {
286321
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
287322
}
@@ -292,13 +327,21 @@ public synchronized void seek(long n) throws IOException {
292327
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
293328
}
294329

330+
if (streamStatistics != null) {
331+
streamStatistics.seek(n, fCursor);
332+
}
333+
295334
if (n>=fCursor-limit && n<=fCursor) { // within buffer
296335
bCursor = (int) (n-(fCursor-limit));
336+
if (streamStatistics != null) {
337+
streamStatistics.seekInBuffer();
338+
}
297339
return;
298340
}
299341

300342
// next read will read from here
301343
fCursor = n;
344+
LOG.debug("set fCursor to {}", fCursor);
302345

303346
//invalidate buffer
304347
limit = 0;
@@ -390,6 +433,7 @@ public boolean seekToNewSource(long l) throws IOException {
390433
public synchronized void close() throws IOException {
391434
closed = true;
392435
buffer = null; // de-reference the buffer so it can be GC'ed sooner
436+
LOG.debug("Closing {}", this);
393437
}
394438

395439
/**
@@ -443,4 +487,28 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
443487
this.cachedSasToken = cachedSasToken;
444488
}
445489

490+
/**
491+
* Getter for AbfsInputStreamStatistics.
492+
*
493+
* @return an instance of AbfsInputStreamStatistics.
494+
*/
495+
@VisibleForTesting
496+
public AbfsInputStreamStatistics getStreamStatistics() {
497+
return streamStatistics;
498+
}
499+
500+
/**
501+
* Get the string form of this stream, with its statistics appended when a statistics instance is present.
502+
* @return the superclass string form, followed by this stream's hash code and statistics if statistics are present.
503+
*/
504+
@Override
505+
public String toString() {
506+
final StringBuilder sb = new StringBuilder(super.toString());
507+
if (streamStatistics != null) {
508+
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
509+
sb.append(streamStatistics.toString());
510+
sb.append("}");
511+
}
512+
return sb.toString();
513+
}
446514
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
2929

3030
private boolean tolerateOobAppends;
3131

32+
private AbfsInputStreamStatistics streamStatistics;
33+
3234
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
3335
super(sasTokenRenewPeriodForStreamsInSeconds);
3436
}
@@ -52,6 +54,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
5254
return this;
5355
}
5456

57+
public AbfsInputStreamContext withStreamStatistics(
58+
final AbfsInputStreamStatistics streamStatistics) {
59+
this.streamStatistics = streamStatistics;
60+
return this;
61+
}
62+
5563
public AbfsInputStreamContext build() {
5664
// Validation of parameters to be done here.
5765
return this;
@@ -68,4 +76,8 @@ public int getReadAheadQueueDepth() {
6876
public boolean isTolerateOobAppends() {
6977
return tolerateOobAppends;
7078
}
79+
80+
public AbfsInputStreamStatistics getStreamStatistics() {
81+
return streamStatistics;
82+
}
7183
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import org.apache.hadoop.classification.InterfaceStability;
22+
23+
/**
24+
* Interface for statistics for the AbfsInputStream.
25+
*/
26+
@InterfaceStability.Unstable
27+
public interface AbfsInputStreamStatistics {
28+
/**
29+
* Seek backwards, incrementing the seek and backward seek counters.
30+
*
31+
* @param negativeOffset how far was the seek?
32+
* This is expected to be negative.
33+
*/
34+
void seekBackwards(long negativeOffset);
35+
36+
/**
37+
* Record a forward seek, adding a seek operation, a forward
38+
* seek operation, and any bytes skipped.
39+
*
40+
* @param skipped number of bytes skipped by reading from the stream.
41+
* If the seek was implemented by a close + reopen, set this to zero.
42+
*/
43+
void seekForwards(long skipped);
44+
45+
/**
46+
* Record a forward or backward seek, adding a seek operation, a forward or
47+
* a backward seek operation, and number of bytes skipped.
48+
*
49+
* @param seekTo position to seek to.
50+
* @param currentPos current position.
51+
*/
52+
void seek(long seekTo, long currentPos);
53+
54+
/**
55+
* Increment the bytes read counter by the number of bytes;
56+
* no-op if the argument is negative.
57+
*
58+
* @param bytes number of bytes read.
59+
*/
60+
void bytesRead(long bytes);
61+
62+
/**
63+
* Record bytes read from the buffer.
64+
*
65+
* @param bytes number of bytes that are read from buffer.
66+
*/
67+
void bytesReadFromBuffer(long bytes);
68+
69+
/**
70+
* Records a seek that was done within the buffer.
71+
*/
72+
void seekInBuffer();
73+
74+
/**
75+
* A {@code read(byte[] buf, int off, int len)} operation has started.
76+
*
77+
* @param pos starting position of the read.
78+
* @param len length of bytes to read.
79+
*/
80+
void readOperationStarted(long pos, long len);
81+
82+
/**
83+
* Records a successful remote read operation.
84+
*/
85+
void remoteReadOperation();
86+
87+
/**
88+
* Builds a string containing all the AbfsInputStream statistics.
89+
* @return the string with all the statistics.
90+
*/
91+
@Override
92+
String toString();
93+
}

0 commit comments

Comments
 (0)