From 3ed1ed1feffc3f0410f372089c89f588d7786d38 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 14 Apr 2020 15:15:32 +0530 Subject: [PATCH 1/3] HADOOP-16965 Rafactor abfs stream configuration. --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 26 +++++++--- .../fs/azurebfs/services/AbfsInputStream.java | 27 ++++++---- .../AbfsInputStreamConfiguration.java | 51 +++++++++++++++++++ .../azurebfs/services/AbfsOutputStream.java | 19 ++++--- .../AbfsOutputStreamConfiguration.java | 51 +++++++++++++++++++ .../services/AbfsStreamConfiguration.java | 26 ++++++++++ 6 files changed, 171 insertions(+), 29 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index a330da40cbf9b..e025b17fbe8b5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -81,7 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamConfiguration; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamConfiguration; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -415,12 +417,16 @@ public OutputStream createFile(final Path path, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamConfiguration()); } } + private AbfsOutputStreamConfiguration populateAbfsOutputStreamConfiguration() { + return new AbfsOutputStreamConfiguration(abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); + } + public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { @@ -466,11 +472,17 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends(), eTag); + populateAbfsInputStreamConfiguration(), + eTag); } } + private AbfsInputStreamConfiguration populateAbfsInputStreamConfiguration() { + return new AbfsInputStreamConfiguration(abfsConfiguration.getReadBufferSize(), + abfsConfiguration.getReadAheadQueueDepth(), + abfsConfiguration.getTolerateOobAppends()); + } + public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { @@ -502,9 +514,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamConfiguration()); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0c06014a40832..7a6fec0969b6b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -61,25 +61,30 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private boolean closed = false; public AbfsInputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long contentLength, - final int bufferSize, - final int readAheadQueueDepth, - final boolean tolerateOobAppends, - final String eTag) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long contentLength, + AbfsInputStreamConfiguration abfsInputStreamConfiguration, + final String eTag) { this.client = client; this.statistics = statistics; this.path = path; this.contentLength = contentLength; - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.tolerateOobAppends = tolerateOobAppends; + this.bufferSize = abfsInputStreamConfiguration.getReadBufferSize(); + this.readAheadQueueDepth = getReadAheadQueueDepth( + abfsInputStreamConfiguration.getReadAheadQueueDepth()); + this.tolerateOobAppends = abfsInputStreamConfiguration.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; } + private int getReadAheadQueueDepth(int readAheadQueueDepthVal) { + return (readAheadQueueDepthVal >= 0) + ? readAheadQueueDepthVal + : Runtime.getRuntime().availableProcessors(); + } + public String getPath() { return path; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java new file mode 100644 index 0000000000000..04075c408c98c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra input stream configs. + */ +public class AbfsInputStreamConfiguration extends AbfsStreamConfiguration { + + private int readBufferSize; + + private int readAheadQueueDepth; + + private boolean tolerateOobAppends; + + public AbfsInputStreamConfiguration(int readBufferSize, + int readAheadQueueDepth, + boolean tolerateOobAppends) { + this.readBufferSize = readBufferSize; + this.readAheadQueueDepth = readAheadQueueDepth; + this.tolerateOobAppends = tolerateOobAppends; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + public int getReadAheadQueueDepth() { + return readAheadQueueDepth; + } + + public boolean isTolerateOobAppends() { + return tolerateOobAppends; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index e943169a03e4f..85588ec878d67 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final Statistics statistics; public AbfsOutputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long position, - final int bufferSize, - final boolean supportFlush, - final boolean disableOutputStreamFlush) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long position, + AbfsOutputStreamConfiguration abfsOutputStreamConfiguration) { this.client = client; this.statistics = statistics; this.path = path; this.position = position; this.closed = false; - this.supportFlush = supportFlush; - this.disableOutputStreamFlush = disableOutputStreamFlush; + this.supportFlush = abfsOutputStreamConfiguration.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamConfiguration + .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = bufferSize; + this.bufferSize = abfsOutputStreamConfiguration.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java new file mode 100644 index 0000000000000..293d5bfe63e7d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra output stream configs. + */ +public class AbfsOutputStreamConfiguration extends AbfsStreamConfiguration { + + private int writeBufferSize; + + private boolean enableFlush; + + private boolean disableOutputStreamFlush; + + public AbfsOutputStreamConfiguration(int writeBufferSize, + boolean enableFlush, + boolean disableOutputStreamFlush) { + this.writeBufferSize = writeBufferSize; + this.enableFlush = enableFlush; + this.disableOutputStreamFlush = disableOutputStreamFlush; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } + + public boolean isEnableFlush() { + return enableFlush; + } + + public boolean isDisableOutputStreamFlush() { + return disableOutputStreamFlush; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java new file mode 100644 index 0000000000000..41ae0ff8664ac --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Base stream configuration class which is going + * to store common configs among input and output streams. + */ +public abstract class AbfsStreamConfiguration { +} From 492ca11519a9122fac2d7c33436aa008a86f980e Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 15 Apr 2020 19:01:45 +0530 Subject: [PATCH 2/3] Changing to builder based contexts --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 30 +++++++++++-------- .../fs/azurebfs/services/AbfsInputStream.java | 15 +++------- ...ation.java => AbfsInputStreamContext.java} | 29 ++++++++++++++---- .../azurebfs/services/AbfsOutputStream.java | 8 ++--- ...tion.java => AbfsOutputStreamContext.java} | 25 +++++++++++++--- ...figuration.java => AbfsStreamContext.java} | 2 +- 6 files changed, 71 insertions(+), 38 deletions(-) rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/{AbfsInputStreamConfiguration.java => AbfsInputStreamContext.java} (65%) rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/{AbfsOutputStreamConfiguration.java => AbfsOutputStreamContext.java} (71%) rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/{AbfsStreamConfiguration.java => AbfsStreamContext.java} (95%) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e025b17fbe8b5..6b194a41de2ff 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -81,9 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; -import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamConfiguration; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamConfiguration; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -417,14 +417,16 @@ public OutputStream createFile(final Path path, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - populateAbfsOutputStreamConfiguration()); + populateAbfsOutputStreamContext()); } } - private AbfsOutputStreamConfiguration populateAbfsOutputStreamConfiguration() { - return new AbfsOutputStreamConfiguration(abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + private AbfsOutputStreamContext populateAbfsOutputStreamContext() { + return new AbfsOutputStreamContext() + .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) + .enableFlush(abfsConfiguration.isFlushEnabled()) + .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .build(); } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) @@ -472,15 +474,17 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - populateAbfsInputStreamConfiguration(), + populateAbfsInputStreamContext(), eTag); } } - private AbfsInputStreamConfiguration populateAbfsInputStreamConfiguration() { - return new AbfsInputStreamConfiguration(abfsConfiguration.getReadBufferSize(), - abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends()); + private AbfsInputStreamContext populateAbfsInputStreamContext() { + return new AbfsInputStreamContext() + .withReadBufferSize(abfsConfiguration.getReadBufferSize()) + .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) + .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .build(); } public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws @@ -514,7 +518,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, - populateAbfsOutputStreamConfiguration()); + populateAbfsOutputStreamContext()); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 7a6fec0969b6b..3232645d360c4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -65,26 +65,19 @@ public AbfsInputStream( final Statistics statistics, final String path, final long contentLength, - AbfsInputStreamConfiguration abfsInputStreamConfiguration, + AbfsInputStreamContext abfsInputStreamContext, final String eTag) { this.client = client; this.statistics = statistics; this.path = path; this.contentLength = contentLength; - this.bufferSize = abfsInputStreamConfiguration.getReadBufferSize(); - this.readAheadQueueDepth = getReadAheadQueueDepth( - abfsInputStreamConfiguration.getReadAheadQueueDepth()); - this.tolerateOobAppends = abfsInputStreamConfiguration.isTolerateOobAppends(); + this.bufferSize = abfsInputStreamContext.getReadBufferSize(); + this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); + this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; } - private int getReadAheadQueueDepth(int readAheadQueueDepthVal) { - return (readAheadQueueDepthVal >= 0) - ? readAheadQueueDepthVal - : Runtime.getRuntime().availableProcessors(); - } - public String getPath() { return path; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java similarity index 65% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index 04075c408c98c..cd9c750e70d62 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -21,7 +21,7 @@ /** * Class to hold extra input stream configs. */ -public class AbfsInputStreamConfiguration extends AbfsStreamConfiguration { +public class AbfsInputStreamContext extends AbfsStreamContext { private int readBufferSize; @@ -29,12 +29,31 @@ public class AbfsInputStreamConfiguration extends AbfsStreamConfiguration { private boolean tolerateOobAppends; - public AbfsInputStreamConfiguration(int readBufferSize, - int readAheadQueueDepth, - boolean tolerateOobAppends) { + public AbfsInputStreamContext() { + } + + public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) { this.readBufferSize = readBufferSize; - this.readAheadQueueDepth = readAheadQueueDepth; + return this; + } + + public AbfsInputStreamContext withReadAheadQueueDepth( + final int readAheadQueueDepth) { + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) + ? readAheadQueueDepth + : Runtime.getRuntime().availableProcessors();; + return this; + } + + public AbfsInputStreamContext withTolerateOobAppends( + final boolean tolerateOobAppends) { this.tolerateOobAppends = tolerateOobAppends; + return this; + } + + public AbfsInputStreamContext build() { + // Validation of parameters to be done here. + return this; } public int getReadBufferSize() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 85588ec878d67..f29cc4afe820a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -86,18 +86,18 @@ public AbfsOutputStream( final Statistics statistics, final String path, final long position, - AbfsOutputStreamConfiguration abfsOutputStreamConfiguration) { + AbfsOutputStreamContext abfsOutputStreamContext) { this.client = client; this.statistics = statistics; this.path = path; this.position = position; this.closed = false; - this.supportFlush = abfsOutputStreamConfiguration.isEnableFlush(); - this.disableOutputStreamFlush = abfsOutputStreamConfiguration + this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamContext .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = abfsOutputStreamConfiguration.getWriteBufferSize(); + this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java similarity index 71% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 293d5bfe63e7d..0be97c5d9f1f2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -21,7 +21,7 @@ /** * Class to hold extra output stream configs. */ -public class AbfsOutputStreamConfiguration extends AbfsStreamConfiguration { +public class AbfsOutputStreamContext extends AbfsStreamContext { private int writeBufferSize; @@ -29,12 +29,29 @@ public class AbfsOutputStreamConfiguration extends AbfsStreamConfiguration { private boolean disableOutputStreamFlush; - public AbfsOutputStreamConfiguration(int writeBufferSize, - boolean enableFlush, - boolean disableOutputStreamFlush) { + public AbfsOutputStreamContext() { + } + + public AbfsOutputStreamContext withWriteBufferSize( + final int writeBufferSize) { this.writeBufferSize = writeBufferSize; + return this; + } + + public AbfsOutputStreamContext enableFlush(final boolean enableFlush) { this.enableFlush = enableFlush; + return this; + } + + public AbfsOutputStreamContext disableOutputStreamFlush( + final boolean disableOutputStreamFlush) { this.disableOutputStreamFlush = disableOutputStreamFlush; + return this; + } + + public AbfsOutputStreamContext build() { + // Validation of parameters to be done here. + return this; } public int getWriteBufferSize() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java similarity index 95% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java index 41ae0ff8664ac..ee77f595fed4b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java @@ -22,5 +22,5 @@ * Base stream configuration class which is going * to store common configs among input and output streams. */ -public abstract class AbfsStreamConfiguration { +public abstract class AbfsStreamContext { } From 8faf18c8a78ba1dce0e2001c3362353d0d14eb3c Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 21 Apr 2020 18:58:21 +0530 Subject: [PATCH 3/3] Review comments --- .../org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- .../hadoop/fs/azurebfs/services/AbfsInputStreamContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3232645d360c4..05c093a880d82 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -65,7 +65,7 @@ public AbfsInputStream( final Statistics statistics, final String path, final long contentLength, - AbfsInputStreamContext abfsInputStreamContext, + final AbfsInputStreamContext abfsInputStreamContext, final String eTag) { this.client = client; this.statistics = statistics; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index cd9c750e70d62..cba719101694c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -41,7 +41,7 @@ public AbfsInputStreamContext withReadAheadQueueDepth( final int readAheadQueueDepth) { this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth - : Runtime.getRuntime().availableProcessors();; + : Runtime.getRuntime().availableProcessors(); return this; }