Skip to content

Commit 459eb2a

Browse files
authored
HADOOP-16914 Adding Output Stream Counters in ABFS (#1899)
Contributed by Mehakmeet Singh.There
1 parent 5958af4 commit 459eb2a

File tree

8 files changed

+754
-0
lines changed

8 files changed

+754
-0
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
8585
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
8686
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
87+
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
8788
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
8889
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
8990
import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -426,6 +427,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
426427
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
427428
.enableFlush(abfsConfiguration.isFlushEnabled())
428429
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
430+
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
429431
.build();
430432
}
431433

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
import com.google.common.annotations.VisibleForTesting;
3737
import com.google.common.base.Preconditions;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3840

3941
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
4042
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -50,6 +52,7 @@
5052
* The BlobFsOutputStream for Rest AbfsClient.
5153
*/
5254
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
55+
5356
private final AbfsClient client;
5457
private final String path;
5558
private long position;
@@ -80,6 +83,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
8083
= new ElasticByteBufferPool();
8184

8285
private final Statistics statistics;
86+
private final AbfsOutputStreamStatistics outputStreamStatistics;
87+
88+
private static final Logger LOG =
89+
LoggerFactory.getLogger(AbfsOutputStream.class);
8390

8491
public AbfsOutputStream(
8592
final AbfsClient client,
@@ -101,6 +108,7 @@ public AbfsOutputStream(
101108
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
102109
this.bufferIndex = 0;
103110
this.writeOperations = new ConcurrentLinkedDeque<>();
111+
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
104112

105113
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
106114

@@ -278,6 +286,9 @@ public synchronized void close() throws IOException {
278286
threadExecutor.shutdownNow();
279287
}
280288
}
289+
if (LOG.isDebugEnabled()) {
290+
LOG.debug("Closing AbfsOutputStream ", toString());
291+
}
281292
}
282293

283294
private synchronized void flushInternal(boolean isClose) throws IOException {
@@ -296,16 +307,20 @@ private synchronized void writeCurrentBufferToService() throws IOException {
296307
if (bufferIndex == 0) {
297308
return;
298309
}
310+
outputStreamStatistics.writeCurrentBuffer();
299311

300312
final byte[] bytes = buffer;
301313
final int bytesLength = bufferIndex;
314+
outputStreamStatistics.bytesToUpload(bytesLength);
302315
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
303316
bufferIndex = 0;
304317
final long offset = position;
305318
position += bytesLength;
306319

307320
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
321+
long start = System.currentTimeMillis();
308322
waitForTaskToComplete();
323+
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
309324
}
310325

311326
final Future<Void> job = completionService.submit(new Callable<Void>() {
@@ -324,6 +339,11 @@ public Void call() throws Exception {
324339
}
325340
});
326341

342+
if (job.isCancelled()) {
343+
outputStreamStatistics.uploadFailed(bytesLength);
344+
} else {
345+
outputStreamStatistics.uploadSuccessful(bytesLength);
346+
}
327347
writeOperations.add(new WriteOperation(job, offset, bytesLength));
328348

329349
// Try to shrink the queue
@@ -388,6 +408,8 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
388408
writeOperations.peek().task.get();
389409
lastTotalAppendOffset += writeOperations.peek().length;
390410
writeOperations.remove();
411+
// Incrementing statistics to indicate queue has been shrunk.
412+
outputStreamStatistics.queueShrunk();
391413
}
392414
} catch (Exception e) {
393415
if (e.getCause() instanceof AzureBlobFileSystemException) {
@@ -435,4 +457,38 @@ private static class WriteOperation {
435457
public synchronized void waitForPendingUploads() throws IOException {
436458
waitForTaskToComplete();
437459
}
460+
461+
/**
462+
* Getter method for AbfsOutputStream statistics.
463+
*
464+
* @return statistics for AbfsOutputStream.
465+
*/
466+
@VisibleForTesting
467+
public AbfsOutputStreamStatistics getOutputStreamStatistics() {
468+
return outputStreamStatistics;
469+
}
470+
471+
/**
472+
* Getter to get the size of the task queue.
473+
*
474+
* @return the number of writeOperations in AbfsOutputStream.
475+
*/
476+
@VisibleForTesting
477+
public int getWriteOperationsSize() {
478+
return writeOperations.size();
479+
}
480+
481+
/**
482+
* Appending AbfsOutputStream statistics to base toString().
483+
*
484+
* @return String with AbfsOutputStream statistics.
485+
*/
486+
@Override
487+
public String toString() {
488+
final StringBuilder sb = new StringBuilder(super.toString());
489+
sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
490+
sb.append(outputStreamStatistics.toString());
491+
sb.append("}");
492+
return sb.toString();
493+
}
438494
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
2929

3030
private boolean disableOutputStreamFlush;
3131

32+
private AbfsOutputStreamStatistics streamStatistics;
33+
3234
public AbfsOutputStreamContext() {
3335
}
3436

@@ -49,6 +51,12 @@ public AbfsOutputStreamContext disableOutputStreamFlush(
4951
return this;
5052
}
5153

54+
public AbfsOutputStreamContext withStreamStatistics(
55+
final AbfsOutputStreamStatistics streamStatistics) {
56+
this.streamStatistics = streamStatistics;
57+
return this;
58+
}
59+
5260
public AbfsOutputStreamContext build() {
5361
// Validation of parameters to be done here.
5462
return this;
@@ -65,4 +73,8 @@ public boolean isEnableFlush() {
6573
public boolean isDisableOutputStreamFlush() {
6674
return disableOutputStreamFlush;
6775
}
76+
77+
public AbfsOutputStreamStatistics getStreamStatistics() {
78+
return streamStatistics;
79+
}
6880
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import org.apache.hadoop.classification.InterfaceStability;
22+
23+
/**
24+
* Interface for {@link AbfsOutputStream} statistics.
25+
*/
26+
@InterfaceStability.Unstable
27+
public interface AbfsOutputStreamStatistics {
28+
29+
/**
30+
* Number of bytes to be uploaded.
31+
*
32+
* @param bytes number of bytes to upload.
33+
*/
34+
void bytesToUpload(long bytes);
35+
36+
/**
37+
* Records a successful upload and the number of bytes uploaded.
38+
*
39+
* @param bytes number of bytes that were successfully uploaded.
40+
*/
41+
void uploadSuccessful(long bytes);
42+
43+
/**
44+
* Records that upload is failed and the number of bytes.
45+
*
46+
* @param bytes number of bytes that failed to upload.
47+
*/
48+
void uploadFailed(long bytes);
49+
50+
/**
51+
* Time spent in waiting for tasks to be completed in the blocking queue.
52+
*
53+
* @param start millisecond at which the wait for task to be complete begins.
54+
* @param end millisecond at which the wait is completed for the task.
55+
*/
56+
void timeSpentTaskWait(long start, long end);
57+
58+
/**
59+
* Number of times task queue is shrunk.
60+
*/
61+
void queueShrunk();
62+
63+
/**
64+
* Number of times buffer is written to the service after a write operation.
65+
*/
66+
void writeCurrentBuffer();
67+
68+
/**
69+
* Method to form a string of all AbfsOutputStream statistics and their
70+
* values.
71+
*
72+
* @return AbfsOutputStream statistics.
73+
*/
74+
@Override
75+
String toString();
76+
77+
}

0 commit comments

Comments
 (0)