diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
index 8d11043937612..555ca0816cdff 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import com.google.common.base.Preconditions;
@@ -308,9 +309,6 @@ private void freeBuffers() {
@Override
public boolean hasCapability(String capability) {
- if (out instanceof StreamCapabilities) {
- return ((StreamCapabilities) out).hasCapability(capability);
- }
- return false;
+ return StoreImplementationUtils.hasCapability(out, capability);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java
index 2e2d98b9c5462..0077838920a9e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java
@@ -36,6 +36,6 @@ public interface CanSetDropBehind {
* UnsupportedOperationException If this stream doesn't support
* setting the drop-behind.
*/
- public void setDropBehind(Boolean dropCache)
+ void setDropBehind(Boolean dropCache)
throws IOException, UnsupportedOperationException;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 31f82975899e1..861070158b029 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;
@@ -234,10 +235,7 @@ public void unbuffer() {
@Override
public boolean hasCapability(String capability) {
- if (in instanceof StreamCapabilities) {
- return ((StreamCapabilities) in).hasCapability(capability);
- }
- return false;
+ return StoreImplementationUtils.hasCapability(in, capability);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index 5b604e58e2360..08dd3262da8db 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@@ -122,10 +123,7 @@ public OutputStream getWrappedStream() {
@Override
public boolean hasCapability(String capability) {
- if (wrappedStream instanceof StreamCapabilities) {
- return ((StreamCapabilities) wrappedStream).hasCapability(capability);
- }
- return false;
+ return StoreImplementationUtils.hasCapability(wrappedStream, capability);
}
@Override // Syncable
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 2458b2f40d8d7..aaa19adf8c6a4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -33,7 +33,8 @@
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
-abstract public class FSOutputSummer extends OutputStream {
+abstract public class FSOutputSummer extends OutputStream implements
+ StreamCapabilities {
// data checksum
private final DataChecksum sum;
// internal buffer for storing data before it is checksumed
@@ -254,4 +255,9 @@ protected synchronized void setChecksumBufSize(int size) {
protected synchronized void resetChecksumBufSize() {
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
}
+
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
index 7ec3509ce1df6..1f1295e4e7deb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
@@ -27,16 +27,16 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface Syncable {
-
+
/** Flush out the data in client's user buffer. After the return of
* this call, new readers will see the data.
* @throws IOException if any error occurs
*/
- public void hflush() throws IOException;
+ void hflush() throws IOException;
/** Similar to posix fsync, flush out the data in client's user buffer
* all the way to the disk device (but the disk may have it in its cache).
* @throws IOException if error occurs
*/
- public void hsync() throws IOException;
+ void hsync() throws IOException;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
new file mode 100644
index 0000000000000..655c4e35298f7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StreamCapabilities;
+
+import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH;
+import static org.apache.hadoop.fs.StreamCapabilities.HSYNC;
+
+/**
+ * Utility classes to help implementing filesystems and streams.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class StoreImplementationUtils {
+
+ private StoreImplementationUtils() {
+ }
+
+ /**
+   * Check the supplied capability for being one of those required for full
+   * {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality.
+   * @param capability capability string.
+   * @return true if the capability is one of the Syncable operations.
+ */
+ public static boolean supportsSyncable(String capability) {
+    return capability.equalsIgnoreCase(HSYNC) ||
+        capability.equalsIgnoreCase(HFLUSH);
+ }
+
+ /**
+ * Probe for an object having a capability; returns true
+ * iff the stream implements {@link StreamCapabilities} and its
+ * {@code hasCapabilities()} method returns true for the capability.
+   * This is a package private method intended to provide a common
+   * implementation for input and output streams; the strongly typed
+   * {@link #hasCapability(OutputStream, String)} and
+   * {@link #hasCapability(InputStream, String)} calls are for public use.
+ * @param object object to probe.
+ * @param capability capability to probe for
+ * @return true iff the object implements stream capabilities and
+ * declares that it supports the capability.
+ */
+ static boolean objectHasCapability(Object object, String capability) {
+ if (object instanceof StreamCapabilities) {
+ return ((StreamCapabilities) object).hasCapability(capability);
+ }
+ return false;
+ }
+
+ /**
+ * Probe for an output stream having a capability; returns true
+ * iff the stream implements {@link StreamCapabilities} and its
+ * {@code hasCapabilities()} method returns true for the capability.
+ * @param out output stream
+ * @param capability capability to probe for
+ * @return true iff the stream declares that it supports the capability.
+ */
+ public static boolean hasCapability(OutputStream out, String capability) {
+ return objectHasCapability(out, capability);
+ }
+
+ /**
+ * Probe for an input stream having a capability; returns true
+ * iff the stream implements {@link StreamCapabilities} and its
+ * {@code hasCapabilities()} method returns true for the capability.
+   * @param in input stream
+   * @param capability capability to probe for
+   * @return true iff the stream declares that it supports the capability.
+   */
+  public static boolean hasCapability(InputStream in, String capability) {
+    return objectHasCapability(in, capability);
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StreamStateModel.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StreamStateModel.java
new file mode 100644
index 0000000000000..e20e07537faee
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StreamStateModel.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;
+
+/**
+ * Models a stream's state and can be used for checking this state before
+ * any operation, and for using a lock to manage exclusive access to operations
+ * within the stream.
+ *
+ * The model is designed to ensure that a stream knows when it is closed,
+ * and, once it has entered a failed state, that the failure
+ * is not forgotten.
+ *
+ * This state model is based on the design of
+ * {@code org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream} and
+ * {@code org.apache.hadoop.fs.azure.PageBlobOutputStream} which both
+ * store and rethrow any previously raised error.
+ *
+ *
+ * The model has three states: Open, Error, and Closed:
+ *
+ *
+ * <ul>
+ * <li>{@link State#Open}: caller can interact with the stream.</li>
+ * <li>{@link State#Error}: all operations will raise the previously
+ * recorded exception.</li>
+ * <li>{@link State#Closed}: operations will be rejected.</li>
+ * </ul>
+ *
+
+ * When an instance of the model is created, it is in {@link State#Open};
+ * a call to {@link #enterClosedState()} will move it from Open to Closed,
+ * and a call to {@link #enterErrorState(IOException)} moves it to Error.
+ *
+ *
+ *
+ * The lock/unlock operation relays to {@code java.util.concurrent.locks.Lock}
+ * and has a similar usage pattern. A key difference is that a check for the
+ * stream being open can be integrated with the lock acquisition.
+ * If {@link #acquireLock(boolean)} is called with {@code checkOpen == true},
+ * then the caller knows that, when the lock is granted, the stream
+ * is open.
+ *
+ * <pre>
+ * model.acquireLock(true);
+ * try {
+ *   (do some work)
+ * } catch (IOException e) {
+ *   model.enterErrorState(e);
+ * } finally {
+ *   model.releaseLock();
+ * }
+ * </pre>
+ *
+ *
+ */
+@InterfaceAudience.LimitedPrivate("Filesystems")
+@InterfaceStability.Unstable
+public class StreamStateModel {
+
+ /**
+ * States of the stream.
+ */
+ public enum State {
+
+ /**
+ * Stream is open.
+ */
+ Open,
+
+ /**
+ * Stream is in an error state.
+ * It is not expected to recover from this.
+ */
+ Error,
+
+ /**
+ * Stream is now closed. Operations will fail.
+ */
+ Closed
+ }
+
+ /**
+ * Path; if not empty then a {@link PathIOException} will be raised
+ * containing this path.
+ */
+ private final String path;
+
+ /** Lock. Not considering an InstrumentedWriteLock, but it is an option. */
+ private final Lock lock = new ReentrantLock();
+
+ /**
+ * Initial state: open.
+ */
+  private final AtomicReference<State> state =
+ new AtomicReference<>(State.Open);
+
+ /** Any exception to raise on the next checkOpen call. */
+ private IOException exception;
+
+ /**
+ * Create for a path.
+ * @param path optional path for exception messages.
+ */
+ public StreamStateModel(@Nullable Path path) {
+ this.path = path != null ? path.toString() : "";
+ }
+
+ /**
+ * Create for a path.
+ * @param path optional path for exception messages.
+ */
+ public StreamStateModel(@Nullable final String path) {
+ this.path = path;
+ }
+
+ /**
+ * Get the current state.
+ * Not synchronized; lock if you want consistency across calls.
+ * @return the current state.
+ */
+ public State getState() {
+ return state.get();
+ }
+
+ /**
+   * Change state to closed. No-op if the state was already Closed or Error.
+ * @return true if the state changed.
+ */
+ public synchronized boolean enterClosedState() {
+ return state.compareAndSet(State.Open, State.Closed);
+ }
+
+ /**
+   * Change state to error and store the first error so it can be re-thrown.
+ * If already in {@link State#Error}: return the previous exception.
+ * @param ex the exception to record
+ * @return the exception set when the error state was entered.
+ */
+ public synchronized IOException enterErrorState(final IOException ex) {
+ Preconditions.checkArgument(ex != null, "Null exception");
+ switch (state.get()) {
+ // a stream can go into the error state when open or closed
+ case Open:
+ case Closed:
+ exception = ex;
+ state.set(State.Error);
+ break;
+ case Error:
+ // already in this state; retain the previous exception.
+ break;
+ }
+ return exception;
+ }
+
+ /**
+ * Verify that a stream is open, throwing an IOException if
+ * not.
+ * If in an error state: rethrow that exception.
+ * If closed, throw an IOException with
+ * {@link FSExceptionMessages#STREAM_IS_CLOSED} in the message.
+ * @throws IOException if the stream is not in {@link State#Open}.
+ * @throws PathIOException if the stream was not open and a path was given
+ * in the constructor.
+ */
+ public synchronized void checkOpen() throws IOException {
+ switch (state.get()) {
+ case Open:
+ return;
+
+ case Error:
+ throw exception;
+
+ case Closed:
+ if (StringUtils.isNotEmpty(path)) {
+ throw new PathIOException(path, STREAM_IS_CLOSED);
+ } else {
+ throw new IOException(STREAM_IS_CLOSED);
+ }
+ }
+ }
+
+ /**
+ * Probe for the model being in a specific state.
+ * @param probe state to probe for
+ * @return true if at the time of the check, the service
+ * was in the given state.
+ */
+ public boolean isInState(State probe) {
+ return state.get().equals(probe);
+ }
+
+ /**
+ * If the model is in {@link State#Error} throw the exception.
+ * @throws IOException if one was caught earlier.
+ */
+ public synchronized void throwAnyExceptionRaised() throws IOException {
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /**
+ * Get any exception. Non-null iff the model is in the error state.
+ * @return any exception set on a transition to the error state.
+ */
+ public synchronized IOException getException() {
+ return exception;
+ }
+
+ /**
+ * Acquire an exclusive lock.
+ * @param checkOpen must the stream be open?
+ * @throws IOException if the stream is in error state or
+ * {@code checkOpen == true} and the stream is closed.
+ */
+ public void acquireLock(boolean checkOpen) throws IOException {
+ // fail fast if the stream is required to be open and it is not
+ if (checkOpen) {
+ checkOpen();
+ }
+
+ // acquire the lock; this may suspend the thread
+ lock.lock();
+
+    // now verify that the stream is still open, releasing the lock
+    // if the check fails so that it is not leaked.
+    if (checkOpen) {
+      try {
+        checkOpen();
+      } catch (IOException e) {
+        lock.unlock();
+        throw e;
+      }
+    }
+ }
+
+ /**
+ * Release the lock.
+ */
+ public void releaseLock() {
+ lock.unlock();
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index a2458ee891448..50f227813435b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -594,6 +594,8 @@ and MAY be a `RuntimeException` or subclass. For instance, HDFS may raise a `Inv
result = FSDataOutputStream
+A zero byte file must exist at the end of the specified path, visible to all.
+
The updated (valid) FileSystem must contain all the parent directories of the path, as created by `mkdirs(parent(p))`.
The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
@@ -607,10 +609,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne
clients creating files with `overwrite==true` to fail if the file is created
by another client between the two tests.
-* S3A, Swift and potentially other Object Stores do not currently change the FS state
+* S3A, Swift and potentially other Object Stores do not currently change the `FS` state
until the output stream `close()` operation is completed.
-This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`,
- and potentially confuse file/directory logic
+This is a significant difference between the behavior of object stores
+and that of filesystems, as it allows >1 client to create a file with `overwrite==false`,
+and potentially confuse file/directory logic. In particular, using `create()` to acquire
+an exclusive lock on a file (whoever creates the file without an error is considered
+the holder of the lock) may not be a safe algorithm to use when working with object stores.
+
+* Object stores may create an empty file as a marker when a file is created.
+However, object stores with overwrite=true semantics may not implement this atomically,
+so creating files with `overwrite==false` cannot be used as an implicit exclusion
+mechanism between processes.
* The Local FileSystem raises a `FileNotFoundException` when trying to create a file over
a directory, hence it is listed as an exception that MAY be raised when
@@ -647,7 +657,7 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep
#### Postconditions
- FS
+ FS' = FS
result = FSDataOutputStream
Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index df538ee6cf96b..0cc3423b08982 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [Notation](notation.html)
1. [Model](model.html)
1. [FileSystem class](filesystem.html)
+1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
1. [FSDataInputStream class](fsdatainputstream.html)
1. [PathCapabilities interface](pathcapabilities.html)
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
new file mode 100644
index 0000000000000..d44baef8bb151
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -0,0 +1,850 @@
+
+
+
+
+# Output: `OutputStream`, `Syncable` and `StreamCapabilities`
+
+## Introduction
+
+This document covers the Output Streams within the context of the
+[Hadoop File System Specification](index.html).
+
+It uses the filesystem model defined in [A Model of a Hadoop Filesystem](model.html)
+with the notation defined in [notation](Notation.md).
+
+The target audiences are
+1. Users of the APIs. While `java.io.OutputStream` is a standard interface,
+this document clarifies how it is implemented in HDFS and elsewhere.
+The Hadoop-specific interfaces `Syncable` and `StreamCapabilities` are new;
+`Syncable` is notable in offering durability and visibility guarantees which
+exceed that of `OutputStream`.
+1. Implementors of File Systems and clients.
+
+## How data is written to a filesystem
+
+The core mechanism to write data to files through the Hadoop FileSystem APIs
+is through `OutputStream` subclasses obtained through calls to
+`FileSystem.create()`, `FileSystem.append()`,
+or `FSDataOutputStreamBuilder.build()`.
+
+These all return instances of `FSDataOutputStream`, through which data
+can be written through various `write()` methods.
+After a stream's `close()` method is called, all data written to the
+stream MUST BE persisted to the filesystem and visible to all other
+clients attempting to read data from that path via `FileSystem.open()`.
+
+
+As well as operations to write the data, Hadoop's OutputStream implementations
+provide methods to flush buffered data back to the filesystem,
+so as to ensure that the data is reliably persisted and/or visible
+to other callers. This is done via the `Syncable` interface. It was
+originally intended that the presence of this interface could be interpreted
+as a guarantee that the stream supported its methods. However, this has proven
+impossible to guarantee as the static nature of the interface is incompatible
+with filesystems whose syncability semantics may vary on a store/path basis.
+As an example, erasure coded files in HDFS do not support the Sync operations,
+even though they are implemented as a subclass of an output stream which is `Syncable`.
+A new interface, `StreamCapabilities`, has been implemented to allow callers
+to probe the exact capabilities of a stream, even transitively
+through a chain of streams.
+
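+As an illustration, a caller which wants `hflush()` semantics can probe for
+them before relying on them. This is a minimal sketch, assuming `fs` and
+`path` already exist; `StreamCapabilities.HFLUSH` is the public constant
+for the capability string:
+
+```java
+FSDataOutputStream out = fs.create(path, true);
+if (out.hasCapability(StreamCapabilities.HFLUSH)) {
+  // the stream (or whatever it wraps) declares hflush() semantics
+  out.hflush();
+} else {
+  // fall back to flush(), which offers no visibility guarantees
+  out.flush();
+}
+```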
+
+
+## Output Stream Model
+
+For this specification, an output stream can be viewed as a list of bytes
+stored in the client:
+
+```python
+buffer: List[byte]
+```
+
+A flag, `open` tracks whether the stream is open: after the stream
+is closed no more data may be written to it:
+
+```python
+open: bool
+buffer: List[byte]
+```
+
+The destination path of the stream, `path`, can be tracked to form a triple
+`path, open, buffer`
+
+```python
+Stream = (path: Path, open: Boolean, buffer: byte[])
+```
+
+#### Visibility of Flushed Data
+
+(Immediately) after `Syncable` operations which flush data to the filesystem,
+the data at the stream's destination path MUST match that of
+`buffer`. That is, the following condition MUST hold:
+
+```python
+FS'.Files(path) == buffer
+```
+
+Any client reading the data at the path will see the new data.
+The two sync operations, `hflush()` and `hsync()` differ in their durability
+guarantees, not visibility of data.
+
+### State of Stream and Filesystem after `Filesystem.create()`
+
+The output stream returned by a `FileSystem.create(path)` or
+`FileSystem.createFile(path).build()`
+can be modeled as a triple with an empty buffer:
+
+```python
+Stream' = (path, true, [])
+```
+
+The filesystem `FS'` MUST contain a 0-byte file at the path:
+
+```python
+data(FS', path) == []
+```
+
+Thus, the initial state of `Stream'.buffer` is implicitly
+consistent with the data at the filesystem.
+
+
+*Object Stores*: see caveats in the "Object Stores" section below.
+
+### State of Stream and Filesystem after `Filesystem.append()`
+
+The output stream returned from a call of
+ `FileSystem.append(path, buffersize, progress)`,
+can be modeled as a stream whose `buffer` is initialized to that of
+the original file:
+
+```python
+Stream' = (path, true, data(FS, path))
+```
+
+#### Persisting data
+
+When the stream writes data back to its store, be it in any
+supported flush operation, in the `close()` operation, or at any other
+time the stream chooses to do so, the contents of the file
+are replaced with the current buffer:
+
+```python
+Stream' = (path, true, buffer)
+FS' = FS where data(FS', path) == buffer
+```
+
+After a call to `close()`, the stream is closed for all operations other
+than `close()`; they MAY fail with `IOException` or `RuntimeException`.
+
+```python
+Stream' = (path, false, [])
+```
+
+The `close()` operation MUST be idempotent, with the sole attempt to write the
+data made in the first invocation.
+
+1. If `close()` succeeds, subsequent calls are no-ops.
+1. If `close()` fails, again, subsequent calls are no-ops. They MAY rethrow
+the previous exception, but they MUST NOT retry the write.
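+
+One way to meet these requirements is sketched below; the `closed` flag and
+the `flushAndUpload()` helper are hypothetical names, not part of any
+Hadoop API:
+
+```java
+@Override
+public synchronized void close() throws IOException {
+  if (closed) {
+    // second and later calls: no-ops, and no retry of the write
+    return;
+  }
+  closed = true;
+  flushAndUpload();  // hypothetical: the single attempt to persist the data
+}
+```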
+
+
+
+
+
+# Class `FSDataOutputStream`
+
+```java
+public class FSDataOutputStream
+ extends DataOutputStream
+ implements Syncable, CanSetDropBehind, StreamCapabilities {
+ // ...
+}
+```
+
+The `FileSystem.create()`, `FileSystem.append()` and
+`FSDataOutputStreamBuilder.build()` calls return an instance
+of a class `FSDataOutputStream`, a subclass of `java.io.OutputStream`.
+
+The base class wraps an `OutputStream` instance, one which may implement `Syncable`,
+`CanSetDropBehind` and `StreamCapabilities`.
+
+This document covers the requirements of such implementations.
+
+HDFS's `FileSystem` implementation, `DistributedFileSystem`, returns an instance
+of `HdfsDataOutputStream`. This implementation has at least two behaviors
+which are not explicitly declared by the base Java implementation:
+
+1. Writes are synchronized: more than one thread can write to the same
+output stream. This is a use pattern which HBase relies on.
+
+1. `OutputStream.flush()` is a no-op when the file is closed. Apache Druid
+has invoked this call on a closed stream in the past
+[HADOOP-14346](https://issues.apache.org/jira/browse/HADOOP-14346).
+
+
+As the HDFS implementation is considered the de-facto specification of
+the FileSystem APIs, the fact that `write()` is thread-safe is significant.
+
+For compatibility, not only must other FS clients be thread-safe,
+but new HDFS featues, such as encryption and Erasure Coding must also
+implement consistent behavior with the core HDFS output stream.
+
+Put differently:
+
+*It isn't enough for Output Streams to implement the core semantics
+of `java.io.OutputStream`: they need to implement the extra semantics
+of `HdfsDataOutputStream`, especially for HBase to work correctly.*
+
+The concurrent `write()` call is the most significant tightening of
+the Java specification.
+
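+As an illustration of what this tightened specification permits, here is a
+sketch of the HBase-style pattern of two threads sharing one stream;
+`fs`, `path`, `record1` and `record2` are assumed to exist:
+
+```java
+FSDataOutputStream out = fs.create(path, true);
+ExecutorService pool = Executors.newFixedThreadPool(2);
+// With HDFS-style synchronized write(), each record is appended
+// atomically with respect to the other, not interleaved byte-by-byte.
+Future<?> f1 = pool.submit(() -> { out.write(record1); return null; });
+Future<?> f2 = pool.submit(() -> { out.write(record2); return null; });
+f1.get();
+f2.get();
+out.close();
+pool.shutdown();
+```
+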
+## Class `java.io.OutputStream`
+
+A Java `OutputStream` allows applications to write a sequence of bytes to a destination.
+In a Hadoop filesystem, that destination is the data under a path in the filesystem.
+
+```java
+public abstract class OutputStream implements Closeable, Flushable {
+ public abstract void write(int b) throws IOException;
+ public void write(byte b[]) throws IOException;
+ public void write(byte b[], int off, int len) throws IOException;
+ public void flush() throws IOException;
+ public void close() throws IOException;
+}
+```
+### `write(Stream, data)`
+
+Writes a byte of data to the stream.
+
+#### Preconditions
+
+```python
+Stream.open else raise ClosedChannelException, PathIOException, IOException
+```
+
+The exception `java.nio.channels.ClosedChannelException` is
+raised in the HDFS output streams when trying to write to a closed file.
+This exception does not include the destination path; and
+`Exception.getMessage()` is `null`. It is therefore of limited value in stack
+traces. Implementors may wish to raise exceptions with more detail, such
+as a `PathIOException`.
+
+
+#### Postconditions
+
+The buffer has the lower 8 bits of the data argument appended to it.
+
+```python
+Stream'.buffer = Stream.buffer + [data & 0xff]
+```
+
+There may be an explicit limit on the size of cached data, or an implicit
+limit based on the available capacity of the destination filesystem.
+When a limit is reached, `write()` SHOULD fail with an `IOException`.
+
+### `write(Stream, byte[] data, int offset, int len)`
+
+
+#### Preconditions
+
+The preconditions are all defined in `OutputStream.write()`
+
+```python
+Stream.open else raise ClosedChannelException, PathIOException, IOException
+data != null else raise NullPointerException
+offset >= 0 else raise IndexOutOfBoundsException
+len >= 0 else raise IndexOutOfBoundsException
+offset + len <= data.length else raise IndexOutOfBoundsException
+```
+
+There may be an explicit limit on the size of cached data, or an implicit
+limit based on the available capacity of the destination filesystem.
+When a limit is reached, `write()` SHOULD fail with an `IOException`.
+
+After the operation has returned, the buffer may be re-used. The outcome
+of updates to the buffer while the `write()` operation is in progress is undefined.
+
+#### Postconditions
+
+```python
+Stream'.buffer = Stream.buffer + data[offset...(offset + len)]
+```
+
+### `write(byte[] data)`
+
+This is defined as the equivalent of:
+
+```python
+write(data, 0, data.length)
+```
+
+### `flush()`
+
+Requests that the data is flushed. The specification of `OutputStream.flush()`
+declares that this SHOULD write data to the "intended destination".
+
+It explicitly precludes any guarantees about durability.
+
+For that reason, this document doesn't provide any normative
+specifications of behaviour.
+
+#### Preconditions
+
+```python
+Stream.open else raise IOException
+```
+
+#### Postconditions
+
+None.
+
+If the implementation chooses to implement a stream-flushing operation,
+the data may be saved to the filesystem such that it becomes visible to
+others:
+
+```python
+FS' = FS where data(FS', path) == buffer
+```
+
+Some applications have been known to call `flush()` on a closed stream
+on the assumption that it is harmless. Implementations MAY choose to
+support this behaviour.
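+
+A tolerant implementation can downgrade the call to a no-op; in this sketch
+the `closed` flag and the `flushInternal()` method are hypothetical names,
+not part of any Hadoop API:
+
+```java
+@Override
+public synchronized void flush() throws IOException {
+  if (closed) {
+    // tolerate flush-after-close rather than raising an exception
+    return;
+  }
+  flushInternal();  // hypothetical: flush the buffered data
+}
+```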
+
+### `close()`
+
+The `close()` operation saves all data to the filesystem and
+releases any resources used for writing data.
+
+The `close()` call is expected to block
+until the write has completed (as with `Syncable.hflush()`), possibly
+until it has been written to durable storage (as HDFS does).
+
+After `close()` completes, the data in a file MUST be visible and consistent
+with the data most recently written. The metadata of the file MUST be consistent
+with the data and the write history itself (i.e. any modification time fields
+updated).
+
+After `close()` is invoked, all subsequent `write()` calls on the stream
+MUST fail with an `IOException`.
+
+
+Any locking/leaseholding mechanism is also required to release its lock/lease.
+
+```python
+Stream'.open = false
+FS' = FS where data(FS', path) == buffer
+```
+
+The `close()` call MAY fail during its operation.
+
+1. Callers of the API MUST expect for some calls to fail and SHOULD code appropriately.
+Catching and swallowing exceptions, while common, is not always the ideal solution.
+1. Even after a failure, `close()` MUST place the stream into a closed state.
+Follow-on calls to `close()` are ignored, and calls to other methods
+rejected. That is: callers cannot be expected to call `close()` repeatedly
+until it succeeds.
+1. The duration of the `close()` operation is undefined. Operations which rely
+on acknowledgements from remote systems to meet the persistence guarantees
+implicitly have to await these acknowledgements. Some Object Store output streams
+upload the entire data file in the `close()` operation. This can take a large amount
+of time. The fact that many user applications assume that `close()` is both fast
+and does not fail means that this behavior is dangerous.
+
+Recommendations for safe use by callers
+
+* Do plan for exceptions being raised, either in catching and logging or
+by throwing the exception further up. Catching and silently swallowing exceptions
+may hide serious problems.
+* Heartbeat operations SHOULD take place on a separate thread, so that a long
+delay in `close()` does not block the thread so long that the heartbeat times
+out.
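+
+For example, a try-with-resources clause lets any `close()` failure
+propagate instead of being silently lost; `fs`, `path` and `data` are
+assumed to exist:
+
+```java
+try (FSDataOutputStream out = fs.create(path, true)) {
+  out.write(data);
+  // close() is invoked implicitly here; if it fails, the IOException
+  // surfaces to the caller rather than being swallowed.
+}
+```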
+
+
+## Interface `StreamCapabilities`
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface StreamCapabilities {
+  boolean hasCapability(String capability);
+}
+```
+
+The `StreamCapabilities` interface exists to allow callers to dynamically
+determine the behavior of a stream.
+
+
+The reference Implementation of this interface is
+ `org.apache.hadoop.hdfs.DFSOutputStream`
+
+```java
+public boolean hasCapability(String capability) {
+ if (capability.equalsIgnoreCase(HSYNC.getValue()) ||
+ capability.equalsIgnoreCase((HFLUSH.getValue()))) {
+ return true;
+ }
+ return false;
+}
+```
+
+Where `HSYNC` and `HFLUSH` are items in the enumeration
+`org.apache.hadoop.fs.StreamCapabilities.StreamCapability`.
+
+
+## `org.apache.hadoop.fs.Syncable`
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface Syncable {
+ /**
+ * @deprecated As of HADOOP 0.21.0, replaced by hflush
+ * @see #hflush()
+ */
+ @Deprecated void sync() throws IOException;
+
+ /** Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ * @throws IOException if any error occurs
+ */
+ void hflush() throws IOException;
+
+ /** Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ * @throws IOException if error occurs
+ */
+ void hsync() throws IOException;
+}
+```
+
+The purpose of the `Syncable` interface is to provide guarantees that data is written
+to a filesystem for both visibility and durability.
+
+*SYNC-1*: An `OutputStream` which implements `Syncable` is
+making an explicit declaration that it can meet those guarantees.
+
+*SYNC-2*: The interface MUST NOT be declared as implemented by an `OutputStream` unless
+those guarantees can be met.
+
+The `Syncable` interface has been implemented by classes other than
+subclasses of `OutputStream`, such as `org.apache.hadoop.io.SequenceFile.Writer`.
+
+*SYNC-3* The fact that a class implements `Syncable` does not guarantee
+that `extends OutputStream` holds.
+
+That is, for any class `C`: `(C instanceof Syncable)` does not imply
+`(C instanceof OutputStream)`
+
+This specification only covers the required behavior of `OutputStream` subclasses
+which implement `Syncable`.
+
+
+*SYNC-4:* The return value of `FileSystem.create(Path)` is an instance
+of `FSDataOutputStream`.
+
+*SYNC-5:* `FSDataOutputStream implements Syncable`
+
+
+SYNC-5 and SYNC-1 imply that all output streams which can be created
+with `FileSystem.create(Path)` must support the semantics of `Syncable`.
+Any inspection of the Hadoop codebase will make clear that this is demonstrably
+not true, therefore these declarations SYNC-1 and SYNC-2 are
+demonstrably false.
+
+Put differently: *callers cannot rely on the presence of the interface
+as evidence that the semantics of `Syncable` are supported*. Instead
+they should be dynamically probed for using the `StreamCapabilities`
+interface, where available.
+
+
+### `Syncable.hflush()`
+
+Flush out the data in client's user buffer. After the return of
+this call, new readers will see the data. The `hflush()` operation
+does not contain guarantees as to the durability of the data, only
+its visibility.
+Thus implementations may cache the written data in memory
+—visible to all, but not yet persisted.
+
+#### Preconditions
+
+```python
+hasCapability(Stream, "hflush")
+Stream.open else raise IOException
+```
+
+
+#### Postconditions
+
+```python
+FS' = FS where data(path) == buffer
+```
+
+
+After the call returns, the data MUST be visible to all new callers
+of `FileSystem.open(path)` and `FileSystem.openFile(path).build()`.
+
+There is no requirement or guarantee that clients with an existing
+`DataInputStream` created by a call to `open(FS, path)` will see the updated
+data, nor is there a guarantee that they *will not* in a current or subsequent
+read.
+
+Implementation note: as a correct `hsync()` implementation must also
+offer all the semantics of an `hflush()` call, implementations of `hflush()`
+may just invoke `hsync()`:
+
+```java
+public void hflush() throws IOException {
+ hsync();
+}
+```
+
+### `Syncable.hsync()`
+
+Similar to POSIX `fsync`, this call saves the data in client's user buffer
+all the way to the disk device (but the disk may have it in its cache).
+
+That is: it is a requirement for the underlying FS to save all the data to
+the disk hardware itself, where it is expected to be durable.
+
+#### Preconditions
+
+```python
+hasCapability(Stream, "hsync")
+Stream.open else raise IOException
+```
+
+#### Postconditions
+
+```python
+FS' = FS where data(path) == buffer
+```
+
+The reference implementation, `DFSOutputStream` will block
+until an acknowledgement is received from the datanodes: That is, all hosts
+in the replica write chain have successfully written the file.
+
+That means callers may expect that, by the time the method call returns,
+visibility and durability guarantees hold, and that other
+implementations must maintain those guarantees.
+
+Note, however, that the reference `DFSOutputStream.hsync()` call only actually syncs
+*the current block*. If there have been a series of writes since the last sync
+such that a block boundary has been crossed, the `hsync()` call claims only
+to write the most recent block.
+
+
+From the javadocs of `DFSOutputStream.hsync(EnumSet<SyncFlag> syncFlags)`
+
+> Note that only the current block is flushed to the disk device.
+> To guarantee durable sync across block boundaries the stream should
+> be created with {@link CreateFlag#SYNC_BLOCK}.
+
+
+In virtual machines, the notion of "disk hardware" is really that of
+another software abstraction: there are no absolute guarantees of durability.
+
+
+## `interface CanSetDropBehind`
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CanSetDropBehind {
+ /**
+ * Configure whether the stream should drop the cache.
+ *
+ * @param dropCache Whether to drop the cache. null means to use the
+ * default value.
+ * @throws IOException If there was an error changing the dropBehind
+ * setting.
+ * UnsupportedOperationException If this stream doesn't support
+ * setting the drop-behind.
+ */
+ void setDropBehind(Boolean dropCache)
+ throws IOException, UnsupportedOperationException;
+}
+```
+
+This interface allows callers to change policies used inside HDFS.
+It is currently only implemented by the HDFS streams.
+
+
+
+## Durability, Concurrency, Consistency and Visibility of stream output.
+
+These are the aspects of the system behaviour which are not directly
+covered in this (very simplistic) Filesystem model, but which are visible
+in production.
+
+
+### Durability
+
+1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously
+1. `OutputStream.flush()` flushes data to the destination. There
+are no strict persistence requirements.
+1. `Syncable.hflush()` synchronously sends all local data to the destination
+filesystem. After returning to the caller, the data MUST be visible to other readers,
+it MAY be durable. That is: it does not have to be persisted, merely guaranteed
+to be consistently visible to all clients attempting to open a new stream reading
+data at the path.
+1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable
+storage.
+1. `close()` The first call to `close()` MUST flush out all remaining data in
+the buffers, and persist it.
+
+
+### Concurrency
+
+1. The outcome of more than one process writing to the same file is undefined.
+
+1. An input stream opened to read a file *before the file was opened for writing*
+MAY fetch data updated by writes to an OutputStream.
+Because of buffering and caching, this is not a requirement
+—and if an input stream does pick up updated data, the point at
+which the updated data is read is undefined. This surfaces in object stores
+where a `seek()` call which closes and re-opens the connection may pick up
+updated data, while forward stream reads do not. Similarly, in block-oriented
+filesystems, the data may be cached a block at a time —and changes only picked
+up when a different block is read.
+
+1. A Filesystem MAY allow the destination path to be manipulated while a stream
+is writing to it —for example, `rename()` of the path or a parent; `delete()` of
+a path or parent. In such a case, the outcome of future write operations on
+the output stream is undefined. Some filesystems MAY implement locking to
+prevent conflict. However, this tends to be rare on distributed filesystems,
+for reasons well known in the literature.
+
+1. The Java API specification of `java.io.OutputStream` does not require
+an instance of the class to be thread safe.
+However, `org.apache.hadoop.hdfs.DFSOutputStream`
+has a stronger thread safety model (possibly unintentionally). This fact is
+relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations
+SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization
+model permits the output stream to have `close()` invoked while awaiting an
+acknowledgement from datanode or namenode writes in an `hsync()` operation.
+
+### Consistency and Visibility
+
+There is no requirement for the data to be immediately visible to other applications
+—not until a specific call to flush buffers or persist it to the underlying storage
+medium are made.
+
+If an output stream is created with `FileSystem.create(path, overwrite==true)`
+and there is an existing file at the path, that is `exists(FS, path)` holds,
+then the existing data is immediately unavailable; the data at the end of the
+path MUST consist of an empty byte sequence `[]`, with consistent metadata.
+
+
+```python
+exists(FS, path)
+(Stream', FS') = create(FS, path)
+exists(FS', path)
+getFileStatus(FS', path).getLen() = 0
+```
+
+The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent
+with the contents of the file after `flush()` and `hsync()`.
+
+```python
+(Stream', FS') = create(FS, path)
+(Stream'', FS'') = write(Stream', data)
+(Stream''', FS''') = hsync(Stream'')
+exists(FS''', path)
+getFileStatus(FS''', path).getLen() = len(data)
+```
+
+HDFS does not do this except when the write crosses a block boundary; to do
+otherwise would overload the Namenode. As a result, while a file is being written
+`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`.
+
+The metadata MUST be consistent with the contents of a file after the `close()`
+operation.
+
+After the contents of an output stream have been persisted (`hflush()/hsync()`)
+all new `open(FS, Path)` operations MUST return the updated data.
+
+After `close()` has been invoked on an output stream,
+a call to `getFileStatus(path)` MUST return the final metadata of the written file,
+including length and modification time.
+The metadata of the file returned in any of the FileSystem `list` operations
+MUST be consistent with this metadata.
+
+The value of `getFileStatus(path).getModificationTime()` is not defined
+while a stream is being written to.
+The timestamp MAY be updated while a file is being written,
+especially after a `Syncable.hsync()` call.
+The timestamps MUST be updated after the file is closed
+to that of a clock value observed by the server during the `close()` call.
+It is *likely* to be in the time and time zone of the filesystem, rather
+than that of the client.
+
+Formally, if a `close()` operation triggers an interaction with a server
+which starts at server-side time `t1` and completes at time `t2` with a successfully
+written file, then the last modification time SHOULD be a time `t` where
+`t1 <= t <= t2`
+
+## Issues with the Hadoop Output Stream model.
+
+There are some known issues with the output stream model as offered by Hadoop,
+specifically about the guarantees about when data is written and persisted
+—and when the metadata is synchronized.
+These are where implementation aspects of HDFS and the "Local" filesystem
+do not match the simple model of the filesystem used in this specification.
+
+### HDFS
+
+That HDFS file metadata often lags the content of a file being written
+to is not something everyone expects, nor convenient for any program trying
+to pick up updated data in a file being written. Most visible is the length
+of a file returned in the various `list` commands and `getFileStatus` —this
+is often out of date.
+
+As HDFS only supports file growth in its output operations, this means
+that the size of the file as listed in the metadata may be less than or equal
+to the number of available bytes —but never larger. This is a guarantee which
+the update-probe algorithm below depends on.
+
+One algorithm to determine whether a file in HDFS is updated is:
+
+1. Remember the last read position `pos` in the file, using `0` if this is the initial
+read.
+1. Use `getFileStatus(FS, Path)` to query the updated length of the file as
+recorded in the metadata.
+1. If `Status.length > pos`, the file has grown.
+1. If the length has not changed, then
+ 1. reopen the file.
+ 1. `seek(pos)` to that location
+ 1. If `read() != -1`, there is new data.
+
+This algorithm works for filesystems which are consistent with metadata and
+data, as well as HDFS. What is important to know is that, in HDFS
+`getFileStatus(FS, path).getLen()==0` does not imply that `data(FS, path)` is
+empty.
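+
+A sketch of that algorithm in Java, assuming `fs`, `path` and the
+remembered read position `pos` are already defined:
+
+```java
+FileStatus status = fs.getFileStatus(path);
+boolean updated;
+if (status.getLen() > pos) {
+  // the metadata alone shows that the file has grown
+  updated = true;
+} else {
+  // the metadata may lag the data: reopen and probe past the old offset
+  try (FSDataInputStream in = fs.open(path)) {
+    in.seek(pos);
+    updated = in.read() != -1;
+  }
+}
+```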
+
+### Local Filesystem, `file:`
+
+`LocalFileSystem`, `file:`, (or any other `FileSystem` implementation based on
+`ChecksumFileSystem`) has a different issue. If an output stream
+is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has
+*not* been called on the filesystem, then the FS only flushes as much
+local data as can be written to full checksummed blocks of data.
+
+That is, the flush operations are not guaranteed to write all the pending
+data until the file is finally closed.
+
+That is, `sync()`, `hsync()` and `hflush()` may not persist all data written
+to the stream.
+
+For anyone thinking "this is a violation of this specification" —they are correct.
+The local filesystem is intended for testing, rather than production use.
+
+### Checksummed output streams
+
+Because `org.apache.hadoop.fs.FSOutputSummer` and
+`org.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer`
+implement the underlying checksummed output stream used by HDFS and
+other filesystems, it provides some of the core semantics of the output
+stream behavior.
+
+
+1. The `close()` call is unsynchronized, re-entrant and may attempt
+to close the stream more than once.
+1. It is possible to call `write(int)` on a closed stream (but not
+`write(byte[], int, int)`).
+1. It is possible to call `flush()` on a closed stream.
+
+Behaviors 1 & 2 really have to be considered bugs to fix, albeit with care.
+
+
+### Object Stores
+
+Object store streams tend to buffer the entire stream's output
+until the final `close()` operation triggers a single `PUT` of the data
+and materialization of the final output.
+
+This significantly changes their behaviour compared to that of
+POSIX filesystems and that specified in this document.
+
+#### Visibility of newly created files
+
+There is no guarantee that any file will be visible at the path of an output
+stream after the output stream is created.
+
+That is: while `create(FS, path, boolean)` returns a new stream
+
+```python
+Stream' = (path, true, [])
+```
+
+The other postcondition of the operation, `data(FS', path) == []` may not
+hold:
+
+1. `exists(FS, p)` MAY return false.
+1. If a file was created with `overwrite=true`, the existing data may still
+be visible: `data(FS', path) = data(FS, path)`.
+
+1. The check for existing data in a `create()` call with `overwrite=False`, may
+take place in the `create()` call itself, in the `close()` call prior to/during
+the write, or at some point in between. Except in the special case that the
+object store supports an atomic PUT operation, the check for existence of
+existing data and the subsequent creation of data at the path contains a race
+condition: other clients may create data at the path between the existence check
+and the subsequent write.
+
+1. Calls to `create(FS, Path, overwrite=false)` may succeed, returning a new
+`OutputStream`, even while another stream is open and writing to the destination
+path.
+
+This allows for the following sequence of operations, which would
+raise an exception in the second `create()` call if invoked against HDFS:
+
+```python
+Stream1 = create(FS, path, false)
+sleep(200)
+Stream2 = create(FS, path, false)
+Stream1.write('a')
+Stream1.close()
+Stream2.close()
+```
+
+For anyone wondering why the clients don't create a 0-byte file in the `create()` call:
+it would cause problems after `close()` —the marker file could get
+returned in `open()` calls instead of the final data.
+
+#### Visibility of the output of a stream after `close()`
+
+One guarantee which Object Stores SHOULD make is the same as those of POSIX
+filesystems: After a stream `close()` call returns, the data MUST be persisted
+durably and visible to all callers. Unfortunately, even that guarantee is
+not always met:
+
+1. Existing data on a path MAY be visible for an indeterminate period of time.
+
+1. If the store has any form of create inconsistency or buffering of negative
+existence probes, then even after the stream's `close()` operation has returned,
+`getFileStatus(FS, path)` and `open(FS, path)` may fail with a `FileNotFoundException`.
+
+In their favour, the atomicity of the store's PUT operations does offer its
+own guarantee: a newly created object is either absent or all of its data
+is present: the act of instantiating the object, while potentially exhibiting
+create inconsistency, is atomic. Applications may be able to use that fact
+to their advantage.
+
+
+#### Other issues
+
+The `Syncable` interfaces and methods are rarely implemented. Use
+`StreamCapabilities` to determine their availability.
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
index 07c99e0b6a528..f137fc3665775 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java
@@ -18,13 +18,18 @@
package org.apache.hadoop.fs.contract;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -42,6 +47,9 @@
public abstract class AbstractContractCreateTest extends
AbstractFSContractTestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractContractCreateTest.class);
+
/**
* How long to wait for a path to become visible.
*/
@@ -189,12 +197,8 @@ private void testOverwriteNonEmptyDirectory(boolean useBuilder)
} catch (FileAlreadyExistsException expected) {
//expected
handleExpectedException(expected);
- } catch (FileNotFoundException e) {
- handleRelaxedException("overwriting a dir with a file ",
- "FileAlreadyExistsException",
- e);
} catch (IOException e) {
- handleRelaxedException("overwriting a dir with a file ",
+ handleRelaxedException("overwriting a dir with a file",
"FileAlreadyExistsException",
e);
}
@@ -332,4 +336,78 @@ public void testCreateMakesParentDirs() throws Throwable {
assertTrue("Grandparent directory does not appear to be a directory",
fs.getFileStatus(grandparent).isDirectory());
}
+
+ @Test
+ public void testHSync() throws Throwable {
+ describe("test declared and actual Syncable behaviors");
+ Path path = methodPath();
+ FileSystem fs = getFileSystem();
+ boolean supportsFlush = isSupported(SUPPORTS_HFLUSH);
+ boolean supportsSync = isSupported(SUPPORTS_HSYNC);
+
+ try (FSDataOutputStream out = fs.create(path, true)) {
+
+ boolean doesHFlush = out.hasCapability(
+ StreamCapabilities.StreamCapability.HFLUSH.getValue());
+ if (doesHFlush) {
+ out.hflush();
+ }
+ assertEquals("hflush support", supportsFlush, doesHFlush);
+ boolean doesHSync = out.hasCapability(
+ StreamCapabilities.StreamCapability.HSYNC.getValue());
+ assertEquals("HFlush support", supportsSync, doesHSync);
+
+ try {
+ out.hflush();
+ if (!supportsFlush) {
+ // hflush is not declared as supported, yet the call succeeded
+ LOG.warn("FS doesn't support Syncable.hflush(),"
+ + " but doesn't reject it either.");
+ }
+ } catch (UnsupportedOperationException e) {
+ if (supportsFlush) {
+ throw new AssertionError("hflush not supported", e);
+ }
+ }
+ out.write('a');
+ try {
+ out.hsync();
+ } catch (UnsupportedOperationException e) {
+ if (supportsSync) {
+ throw new AssertionError("HSync not supported", e);
+ }
+ }
+
+ if (supportsSync) {
+ // if sync really worked, data is visible here
+
+ try (FSDataInputStream in = fs.open(path)) {
+ assertEquals('a', in.read());
+ assertEquals(-1, in.read());
+ }
+ } else {
+ out.flush();
+ // no sync. let's see what's there
+ try (FSDataInputStream in = fs.open(path)) {
+
+ int c = in.read();
+ if (c == -1) {
+ // nothing was synced; sync and flush really aren't there.
+ } else {
+ LOG.info("sync and flush are declared unsupported"
+ + " - but the stream does offer some sync/flush semantics");
+ }
+ } catch (FileNotFoundException e) {
+ // that's OK if it's an object store, but not if it's a real
+ // FS
+ if (!isSupported(IS_BLOBSTORE)) {
+ throw e;
+ }
+ }
+
+ }
+
+ }
+
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
index 1cd2164fad300..146569e8bf0d9 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
@@ -272,7 +272,7 @@ protected void handleRelaxedException(String action,
if (getContract().isSupported(SUPPORTS_STRICT_EXCEPTIONS, false)) {
throw e;
}
- LOG.warn("The expected exception {} was not the exception class" +
+ LOG.warn("The expected exception {} was not the exception class" +
" raised on {}: {}", action , e.getClass(), expectedException, e);
}
@@ -375,4 +375,13 @@ protected String generateAndLogErrorListing(Path src, Path dst) throws
}
return destDirLS;
}
+
+ /**
+ * Get a path from the name of the current test case.
+ * @return a path based on the test method being executed.
+ * @throws IOException IO problem
+ */
+ protected Path methodPath() throws IOException {
+ return path(methodName.getMethodName());
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
index 91a112141e987..5d258b4144e3b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
@@ -232,4 +232,13 @@ public interface ContractOptions {
*/
String TEST_RANDOM_SEEK_COUNT = "test.random-seek-count";
+ /**
+ * Is hflush supported in API and StreamCapabilities?
+ */
+ String SUPPORTS_HFLUSH = "supports-hflush";
+
+ /**
+ * Is hsync supported in API and StreamCapabilities?
+ */
+ String SUPPORTS_HSYNC = "supports-hsync";
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
index 3c9fcccc73846..d7e039647d5fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
@@ -116,4 +116,14 @@
    <value>true</value>
  </property>
+
+  <property>
+    <name>fs.contract.supports-hflush</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hsync</name>
+    <value>true</value>
+  </property>
</configuration>
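
Conversely, a filesystem whose output streams offer no Syncable semantics should declare these options as false, so that the new testHSync expects hasCapability() to return false for them. A hypothetical entry for such a store (not part of this patch):

  <property>
    <name>fs.contract.supports-hflush</name>
    <value>false</value>
  </property>

  <property>
    <name>fs.contract.supports-hsync</name>
    <value>false</value>
  </property>
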
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index a60f9af2a938b..0ebb7f9e9c91d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -25,10 +25,8 @@
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
@@ -47,6 +45,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.impl.StreamStateModel;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.util.Progressable;
@@ -98,8 +97,8 @@ class S3ABlockOutputStream extends OutputStream implements
/** Multipart upload details; null means none started. */
private MultiPartUpload multiPartUpload;
- /** Closed flag. */
- private final AtomicBoolean closed = new AtomicBoolean(false);
+ /** Stream state. */
+ private final StreamStateModel stateModel;
/** Current data block. Null means none currently active */
private S3ADataBlocks.DataBlock activeBlock;
@@ -163,6 +162,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.progressListener = (progress instanceof ProgressListener) ?
(ProgressListener) progress
: new ProgressableListener(progress);
+ this.stateModel = new StreamStateModel(fs.keyToQualifiedPath(key));
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
@@ -220,33 +220,27 @@ private void clearActiveBlock() {
}
}
- /**
- * Check for the filesystem being open.
- * @throws IOException if the filesystem is closed.
- */
- void checkOpen() throws IOException {
- if (closed.get()) {
- throw new IOException("Filesystem " + writeOperationHelper + " closed");
- }
- }
-
/**
* The flush operation does not trigger an upload; that awaits
* the next block being full. What it does do is call {@code flush() }
* on the current block, leaving it to choose how to react.
+ *
+ * Downgrades to a no-op if called on a closed stream.
* @throws IOException Any IO problem.
*/
@Override
- public synchronized void flush() throws IOException {
- try {
- checkOpen();
- } catch (IOException e) {
- LOG.warn("Stream closed: " + e.getMessage());
- return;
- }
- S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
- if (dataBlock != null) {
- dataBlock.flush();
+ public void flush() throws IOException {
+ final StreamStateModel.State state = acquireLock(false);
+ try {
+ if (state == StreamStateModel.State.Open) {
+ S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+ if (dataBlock != null) {
+ dataBlock.flush();
+ }
+ }
+ } catch (IOException ex) {
+ throw enterErrorState(ex);
+ } finally {
+ // the lock was taken even when the stream is not open: always release.
+ releaseLock();
}
}
@@ -273,14 +267,67 @@ public synchronized void write(int b) throws IOException {
* @throws IOException on any problem
*/
@Override
- public synchronized void write(byte[] source, int offset, int len)
+ public void write(byte[] source, int offset, int len)
throws IOException {
S3ADataBlocks.validateWriteArgs(source, offset, len);
- checkOpen();
if (len == 0) {
return;
}
+ acquireLock(true);
+ try {
+ innerWrite(source, offset, len);
+ } catch (IOException ex) {
+ throw enterErrorState(ex);
+ } finally {
+ releaseLock();
+ }
+ }
+
+ /**
+ * Acquire the state lock.
+ * @param checkOpen should the stream also be verified to be open?
+ * @return the stream state at the time the lock was acquired.
+ * @throws IOException if checkOpen is true and the stream is not open.
+ */
+ private synchronized StreamStateModel.State acquireLock(final boolean checkOpen)
+ throws IOException {
+ stateModel.acquireLock(checkOpen);
+ return stateModel.getState();
+ }
+
+ /**
+ * Release the state lock through {@link StreamStateModel#releaseLock()}.
+ * Must only be called during the {@code finally} clause following
+ * an {@link #acquireLock(boolean)} call.
+ */
+ private void releaseLock() {
+ stateModel.releaseLock();
+ }
+
+ /**
+ * Change the stream state to error, using the given exception if the stream
+ * was not already in an error state, via
+ * {@link StreamStateModel#enterErrorState(IOException)}.
+ * @param ex exception to record as the cause of the failure.
+ * @return the exception to throw.
+ */
+ private IOException enterErrorState(IOException ex) {
+ return stateModel.enterErrorState(ex);
+ }
+
+ /**
+ * The inner write.
+ * This is called recursively until all the source data is written.
+ * It requires the caller to hold the stream state lock already, so it
+ * does not attempt to reacquire or release it.
+ * @param source source byte array.
+ * @param offset offset in the array of the first byte to write.
+ * @param len number of bytes to write.
+ * @throws IOException on a failure
+ */
+ private void innerWrite(final byte[] source, final int offset, final int len)
+ throws IOException {
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
int remainingCapacity = block.remainingCapacity();
@@ -292,7 +339,7 @@ public synchronized void write(byte[] source, int offset, int len)
uploadCurrentBlock();
// tail recursion is mildly expensive, but given buffer sizes must be MB.
// it's unlikely to recurse very deeply.
- this.write(source, offset + written, len - written);
+ innerWrite(source, offset + written, len - written);
} else {
if (remainingCapacity == 0) {
// the whole buffer is done, trigger an upload
@@ -344,12 +391,39 @@ private void initMultipartUpload() throws IOException {
*/
@Override
public void close() throws IOException {
- if (closed.getAndSet(true)) {
- // already closed
- LOG.debug("Ignoring close() as stream is already closed");
- return;
+ boolean alreadyClosed;
+ acquireLock(false);
+ try {
+ synchronized (this) {
+ // this block is synchronized to order its execution w.r.t. any methods
+ // which are marked as synchronized.
+ // because close() itself is not synchronized, calling it on a stream
+ // which has just been closed isn't going to block the caller for the
+ // duration of the entire upload.
+ alreadyClosed = !stateModel.enterClosedState();
+ }
+ } finally {
+ releaseLock();
}
- S3ADataBlocks.DataBlock block = getActiveBlock();
+ if (alreadyClosed) {
+ // already closed or in error state.
+ if (stateModel.isInState(StreamStateModel.State.Error)) {
+ // error state.
+ if (multiPartUpload != null) {
+ // attempt to abort any ongoing MPU.
+ multiPartUpload.abort();
+ }
+ // Log the operation as failing and rethrow.
+ IOException ioe = stateModel.getException();
+ writeOperationHelper.writeFailed(ioe);
+ throw ioe;
+ } else {
+ LOG.debug("Ignoring close() as stream is not open");
+ return;
+ }
+ }
+
+ S3ADataBlocks.DataBlock block = getActiveBlock();
boolean hasBlock = hasActiveBlock();
LOG.debug("{}: Closing block #{}: current block= {}",
this,
@@ -393,8 +467,9 @@ public void close() throws IOException {
}
LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
} catch (IOException ioe) {
- writeOperationHelper.writeFailed(ioe);
- throw ioe;
+ final IOException ex = enterErrorState(ioe);
+ writeOperationHelper.writeFailed(ex);
+ throw ex;
} finally {
closeAll(LOG, block, blockFactory);
LOG.debug("Statistics: {}", statistics);
@@ -673,9 +748,7 @@ private void complete(List partETags)
* Abort a multi-part upload. Retries are attempted on failures.
* IOExceptions are caught; this is expected to be run as a cleanup process.
*/
- public void abort() {
- int retryCount = 0;
- AmazonClientException lastException;
+ private void abort() {
fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
try {
writeOperationHelper.abortMultipartUpload(key, uploadId,
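
StreamStateModel is a new class under org.apache.hadoop.fs.impl whose source does not appear in this excerpt. Working back from the call sites above only, a rough sketch of the contract those callers depend on might look like the following; every field, body and message below is an assumption, not the patch's actual code:

package org.apache.hadoop.fs.impl;

import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.Path;

/** Sketch only: an open/closed/error state machine plus a stream lock. */
public class StreamStateModel {

  public enum State { Open, Closed, Error }

  private final ReentrantLock lock = new ReentrantLock();
  private final Path path;
  private volatile State state = State.Open;
  private IOException exception;

  public StreamStateModel(Path path) {
    this.path = path;
  }

  /**
   * Take the lock, optionally verifying the stream is still open.
   * If the open check fails, the lock is released again before the
   * exception is thrown, so callers only pair releaseLock() with a
   * successful acquire.
   */
  public void acquireLock(boolean checkOpen) throws IOException {
    lock.lock();
    if (checkOpen && state != State.Open) {
      lock.unlock();
      throw (exception != null)
          ? exception
          : new IOException("Stream is " + state + ": " + path);
    }
  }

  /** Release the lock taken by a successful acquireLock(). */
  public void releaseLock() {
    lock.unlock();
  }

  public State getState() {
    return state;
  }

  public boolean isInState(State expected) {
    return state == expected;
  }

  /** @return true only on the transition from Open to Closed. */
  public synchronized boolean enterClosedState() {
    if (state == State.Open) {
      state = State.Closed;
      return true;
    }
    return false;
  }

  /** Record the first failure; later calls return the original. */
  public synchronized IOException enterErrorState(IOException ex) {
    if (state != State.Error) {
      state = State.Error;
      exception = ex;
    }
    return exception;
  }

  public synchronized IOException getException() {
    return exception;
  }
}
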
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
deleted file mode 100644
index ff176f58da67d..0000000000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import org.apache.hadoop.fs.s3a.commit.PutTracker;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-/**
- * Unit tests for {@link S3ABlockOutputStream}.
- */
-public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
-
- private S3ABlockOutputStream stream;
-
- @Before
- public void setUp() throws Exception {
- ExecutorService executorService = mock(ExecutorService.class);
- Progressable progressable = mock(Progressable.class);
- S3ADataBlocks.BlockFactory blockFactory =
- mock(S3ADataBlocks.BlockFactory.class);
- long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
- S3AInstrumentation.OutputStreamStatistics statistics = null;
- WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
- PutTracker putTracker = mock(PutTracker.class);
- stream = spy(new S3ABlockOutputStream(fs, "", executorService,
- progressable, blockSize, blockFactory, statistics, oHelper,
- putTracker));
- }
-
- @Test
- public void testFlushNoOpWhenStreamClosed() throws Exception {
- doThrow(new IOException()).when(stream).checkOpen();
-
- try {
- stream.flush();
- } catch (Exception e){
- fail("Should not have any exception.");
- }
- }
-}
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
index 2b89fb0a73242..f8bb8e33b1403 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
@@ -22,6 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import java.io.IOException;
@@ -42,7 +44,8 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public final class AdlFsOutputStream extends OutputStream implements Syncable {
+public final class AdlFsOutputStream extends OutputStream
+ implements Syncable, StreamCapabilities {
private final ADLFileOutputStream out;
public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration)
@@ -79,4 +82,9 @@ public synchronized void hflush() throws IOException {
public synchronized void hsync() throws IOException {
out.flush();
}
+
+ @Override
+ public boolean hasCapability(String capability) {
+ return StoreImplementationUtils.supportsSyncable(capability);
+ }
}
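
StoreImplementationUtils is likewise introduced elsewhere in this patch. A plausible sketch, reconstructed purely from the probes used here and in the wrapped streams earlier; the method bodies are assumptions:

package org.apache.hadoop.fs.impl;

import org.apache.hadoop.fs.StreamCapabilities;

/** Sketch only: shared capability probes for store streams. */
public final class StoreImplementationUtils {

  private StoreImplementationUtils() {
  }

  /**
   * Probe a possibly-wrapped stream for a capability, returning
   * false for objects which do not implement StreamCapabilities.
   */
  public static boolean hasCapability(Object stream, String capability) {
    return stream instanceof StreamCapabilities
        && ((StreamCapabilities) stream).hasCapability(capability);
  }

  /** Is the capability one of the two Syncable operations? */
  public static boolean supportsSyncable(String capability) {
    return StreamCapabilities.HFLUSH.equals(capability)
        || StreamCapabilities.HSYNC.equals(capability);
  }
}
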
diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml
index 5bbdd6fbb8645..e45a662229377 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml
+++ b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml
@@ -148,4 +148,14 @@
    <value>true</value>
  </property>
+
+  <property>
+    <name>fs.contract.supports-hflush</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hsync</name>
+    <value>true</value>
+  </property>
</configuration>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 93c54d386f716..f4f0e91da73ef 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -69,6 +69,7 @@
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -1051,10 +1052,7 @@ public void hsync() throws IOException {
*/
@Override // StreamCapability
public boolean hasCapability(String capability) {
- if (out instanceof StreamCapabilities) {
- return ((StreamCapabilities) out).hasCapability(capability);
- }
- return false;
+ return StoreImplementationUtils.hasCapability(out, capability);
}
@Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
index dcfff2fbe3784..0721576bb0687 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
/**
* Support the Syncable interface on top of a DataOutputStream.
@@ -51,10 +52,7 @@ public OutputStream getOutStream() {
@Override
public boolean hasCapability(String capability) {
- if (out instanceof StreamCapabilities) {
- return ((StreamCapabilities) out).hasCapability(capability);
- }
- return false;
+ return StoreImplementationUtils.hasCapability(out, capability);
}
@Override