Skip to content

Commit 98fdbb8

Browse files
mukund-thakursteveloughran
authored andcommitted
HADOOP-16965. Refactor abfs stream configuration. (#1956)
Contributed by Mukund Thakur.
1 parent f74a571 commit 98fdbb8

File tree

6 files changed

+204
-29
lines changed

6 files changed

+204
-29
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@
8181
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
8282
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
8383
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
84+
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
8485
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
86+
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
8587
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
8688
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
8789
import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -415,12 +417,18 @@ public OutputStream createFile(final Path path,
415417
statistics,
416418
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
417419
0,
418-
abfsConfiguration.getWriteBufferSize(),
419-
abfsConfiguration.isFlushEnabled(),
420-
abfsConfiguration.isOutputStreamFlushDisabled());
420+
populateAbfsOutputStreamContext());
421421
}
422422
}
423423

424+
private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
425+
return new AbfsOutputStreamContext()
426+
.withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
427+
.enableFlush(abfsConfiguration.isFlushEnabled())
428+
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
429+
.build();
430+
}
431+
424432
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
425433
throws AzureBlobFileSystemException {
426434
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@@ -466,11 +474,19 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
466474
// Add statistics for InputStream
467475
return new AbfsInputStream(client, statistics,
468476
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
469-
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
470-
abfsConfiguration.getTolerateOobAppends(), eTag);
477+
populateAbfsInputStreamContext(),
478+
eTag);
471479
}
472480
}
473481

482+
private AbfsInputStreamContext populateAbfsInputStreamContext() {
483+
return new AbfsInputStreamContext()
484+
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
485+
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
486+
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
487+
.build();
488+
}
489+
474490
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
475491
AzureBlobFileSystemException {
476492
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@@ -502,9 +518,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
502518
statistics,
503519
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
504520
offset,
505-
abfsConfiguration.getWriteBufferSize(),
506-
abfsConfiguration.isFlushEnabled(),
507-
abfsConfiguration.isOutputStreamFlushDisabled());
521+
populateAbfsOutputStreamContext());
508522
}
509523
}
510524

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
6161
private boolean closed = false;
6262

6363
public AbfsInputStream(
64-
final AbfsClient client,
65-
final Statistics statistics,
66-
final String path,
67-
final long contentLength,
68-
final int bufferSize,
69-
final int readAheadQueueDepth,
70-
final boolean tolerateOobAppends,
71-
final String eTag) {
64+
final AbfsClient client,
65+
final Statistics statistics,
66+
final String path,
67+
final long contentLength,
68+
final AbfsInputStreamContext abfsInputStreamContext,
69+
final String eTag) {
7270
this.client = client;
7371
this.statistics = statistics;
7472
this.path = path;
7573
this.contentLength = contentLength;
76-
this.bufferSize = bufferSize;
77-
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
78-
this.tolerateOobAppends = tolerateOobAppends;
74+
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
75+
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
76+
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
7977
this.eTag = eTag;
8078
this.readAheadEnabled = true;
8179
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
22+
* Class to hold extra input stream configs.
23+
*/
24+
public class AbfsInputStreamContext extends AbfsStreamContext {
25+
26+
private int readBufferSize;
27+
28+
private int readAheadQueueDepth;
29+
30+
private boolean tolerateOobAppends;
31+
32+
public AbfsInputStreamContext() {
33+
}
34+
35+
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
36+
this.readBufferSize = readBufferSize;
37+
return this;
38+
}
39+
40+
public AbfsInputStreamContext withReadAheadQueueDepth(
41+
final int readAheadQueueDepth) {
42+
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
43+
? readAheadQueueDepth
44+
: Runtime.getRuntime().availableProcessors();
45+
return this;
46+
}
47+
48+
public AbfsInputStreamContext withTolerateOobAppends(
49+
final boolean tolerateOobAppends) {
50+
this.tolerateOobAppends = tolerateOobAppends;
51+
return this;
52+
}
53+
54+
public AbfsInputStreamContext build() {
55+
// Validation of parameters to be done here.
56+
return this;
57+
}
58+
59+
public int getReadBufferSize() {
60+
return readBufferSize;
61+
}
62+
63+
public int getReadAheadQueueDepth() {
64+
return readAheadQueueDepth;
65+
}
66+
67+
public boolean isTolerateOobAppends() {
68+
return tolerateOobAppends;
69+
}
70+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
8282
private final Statistics statistics;
8383

8484
public AbfsOutputStream(
85-
final AbfsClient client,
86-
final Statistics statistics,
87-
final String path,
88-
final long position,
89-
final int bufferSize,
90-
final boolean supportFlush,
91-
final boolean disableOutputStreamFlush) {
85+
final AbfsClient client,
86+
final Statistics statistics,
87+
final String path,
88+
final long position,
89+
AbfsOutputStreamContext abfsOutputStreamContext) {
9290
this.client = client;
9391
this.statistics = statistics;
9492
this.path = path;
9593
this.position = position;
9694
this.closed = false;
97-
this.supportFlush = supportFlush;
98-
this.disableOutputStreamFlush = disableOutputStreamFlush;
95+
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
96+
this.disableOutputStreamFlush = abfsOutputStreamContext
97+
.isDisableOutputStreamFlush();
9998
this.lastError = null;
10099
this.lastFlushOffset = 0;
101-
this.bufferSize = bufferSize;
100+
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
102101
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
103102
this.bufferIndex = 0;
104103
this.writeOperations = new ConcurrentLinkedDeque<>();
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
22+
* Class to hold extra output stream configs.
23+
*/
24+
public class AbfsOutputStreamContext extends AbfsStreamContext {
25+
26+
private int writeBufferSize;
27+
28+
private boolean enableFlush;
29+
30+
private boolean disableOutputStreamFlush;
31+
32+
public AbfsOutputStreamContext() {
33+
}
34+
35+
public AbfsOutputStreamContext withWriteBufferSize(
36+
final int writeBufferSize) {
37+
this.writeBufferSize = writeBufferSize;
38+
return this;
39+
}
40+
41+
public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
42+
this.enableFlush = enableFlush;
43+
return this;
44+
}
45+
46+
public AbfsOutputStreamContext disableOutputStreamFlush(
47+
final boolean disableOutputStreamFlush) {
48+
this.disableOutputStreamFlush = disableOutputStreamFlush;
49+
return this;
50+
}
51+
52+
public AbfsOutputStreamContext build() {
53+
// Validation of parameters to be done here.
54+
return this;
55+
}
56+
57+
public int getWriteBufferSize() {
58+
return writeBufferSize;
59+
}
60+
61+
public boolean isEnableFlush() {
62+
return enableFlush;
63+
}
64+
65+
public boolean isDisableOutputStreamFlush() {
66+
return disableOutputStreamFlush;
67+
}
68+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
/**
22+
* Base stream configuration class which is going
23+
* to store common configs among input and output streams.
24+
*/
25+
public abstract class AbfsStreamContext {
26+
}

0 commit comments

Comments
 (0)