From 486b78a46d3805473be03c17e949f2f943111a2f Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 5 Nov 2019 16:07:03 +0000 Subject: [PATCH 1/8] HADOOP-13327 Output Stream Specification. This PR removes the changes related to S3A output stream lifecycle, so only covers the specification of Syncable and ensures that StreamCapabilities passes all the way through to the final implementation classes. All streams which implement Syncable hsync/hflush declare this in their stream capabilities Change-Id: I82b16a8e0965f34eb0c42504da43e8fbeabcb68c --- .../hadoop/crypto/CryptoOutputStream.java | 6 +- .../apache/hadoop/fs/CanSetDropBehind.java | 2 +- .../apache/hadoop/fs/FSDataInputStream.java | 6 +- .../apache/hadoop/fs/FSDataOutputStream.java | 6 +- .../org/apache/hadoop/fs/FSOutputSummer.java | 8 +- .../java/org/apache/hadoop/fs/Syncable.java | 16 +- .../fs/impl/StoreImplementationUtils.java | 95 ++ .../site/markdown/filesystem/filesystem.md | 18 +- .../src/site/markdown/filesystem/index.md | 1 + .../site/markdown/filesystem/outputstream.md | 889 ++++++++++++++++++ .../contract/AbstractContractCreateTest.java | 81 ++ .../contract/AbstractFSContractTestBase.java | 11 +- .../hadoop/fs/contract/ContractOptions.java | 9 + .../src/test/resources/contract/hdfs.xml | 10 + .../hadoop/fs/adl/AdlFsOutputStream.java | 10 +- .../src/test/resources/adls.xml | 10 + .../fs/azure/NativeAzureFileSystem.java | 6 +- .../fs/azure/SyncableDataOutputStream.java | 7 +- 18 files changed, 1157 insertions(+), 34 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java index aeb6e4d0ed2ef..dfa72a3b6ba25 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -308,9 +309,6 @@ private void freeBuffers() { @Override public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java index 2e2d98b9c5462..0077838920a9e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java @@ -36,6 +36,6 @@ public interface CanSetDropBehind { * UnsupportedOperationException If this stream doesn't support * setting the drop-behind. 
*/ - public void setDropBehind(Boolean dropCache) + void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 31f82975899e1..861070158b029 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.util.IdentityHashStore; @@ -234,10 +235,7 @@ public void unbuffer() { @Override public boolean hasCapability(String capability) { - if (in instanceof StreamCapabilities) { - return ((StreamCapabilities) in).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(in, capability); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 5b604e58e2360..08dd3262da8db 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; /** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}. 
*/ @@ -122,10 +123,7 @@ public OutputStream getWrappedStream() { @Override public boolean hasCapability(String capability) { - if (wrappedStream instanceof StreamCapabilities) { - return ((StreamCapabilities) wrappedStream).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(wrappedStream, capability); } @Override // Syncable diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java index 2458b2f40d8d7..aaa19adf8c6a4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java @@ -33,7 +33,8 @@ */ @InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceStability.Unstable -abstract public class FSOutputSummer extends OutputStream { +abstract public class FSOutputSummer extends OutputStream implements + StreamCapabilities { // data checksum private final DataChecksum sum; // internal buffer for storing data before it is checksumed @@ -254,4 +255,9 @@ protected synchronized void setChecksumBufSize(int size) { protected synchronized void resetChecksumBufSize() { setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS); } + + @Override + public boolean hasCapability(String capability) { + return false; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java index 7ec3509ce1df6..e23e8d8aabca1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java @@ -23,20 +23,24 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -/** This interface for flush/sync operation. */ +/** + * This is the interface for flush/sync operation. + * Consult the Hadoop filesystem specification for the definition of the + * semantics of this operation. + */ @InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceStability.Stable public interface Syncable { - + /** Flush out the data in client's user buffer. After the return of * this call, new readers will see the data. * @throws IOException if any error occurs */ - public void hflush() throws IOException; - + void hflush() throws IOException; + /** Similar to posix fsync, flush out the data in client's user buffer * all the way to the disk device (but the disk may have it in its cache). * @throws IOException if error occurs */ - public void hsync() throws IOException; + void hsync() throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java new file mode 100644 index 0000000000000..68c53b1a1cc77 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StreamCapabilities; + +import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH; +import static org.apache.hadoop.fs.StreamCapabilities.HSYNC; + +/** + * Utility classes to help implementing filesystems and streams. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class StoreImplementationUtils { + + private StoreImplementationUtils() { + } + + /** + * Check the supplied capabilities for being those required for full + * {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality. + * @param capability capability string. + * @return true if either refers to one of the Syncable operations. + */ + public static boolean supportsSyncable(String capability) { + return capability.equalsIgnoreCase(HSYNC) || + capability.equalsIgnoreCase(HFLUSH); + } + + /** + * Probe for an object having a capability; returns true + * iff the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * This is a package private method intended to provided a common + * implementation for input and output streams. + * {@link StreamCapabilities#hasCapability(String)} call is for public use. + * @param object object to probe. + * @param capability capability to probe for + * @return true iff the object implements stream capabilities and + * declares that it supports the capability. + */ + static boolean objectHasCapability(Object object, String capability) { + if (object instanceof StreamCapabilities) { + return ((StreamCapabilities) object).hasCapability(capability); + } + return false; + } + + /** + * Probe for an output stream having a capability; returns true + * iff the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * @param out output stream + * @param capability capability to probe for + * @return true iff the stream declares that it supports the capability. + */ + public static boolean hasCapability(OutputStream out, String capability) { + return objectHasCapability(out, capability); + } + + /** + * Probe for an input stream having a capability; returns true + * iff the stream implements {@link StreamCapabilities} and its + * {@code hasCapabilities()} method returns true for the capability. + * @param out output stream + * @param capability capability to probe for + * @return true iff the stream declares that it supports the capability. 
+ */ + public static boolean hasCapability(InputStream out, String capability) { + return objectHasCapability(out, capability); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 284a964f6e522..7d07b19037d28 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -634,6 +634,8 @@ For instance, HDFS may raise an `InvalidPathException`. result = FSDataOutputStream +A zero byte file must exist at the end of the specified path, visible to all + The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`. The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of @@ -647,10 +649,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne clients creating files with `overwrite==true` to fail if the file is created by another client between the two tests. -* S3A, Swift and potentially other Object Stores do not currently change the FS state +* S3A, Swift and potentially other Object Stores do not currently change the `FS` state until the output stream `close()` operation is completed. -This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`, - and potentially confuse file/directory logic +This is a significant difference between the behavior of object stores +and that of filesystems, as it allows >1 client to create a file with `overwrite==false`, +and potentially confuse file/directory logic. In particular, using `create()` to acquire +an exclusive lock on a file (whoever creates the file without an error is considered +the holder of the lock) may not not a safe algorithm to use when working with object stores. + +* Object stores may create an empty file as a marker when a file is created. +However, object stores with `overwrite=true` semantics may not implement this atomically, +so creating files with `overwrite=false` cannot be used as an implicit exclusion +mechanism between processes. * The Local FileSystem raises a `FileNotFoundException` when trying to create a file over a directory, hence it is listed as an exception that MAY be raised when @@ -687,7 +697,7 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep #### Postconditions - FS + FS' = FS result = FSDataOutputStream Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]` diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md index df538ee6cf96b..0cc3423b08982 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md @@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications. 1. [Notation](notation.html) 1. [Model](model.html) 1. [FileSystem class](filesystem.html) +1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html) 1. [FSDataInputStream class](fsdatainputstream.html) 1. [PathCapabilities interface](pathcapabilities.html) 1. 
[FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md new file mode 100644 index 0000000000000..f3fd8764f466f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -0,0 +1,889 @@ + + + + +# Output: `OutputStream`, `Syncable` and `StreamCapabilities` + +## Introduction + +This document covers the Output Streams within the context of the +[Hadoop File System Specification](index.html). + +It uses the filesystem model defined in [A Model of a Hadoop Filesystem](model.html) +with the notation defined in [notation](Notation.md). + +The target audiences are +1. Users of the APIs. While `java.io.OutputStream` is a standard interfaces, +this document clarifies how it is implemented in HDFS and elsewhere. +The Hadoop-specific interfaces `Syncable` and `StreamCapabilities` are new; +`Syncable` is notable in offering durability and visibility guarantees which +exceed that of `OutputStream`. +1. Implementors of File Systems and clients. + +## How data is written to a filesystem + +The core mechanism to write data to files through the Hadoop FileSystem APIs +is through `OutputStream` subclasses obtained through calls to +`FileSystem.create()`, `FileSystem.append()`, +or `FSDataOutputStreamBuilder.build()`. + +These all return instances of `FSDataOutputStream`, through which data +can be written through various `write()` methods. +After a stream's `close()` method is called, all data written to the +stream MUST BE persisted to the fileysystem and visible to oll other +clients attempting to read data from that path via `FileSystem.open()`. + +As well as operations to write the data, Hadoop's `OutputStream` implementations +provide methods to flush buffered data back to the filesystem, +so as to ensure that the data is reliably persisted and/or visible +to other callers. This is done via the `Syncable` interface. It was +originally intended that the presence of this interface could be interpreted +as a guarantee that the stream supported its methods. However, this has proven +impossible to guarantee as the static nature of the interface is incompatible +with filesystems whose syncability semantics may vary on a store/path basis. +As an example, erasure coded files in HDFS do not support the Sync operations, +even though they are implemented as subclass of an output stream which is `Syncable`. + +A new interface: `StreamCapabilities`. This allows callers +to probe the exact capabilities of a stream, even transitively +through a chain of streams. + + + +## Output Stream Model + +For this specification, an output stream can be viewed as a list of bytes +stored in in the client + +```python +buffer: List[byte] +``` + +A flag, `open` tracks whether the stream is open: after the stream +is closed no more data may be written to it: + +```python +open: bool +buffer: List[byte] +``` + +The destination path of the stream, `path`, can be tracked to form a triple +`path, open, buffer` + +```python +Stream = (path: Path, open: Boolean, buffer: byte[]) +``` + +#### Visibility of Flushed Data + +(Immediately) after `Syncable` operations which flush data to the filesystem, +the data at the stream's destination path MUST match that of +`buffer`. 
That is, the following condition MUST hold: + +```python +FS'.Files(path) == buffer +``` + +Any client reading the data at the path MUST see the new data. +The two sync operations, `hflush()` and `hsync()` differ in their durability +guarantees, not visibility of data. + +### State of Stream and Filesystem after `Filesystem.create()` + +The output stream returned by a `FileSystem.create(path)` or +`FileSystem.createFile(path).build` +can be modeled as a triple containing an empty array of no data: + +```python +Stream' = (path, true, []) +``` + +The filesystem `FS'` MUST contain a 0-byte file at the path: + +```python +data(FS', path) == [] +``` + +Thus, the initial state of `Stream'.buffer` is implicitly +consistent with the data at the filesystem. + + +*Object Stores*: see caveats in the "Object Stores" section below. + +### State of Stream and Filesystem after `Filesystem.append()` + +The output stream returned from a call of + `FileSystem.append(path, buffersize, progress)`, +can be modelled as a stream whose `buffer` is intialized to that of +the original file: + +```python +Stream' = (path, true, data(FS, path)) +``` + +#### Persisting data + +When the stream writes data back to its store, be it in any +supported flush operation, in the `close()` operation, or at any other +time the stream chooses to do so, the contents of the file +are replaced with the current buffer + +```python +Stream' = (path, true, buffer) +FS' = FS where data(FS', path) == buffer +``` + +After a call to `close()`, the stream is closed for all operations other +than `close()`; they MAY fail with `IOException` or `RuntimeException`. + +```python +Stream' = (path, false, []) +``` + +The `close()` operation must be idempotent with the sole attempt to write the +data made in the first invocation. + +1. If `close()` succeeds, subsequent calls are no-ops. +1. If `close()` fails, again, subsequent calls are no-ops. They MAY rethrow +the previous exception, but they MUST NOT retry the write. + + + + + +## Class `FSDataOutputStream` + +```java +public class FSDataOutputStream + extends DataOutputStream + implements Syncable, CanSetDropBehind, StreamCapabilities { + // ... +} +``` + +The `FileSystem.create()`, `FileSystem.append()` and +`FSDataOutputStreamBuilder.build()` calls return an instance +of a class `FSDataOutputStream`, a subclass of `java.io.OutputStream`. + +The base class wraps an `OutputStream` instance, one which may implement `Streamable`, +`CanSetDropBehind` and `StreamCapabilities`. + +This document covers the requirements of such implementations. + +HDFS's `FileSystem` implementation, `DistributedFileSystem`, returns an instance +of `HdfsDataOutputStream`. This implementation has at least two behaviors +which are not explicitly declared by the base Java implmentation + +1. Writes are synchronized: more than one thread can write to the same +output stream. This is a use pattern which HBase relies on. + +1. `OutputStream.flush()` is a no-op when the file is closed. Apache Druid +has made such a call on this in the past +[HADOOP-14346](https://issues.apache.org/jira/browse/HADOOP-14346). + + +As the HDFS implementation is considered the de-facto specification of +the FileSystem APIs, the fact that `write()` is thread-safe is significant. + +For compatibility, not only must other FS clients be thread-safe, +but new HDFS featues, such as encryption and Erasure Coding must also +implement consistent behavior with the core HDFS output stream. 
+ +Put differently: + +*It isn't enough for Output Streams to implement the core semantics +of `java.io.OutputStream`: they need to implement the extra semantics +of `HdfsDataOutputStream`, especially for HBase to work correctly. + +The concurrent `write()` call is the most significant tightening of +the Java specification. + +## Class `java.io.OutputStream` + +A Java `OutputStream` allows applications to write a sequence of bytes to a destination. +In a Hadoop filesystem, that destination is the data under a path in the filesystem. + +```java +public abstract class OutputStream implements Closeable, Flushable { + public abstract void write(int b) throws IOException; + public void write(byte b[]) throws IOException; + public void write(byte b[], int off, int len) throws IOException; + public void flush() throws IOException; + public void close() throws IOException; +} +``` +### `write(Stream, data)` + +Writes a byte of data to the stream. + +#### Preconditions + +```python +Stream.open else raise ClosedChannelException, PathIOException, IOException +``` + +The exception `java.nio.channels.ClosedChannelExceptionn` is +raised in the HDFS output streams when trying to write to a closed file. +This exception does not include the destination path; and +`Exception.getMessage()` is `null`. It is therefore of limited value in stack +traces. Implementors may wish to raise exceptions with more detail, such +as a `PathIOException`. + + +#### Postconditions + +The buffer has the lower 8 bits of the data argument appended to it. + +```python +Stream'.buffer = Stream.buffer + [data & 0xff] +``` + +There may be an explicit limit on the size of cached data, or an implicit +limit based by the available capacity of the destination filesystem. +When a limit is reached, `write()` SHOULD fail with an `IOException`. + +### `write(Stream, byte[] data, int offset, int len)` + + +#### Preconditions + +The preconditions are all defined in `OutputStream.write()` + +```python +Stream.open else raise ClosedChannelException, PathIOException, IOException +data != null else raise NullPointerException +offset >= 0 else raise IndexOutOfBoundsException +len >= 0 else raise IndexOutOfBoundsException +offset < data.length else raise IndexOutOfBoundsException +offset + len < data.length else raise IndexOutOfBoundsException +``` + +There may be an explicit limit on the size of cached data, or an implicit +limit based by the available capacity of the destination filesystem. +When a limit is reached, `write()` SHOULD fail with an `IOException`. + +After the operation has returned, the buffer may be re-used. The outcome +of updates to the buffer while the `write()` operation is in progress is undefined. + +#### Postconditions + +```python +Stream'.buffer = Stream.buffer + data[offset...(offset + len)] +``` + +### `write(byte[] data)` + +This is defined as the equivalent of: + +```python +write(data, 0, data.length) +``` + +### `flush()` + +Requests that the data is flushed. The specification of `ObjectStream.flush()` +declares that this SHOULD write data to the "intended destination". + +It explicitly precludes any guarantees about durability. + +For that reason, this document doesn't provide any normative +specifications of behaviour. + +#### Preconditions + +```python +Stream.open else raise IOException +``` + +#### Postconditions + +None. 
+ +If the implementation chooses to implement a stream-flushing operation, +the data may be saved to the file system such that it becomes visible to +others" + +```python +FS' = FS where data(FS', path) == buffer +``` + +Some applications have been known to call `flush()` on a closed stream +on the assumption that it is harmless. Implementations MAY choose to +support this behaviour. + +### `close()` + +The `close()` operation saves all data to the filesystem and +releases any resources used for writing data. + +The `close()` call is expected to block +until the write has completed (as with `Syncable.hflush()`), possibly +until it has been written to durable storage. + +After `close()` completes, the data in a file MUST be visible and consistent +with the data most recently written. The metadata of the file MUST be consistent +with the data and the write history itself (i.e. any modification time fields +updated). + +After `close()` is invoked, all subsequent `write()` calls on the stream +MUST fail with an `IOException`. + + +Any locking/leaseholding mechanism is also required to release its lock/lease. + +```python +Stream'.open = false +FS' = FS where data(FS', path) == buffer +``` + +The `close()` call MAY fail during its operation. + +1. Callers of the API MUST expect for some calls to fail and SHOULD code appropriately. +Catching and swallowing exceptions, while common, is not always the ideal solution. +1. Even after a failure, `close()` MUST place the stream into a closed state. +Follow-on calls to `close()` are ignored, and calls to other methods +rejected. That is: caller's cannot be expected to call `close()` repeatedly +until it succeeds. +1. The duration of the `call()` operation is undefined. Operations which rely +on acknowledgements from remote systems to meet the persistence guarantees +implicitly have to await these acknowledgements. Some Object Store output streams +upload the entire data file in the `close()` operation. This can take a large amount +of time. The fact that many user applications assume that `close()` is both fast +and does not fail means that this behavior is dangerous. + +Recommendations for safe use by callers + +* Do plan for exceptions being raised, either in catching and logging or +by throwing the exception further up. Catching and silently swallowing exceptions +may hide serious problems. +* Heartbeat operations SHOULD take place on a separate thread, so that a long +delay in `close()` does not block the thread so long that the heartbeat times +out. + +### HDFS and `OutputStream.close()` + +HDFS does not immediately `sync()` the output of a written file to disk on +`OutputStream.close()` unless configured with `dfs.datanode.synconclose` +is true. This has caused [problems in some applications](https://issues.apache.org/jira/browse/ACCUMULO-1364). + +Applications which absolutely require the guarantee that a file has been persisted +MUST call `Syncable.hsync()` before the file is closed. + + +## `org.apache.hadoop.fs.Syncable` + +```java +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface Syncable { + + + /** Flush out the data in client's user buffer. After the return of + * this call, new readers will see the data. + * @throws IOException if any error occurs + */ + void hflush() throws IOException; + + /** Similar to posix fsync, flush out the data in client's user buffer + * all the way to the disk device (but the disk may have it in its cache). 
+ * @throws IOException if error occurs + */ + void hsync() throws IOException; +} +``` + +The purpose of `Syncable` interface is to provide guarantees that data is written +to a filesystem for both visibility and durability. + +*SYNC-1*: An `OutputStream` which implements `Syncable` is +making an explicit declaration that it can meet those guarantees. + +*SYNC-2*: The interface MUST NOT be declared as implemented by an `OutputStream` unless +those guarantees can be met. + +The `Syncable` interface has been implemented by other classes than +subclasses of `OutputStream`, such as `org.apache.hadoop.io.SequenceFile.Writer`. + +*SYNC-3* The fact that a class implements `Syncable` does not guarantee +that `extends OutputStream` holds. + +That is, for any class `C`: `(C instanceof Syncable)` does not imply +`(C instanceof OutputStream)` + +This specification only covers the required behavior of `OutputStream` subclasses +which implement `Syncable`. + + +*SYNC-4:* The return value of `FileSystem.create(Path)` is an instance +of `FSDataOutputStream`. + +*SYNC-5:* `FSDataOutputStream implements Syncable` + + +SYNC-5 and SYNC-1 imply that all output streams which can be created +with `FileSystem.create(Path)` must support the semantics of `Syncable`. +This is demonstrably not true: `FSDataOutputStream` simply downgrades +to a `flush()` if its wrapped stream is not `Syncable`. +Therefore the declarations SYNC-1 and SYNC-2 do not hold: you cannot trust `Syncable`. + +Put differently: *callers MUST NOT rely on the presence of the interface +as evidence that the semantics of `Syncable` are supported*. Instead +they MUST be dynamically probed for using the `StreamCapabilities` +interface, where available. + + +### `Syncable.hflush()` + +Flush out the data in client's user buffer. After the return of +this call, new readers will see the data. The `hflush()` operation +does not contain any guarantees as to the durability of the data. only +its visibility. + +Thus implementations may cache the written data in memory +—visible to all, but not yet persisted. + +#### Preconditions + +```python +hasCapability(Stream, "hflush") +Stream.open else raise IOException +``` + + +#### Postconditions + +```python +FS' = FS where data(path) == cache +``` + + +After the call returns, the data MUST be visible to all new callers +of `FileSystem.open(path)` and `FileSystem.openFile(path).build()`. + +There is no requirement or guarantee that clients with an existing +`DataInputStream` created by a call to `(FS, path)` will see the updated +data, nor is there a guarantee that they *will not* in a current or subsequent +read. + +Implementation note: as a correct `hsync()` implementation must also +offer all the semantics of an `hflush()` call, implementations of `hflush()` +may just invoke `hsync()`: + +```java +public void hflush() throws IOException { + hsync(); +} +``` + +### `Syncable.hsync()` + +Similar to POSIX `fsync()`, this call saves the data in client's user buffer +all the way to the disk device (but the disk may have it in its cache). + +That is: it is a requirement for the underlying FS To save all the data to +the disk hardware itself, where it is expected to be durable. 
+ +#### Preconditions + +```python +hasCapability(Stream, "hsync") +Stream.open else raise IOException +``` + +#### Postconditions + +```python +FS' = FS where data(path) == buffer +``` + +The reference implementation, `DFSOutputStream` will block +until an acknowledgement is received from the datanodes: That is, all hosts +in the replica write chain have successfully written the file. + +That means that the expectation callers may have is that the return of +the method call contains visibility and durability guarantees which other +implementations must maintain. + +Note, however, that the reference `DFSOutputStream.hsync()` call only actually syncs/ +*the current block*. If there have been a series of writes since the last sync, +such that a block boundary has been crossed. The `hsync()` call claims only +to write the most recent. + + +From the javadocs of `DFSOutputStream.hsync(EnumSet syncFlags)` + +> Note that only the current block is flushed to the disk device. +> To guarantee durable sync across block boundaries the stream should +> be created with {@link CreateFlag#SYNC_BLOCK}. + + +In virtual machines, the notion of "disk hardware" is really that of +another software abstraction: there are guarantees. + + +## Interface `StreamCapabilities` + +```java +@InterfaceAudience.Public +@InterfaceStability.Evolving +``` + +The `StreamCapabilities` interface exists to allow callers to dynamically +determine the behavior of a stream. + + +The reference implementation of this interface is + `org.apache.hadoop.hdfs.DFSOutputStream` + +```java + public boolean hasCapability(String capability) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.HSYNC: + case StreamCapabilities.HFLUSH: + return true; + default: + return false; + } + } +``` + +Where `HSYNC` and `HFLUSH` are items in the enumeration +`org.apache.hadoop.fs.StreamCapabilities.StreamCapability`. + +## interface `CanSetDropBehind` + +```java +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface CanSetDropBehind { + /** + * Configure whether the stream should drop the cache. + * + * @param dropCache Whether to drop the cache. null means to use the + * default value. + * @throws IOException If there was an error changing the dropBehind + * setting. + * UnsupportedOperationException If this stream doesn't support + * setting the drop-behind. + */ + void setDropBehind(Boolean dropCache) + throws IOException, UnsupportedOperationException; +} +``` + +This interface allows callers to change policies used inside HDFS. +They are currently unimplemented by any stream other than those in HDFS. + + + +## Durability, Concurrency, Consistency and Visibility of stream output. + +These are the aspects of the system behaviour which are not directly +covered in this (very simplistic) filesystem model, but which are visible +in production. + + +### Durability + +1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously +1. `OutputStream.flush()` flushes data to the destination. There +are no strict persistence requirements. +1. `Syncable.hflush()` synchronously sends all local data to the destination +filesystem. After returning to the caller, the data MUST be visible to other readers, +it MAY be durable. That is: it does not have to be persisted, merely guaranteed +to be consistently visible to all clients attempting to open a new stream reading +data at the path. +1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable +storage. +1. 
`close()` The first call to `close()` MUST flush out all remaining data in +the buffers, and persist it. + + +Many applications call `flush()` far too often -such as at the end of every line written. +If this triggered an update of the data in persistent storage and any accompanying +metadata, distributed stores would overload fast. +Thus: `flush()` is often treated at most as a cue to flush data to the network +buffers -but not commit to writing any data. +It is only the `Syncable` interface which offers guarantees. + +### Concurrency + +1. The outcome of more than one process writing to the same file is undefined. + +1. An input stream opened to read a file *before the file was opened for writing* +MAY fetch data updated by writes to an OutputStream. +Because of buffering and caching, this is not a requirement +—and if an input stream does pick up updated data, the point at +which the updated data is read is undefined. This surfaces in object stores +where a `seek()` call which closes and re-opens the connection may pick up +updated data, while forward stream reads do not. Similarly, in block-oriented +filesystems, the data may be cached a block at a time —and changes only picked +up when a different block is read. + +1. A filesystem MAY allow the destination path to be manipulated while a stream +is writing to it —for example, `rename()` of the path or a parent; `delete()` of +a path or parent. In such a case, the outcome of future write operations on +the output stream is undefined. Some filesystems MAY implement locking to +prevent conflict. However, this tends to be rare on distributed filesystems, +for reasons well known in the literature. + +1. The Java API specification of `java.io.OutputStream` does not require +an instance of the class to be thread safe. +However, `org.apache.hadoop.hdfs.DFSOutputStream` +has a stronger thread safety model (possibly unintentionally). This fact is +relied upon in Apache HBase, as discovered in HADOOP-11708. Implementations +SHOULD be thread safe. *Note*: even the `DFSOutputStream` synchronization +model permits the output stream to have `close()` invoked while awaiting an +acknowledgement from datanode or namenode writes in an `hsync()` operation. + +### Consistency and Visibility + +There is no requirement for the data to be immediately visible to other applications +—not until a specific call to flush buffers or persist it to the underlying storage +medium are made. + +If an output stream is created with `FileSystem.create(path, overwrite==true)` +and there is an existing file at the path, that is `exists(FS, path)` holds, +then, the existing data is immediately unavailable; the data at the end of the +path MUST consist of an empty byte sequence `[]`, with consistent metadata. + + +```python +exists(FS, path) +(Stream', FS') = create(FS, path) +exists(FS', path) +getFileStatus(FS', path).getLen() = 0 +``` + +The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent +with the contents of the file after `flush()` and `sync()`. + +```python +(Stream', FS') = create(FS, path) +(Stream'', FS'') = write(Stream', data) +(Stream''', FS''') hsync(Stream'') +exists(FS''', path) +getFileStatus(FS''', path).getLen() = len(data) +``` + +HDFS does not do this except when the write crosses a block boundary; to do +otherwise would overload the Namenode. Other stores MAY copy this behavior. + +As a result, while a file is being written +`length(Filesystem, Path)` MAY be less than the length of `data(Filesystem, Path)`. 
+ +The metadata MUST be consistent with the contents of a file after the `close()` +operation. + +After the contents of an output stream have been persisted (`hflush()/hsync()`) +all new `open(FS, Path)` operations MUST return the updated data. + +After `close()` has been invoked on an output stream, +a call to `getFileStatus(path)` MUST return the final metadata of the written file, +including length and modification time. +The metadata of the file returned in any of the FileSystem `list` operations +MUST be consistent with this metadata. + +The value of `getFileStatus(path).getModificationTime()` is not defined +while a stream is being written to. +The timestamp MAY be updated while a file is being written, +especially after a `Syncable.hsync()` call. +The timestamps MUST be updated after the file is closed +to that of a clock value observed by the server during the `close()` call. +It is *likely* to be in the time and time zone of the filesystem, rather +than that of the client. + +Formally, if a `close()` operation triggers an interaction with a server +which starts at server-side time `t1` and completes at time `t2` with a successfully +written file, then the last modification time SHOULD be a time `t` where +`t1 <= t <= t2` + +## Issues with the Hadoop Output Stream model. + +There are some known issues with the output stream model as offered by Hadoop, +specifically about the guarantees about when data is written and persisted +—and when the metadata is synchronized. +These are where implementation aspects of HDFS and the "Local" filesystem +do not follow the simple model of the filesystem used in this specification. + +### HDFS + +That HDFS file metadata often lags the content of a file being written +to is not something everyone expects, nor convenient for any program trying +pick up updated data in a file being written. Most visible is the length +of a file returned in the various `list` commands and `getFileStatus` —this +is often out of data. + +As HDFS only supports file growth in its output operations, this means +that the size of the file as listed in the metadata may be less than or equal +to the number of available bytes —but never larger. This is a guarantee which +is also held + +One algorithm to determine whether a file in HDFS is updated is: + +1. Remember the last read position `pos` in the file, using `0` if this is the initial +read. +1. Use `getFileStatus(FS, Path)` to query the updated length of the file as +recorded in the metadata. +1. If `Status.length > pos`, the file has grown. +1. If the number has not changed, then + 1. reopen the file. + 1. `seek(pos)` to that location + 1. If `read() != -1`, there is new data. + +This algorithm works for filesystems which are consistent with metadata and +data, as well as HDFS. What is important to know is that, for an open file +`getFileStatus(FS, path).getLen() == 0` does not imply that `data(FS, path)` is +empty. + +When an output stream in HDFS is closed; the newly written data is not immediately +written to disk unless HDFS is deployed with `dfs.datanode.synconclose` set to +true. Otherwise it is cached and written to disk later. + + +### Local Filesystem, `file:` + +`LocalFileSystem`, `file:`, (or any other `FileSystem` implementation based on +`ChecksumFileSystem`) has a different issue. If an output stream +is obtained from `create()` and `FileSystem.setWriteChecksum(false)` has +*not* been called on the filesystem, then the FS only flushes as much +local data as can be written to full checksummed blocks of data. 
+ +That is, the flush operations are not guaranteed to write all the pending +data until the file is finally closed. + +That is, `sync()`, `hsync()` and `hflush()` may not persist all data written +to the stream. + +For anyone thinking "this is a violation of this specification" —they are correct. +The local filesystem was intended for testing, rather than production use. + +### Checksummed output streams + +Because `org.apache.hadoop.fs.FSOutputSummer` and +`org.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer` +implement the underlying checksummed output stream used by HDFS and +other filesystems, it provides some of the core semantics of the output +stream behavior. + +1. The `close()` call is unsynchronized, re-entrant and may attempt +to close the stream more than once. +1. It is possible to call `write(int)` on a closed stream (but not +`write(byte[], int, int)`). +1. It is possible to call `flush()` on a closed stream. + +Behaviors 1 and 2 really have to be considered bugs to fix, albeit with care. + + +### Object Stores + +Object store streams MAY buffer the entire stream's output +until the final `close()` operation triggers a single `PUT` of the data +and materialization of the final output. + +This significantly change's their behaviour compared to that of +POSIX filesystems and that specified in this document. + +#### Visibility of newly created files + +There is no guarantee that any file will be visible at the path of an output +stream after the output stream is created . + +That is: while `create(FS, path, boolean)` returns a new stream + +```python +Stream' = (path, true, []) +``` + +The other postcondition of the operation, `data(FS', path) == []` MAY NOT +hold, in which case: + +1. `exists(FS, p)` MAY return false. +1. If a file was created with `overwrite = True`, the existing data MAY still +be visible: `data(FS', path) = data(FS, path)`. + +1. The check for existing data in a `create()` call with `overwrite=False`, may +take place in the `create()` call itself, in the `close()` call prior to/during +the write, or at some point in between. Expect in the special case that the +object store supports an atomic `PUT` operation, the check for existence of +existing data and the subsequent creation of data at the path contains a race +condition: other clients may create data at the path between the existence check +and the subsequent write. + +1. Calls to `create(FS, Path, overwrite=false)` MAY succeed, returning a new +`OutputStream`, even while another stream is open and writing to the destination +path. + +This allows for the following sequence of operations, which would +raise an exception in the second `open()` call if invoked against HDFS: + +```python +Stream1 = open(FS, path, false) +sleep(200) +Stream2 = open(FS, path, false) +Stream.write('a') +Stream1.close() +Stream2.close() +``` + +For anyone wondering why the clients don't create a 0-byte file in the `create()` call, +it would cause problems after `close()` —the marker file could get +returned in `open()` calls instead of the final data. + +#### Visibility of the output of a stream after `close()` + +One guarantee which Object Stores SHOULD make is the same as those of POSIX +filesystems: After a stream `close()` call returns, the data MUST be persisted +durably and visible to all callers. Unfortunately, even that guarantee is +not always met: + +1. Existing data on a path MAY be visible for an indeterminate period of time. + +1. 
If the store has any form of create inconsistency or buffering of negative +existence probes, then even after the stream's `close()` operation has returned, +`getFileStatus(FS, path)` and `open(FS, path)` may fail with a `FileNotFoundException`. + +In their favour, the atomicity of the store's PUT operations do offer their +own guarantee: a newly created object is either absent or all of its data +is present: the act of instantiating the object, while potentially exhibiting +create inconsistency, is atomic. Applications may be able to use that fact +to their advantage. + + +## Implementors notes. + +### `StreamCapabilities` + +Implementors of filesystem clients SHOULD implement the `StreamCapabilities` +interface and its `hasCapabilities()` method to to declare whether or not +an output streams offer the visibility and durability guarantees of `Syncable`. + +Implementors of `StreamCapabilities.hasCapabilities()` MUST NOT declare that +they support the `hflush` and `hsync` capabilities on streams where this is not true. + +Sometimes streams pass their data to store, but the far end may not +sync it all the way to disk. That is not something the client can determine. +Here: if the client code is making the hflush/hsync calls to the distributed FS, +it SHOULD declare that it supports them. + +### Metadata updates + +Implementors MAY NOT update a file's metadata (length, date, ...) after +every `hsync()` call. HDFS doesn't -so there is no need to feel too guilty. + +### Does `close()` sync data? + +By default, HDFS does not sync data to disk when a stream is closed. +This does not mean that users do not expect it. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java index 79222ce67d6cf..95814b8a15540 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java @@ -24,8 +24,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; + import org.junit.Test; import org.junit.AssumptionViolatedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -42,6 +46,9 @@ public abstract class AbstractContractCreateTest extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractCreateTest.class); + /** * How long to wait for a path to become visible. 
*/ @@ -436,4 +443,78 @@ private void createFile(Path path) throws IOException { writeDataset(fs, path, data, data.length, 1024 * 1024, true); } + + @Test + public void testHSync() throws Throwable { + describe("test declared and actual Syncable behaviors"); + Path path = methodPath(); + FileSystem fs = getFileSystem(); + boolean supportsFlush = isSupported(SUPPORTS_HFLUSH); + boolean supportsSync = isSupported(SUPPORTS_HSYNC); + + try (FSDataOutputStream out = fs.create(path, true)) { + + boolean doesHFlush = out.hasCapability( + StreamCapabilities.StreamCapability.HFLUSH.getValue()); + if (doesHFlush) { + out.hflush(); + } + assertEquals("hflush support", supportsFlush, doesHFlush); + boolean doesHSync = out.hasCapability( + StreamCapabilities.StreamCapability.HSYNC.getValue()); + assertEquals("hsync support", supportsSync, doesHSync); + + try { + out.hflush(); + if (!supportsFlush) { + // hsync not ignored + LOG.warn("FS doesn't support Syncable.hflush()," + + " but doesn't reject it either."); + } + } catch (UnsupportedOperationException e) { + if (supportsFlush) { + throw new AssertionError("hflush not supported", e); + } + } + out.write('a'); + try { + out.hsync(); + } catch (UnsupportedOperationException e) { + if (supportsSync) { + throw new AssertionError("HSync not supported", e); + } + } + + if (supportsSync) { + // if sync really worked, data is visible here + + try(FSDataInputStream in = fs.open(path)) { + assertEquals('a', in.read()); + assertEquals(-1, in.read()); + } + } else { + out.flush(); + // no sync. let's see what's there + try (FSDataInputStream in = fs.open(path)) { + + int c = in.read(); + if (c == -1) { + // nothing was synced; sync and flush really aren't there. + } else { + LOG.info("sync and flush are declared unsupported" + + " - but the stream does offer some sync/flush semantics"); + } + } catch (FileNotFoundException e) { + // that's OK if it's an object store, but not if its a real + // FS + if (!isSupported(IS_BLOBSTORE)) { + throw e; + } + } + + } + + } + + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java index ac9de6d7bfe8c..3c285650e9936 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java @@ -295,7 +295,7 @@ protected void handleRelaxedException(String action, if (getContract().isSupported(SUPPORTS_STRICT_EXCEPTIONS, false)) { throw e; } - LOG.warn("The expected exception {} was not the exception class" + + LOG.warn("The expected exception {} was not the exception class" + " raised on {}: {}", action , e.getClass(), expectedException, e); } @@ -398,4 +398,13 @@ protected String generateAndLogErrorListing(Path src, Path dst) throws } return destDirLS; } + + /** + * Get a path from the name of the current test case. + * @return a path based on the test method being executed. 
+ * @throws IOException IO problem + */ + protected Path methodPath() throws IOException { + return path(methodName.getMethodName()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java index 3f31c07742c59..286fcec4a1c8e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java @@ -241,4 +241,13 @@ public interface ContractOptions { */ String TEST_RANDOM_SEEK_COUNT = "test.random-seek-count"; + /** + * Is hflush supported in API and StreamCapabilities? + */ + String SUPPORTS_HFLUSH = "supports-hflush"; + + /** + * Is hsync supported in API and StreamCapabilities? + */ + String SUPPORTS_HSYNC = "supports-hsync"; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml index 3c9fcccc73846..d7e039647d5fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml @@ -116,4 +116,14 @@ true + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java index 2b89fb0a73242..f8bb8e33b1403 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import java.io.IOException; @@ -42,7 +44,8 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public final class AdlFsOutputStream extends OutputStream implements Syncable { +public final class AdlFsOutputStream extends OutputStream + implements Syncable, StreamCapabilities { private final ADLFileOutputStream out; public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration) @@ -79,4 +82,9 @@ public synchronized void hflush() throws IOException { public synchronized void hsync() throws IOException { out.flush(); } + + @Override + public boolean hasCapability(String capability) { + return StoreImplementationUtils.supportsSyncable(capability); + } } diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml index 5bbdd6fbb8645..e45a662229377 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml +++ b/hadoop-tools/hadoop-azure-datalake/src/test/resources/adls.xml @@ -148,4 +148,14 @@ true + + fs.contract.supports-hflush + true + + + + fs.contract.supports-hsync + true + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 8ba2223077ad1..48ef495d7b7ef 
100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.azure.security.Constants; import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager; import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -1052,10 +1053,7 @@ public void hsync() throws IOException { */ @Override // StreamCapability public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index 14ddb02fc4a6b..d64b1d5984ce4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; /** * Support the Syncable interface on top of a DataOutputStream. @@ -56,10 +58,7 @@ public OutputStream getOutStream() { @Override public boolean hasCapability(String capability) { - if (out instanceof StreamCapabilities) { - return ((StreamCapabilities) out).hasCapability(capability); - } - return false; + return StoreImplementationUtils.hasCapability(out, capability); } @Override From 7452ea18812997341c976ad63adcbf8b95fd15d3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 5 Nov 2019 17:22:25 +0000 Subject: [PATCH 2/8] HADOOP-13327 review usages of "must" in lower case Change-Id: Id38cf27639215abdd0d8c3578ddf72ed7adca8c5 --- .../src/site/markdown/filesystem/outputstream.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md index f3fd8764f466f..6aa4b8d85a025 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -152,7 +152,7 @@ than `close()`; they MAY fail with `IOException` or `RuntimeException`. Stream' = (path, false, []) ``` -The `close()` operation must be idempotent with the sole attempt to write the +The `close()` operation MUST be idempotent with the sole attempt to write the data made in the first invocation. 1. If `close()` succeeds, subsequent calls are no-ops. @@ -197,8 +197,8 @@ has made such a call on this in the past As the HDFS implementation is considered the de-facto specification of the FileSystem APIs, the fact that `write()` is thread-safe is significant. 
-For compatibility, not only must other FS clients be thread-safe,
-but new HDFS featues, such as encryption and Erasure Coding must also
+For compatibility, not only SHOULD other FS clients be thread-safe,
+but new HDFS featues, such as encryption and Erasure Coding SHOULD also
 implement consistent behavior with the core HDFS output stream.
 
 Put differently:
 
@@ -477,7 +477,7 @@ There is no requirement or guarantee that clients with an existing
 data, nor is there a guarantee that they *will not* in a current
 or subsequent read.
 
-Implementation note: as a correct `hsync()` implementation must also
+Implementation note: as a correct `hsync()` implementation MUST also
 offer all the semantics of an `hflush()` call, implementations of
 `hflush()` may just invoke `hsync()`:
 
@@ -796,7 +796,7 @@ and materialization of the final output.
 This significantly changes their behaviour compared to that of POSIX filesystems
 and that specified in this document.
 
-#### Visibility of newly created files
+#### Visibility of newly created objects
 
 There is no guarantee that any file will be visible at the path of an output
 stream after the output stream is created.

From f89118d7b0bcb287868d5f7440c080ee450d1e2a Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Tue, 5 Nov 2019 17:36:18 +0000
Subject: [PATCH 3/8] HADOOP-13327 review of more of Sean Busbey's comments

TODO "Could this be in a section about visibility and not in the model
definition? Maybe later. "here's the model, here's how that model works with
creation, here's how it works when reading/writing" flows much better and
visibility should be in that third part."

Change-Id: I61c89475a1ea72006524803f2a7dd9e40551d718
---
 .../src/site/markdown/filesystem/filesystem.md   |  8 ++++++++
 .../src/site/markdown/filesystem/outputstream.md | 14 +++++++-------
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 7d07b19037d28..99912b68a3a32 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -641,6 +641,8 @@ The updated (valid) FileSystem must contains all the parent directories of the p
 The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
 `FS.Files[p]`
 
+The behavior of the returned stream is covered in [Output](outputstream.html).
+
 #### Implementation Notes
 
 * Some implementations split the create into a check for the file existing
@@ -672,6 +674,8 @@ this precondition fails.
 
 Make a `FSDataOutputStreamBuilder` to specify the parameters to create a file.
 
+The behavior of the returned stream is covered in [Output](outputstream.html).
+
 #### Implementation Notes
 
 `createFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make
@@ -703,11 +707,15 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep
 
 Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`
 by appending data to the existing list.
 
+The behavior of the returned stream is covered in [Output](outputstream.html).
+
 ### `FSDataOutputStreamBuilder appendFile(Path p)`
 
 Make a `FSDataOutputStreamBuilder` to specify the parameters to append to an
 existing file; an illustrative usage sketch follows below.
 
+The behavior of the returned stream is covered in [Output](outputstream.html).
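+
+As a non-normative sketch of the builder in use (the helper name `appendLine`
+and its arguments are invented for this example; it assumes the usual
+`org.apache.hadoop.fs` and `java.nio.charset.StandardCharsets` imports):
+
+```java
+// build() is the point at which the append actually starts;
+// until then no change is made to the filesystem.
+public void appendLine(FileSystem fs, Path path, String line) throws IOException {
+  try (FSDataOutputStream out = fs.appendFile(path).build()) {
+    out.write(line.getBytes(StandardCharsets.UTF_8));
+  }
+}
+```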
+
 #### Implementation Notes
 
 `appendFile(p)` returns a `FSDataOutputStreamBuilder` only and does not make
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 6aa4b8d85a025..fc32941bef5e9 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -97,13 +97,13 @@ FS'.Files(path) == buffer
 ```
 
 Any client reading the data at the path MUST see the new data.
-The two sync operations, `hflush()` and `hsync()` differ in their durability
+The `Syncable` operations differ in their durability
 guarantees, not visibility of data.
 
-### State of Stream and Filesystem after `Filesystem.create()`
+### State of Stream and File System after `Filesystem.create()`
 
 The output stream returned by a `FileSystem.create(path)` or
-`FileSystem.createFile(path).build`
+`FileSystem.createFile(path).build()`
 within a filesystem `FS`, can be modeled as a triple containing an empty array of no data:
 
 ```python
@@ -113,7 +113,7 @@ Stream' = (path, true, [])
 The filesystem `FS'` MUST contain a 0-byte file at the path:
 
 ```python
-data(FS', path) == []
+FS' = FS where data(FS', path) == []
 ```
 
 Thus, the initial state of `Stream'.buffer` is implicitly
@@ -122,10 +122,10 @@ consistent with the data at the filesystem.
 
 *Object Stores*: see caveats in the "Object Stores" section below.
 
-### State of Stream and Filesystem after `Filesystem.append()`
+### State of Stream and File System after `Filesystem.append()`
 
 The output stream returned from a call of
- `FileSystem.append(path, buffersize, progress)`,
+ `FileSystem.append(path, buffersize, progress)`
 within a filesystem `FS`, can be modelled as a stream whose `buffer`
 is initialized to that of the original file:
 
@@ -149,7 +149,7 @@ After a call to `close()`, the stream is closed for all operations other
 than `close()`; they MAY fail with `IOException` or `RuntimeException`.
 
 ```python
-Stream' = (path, false, []) 
+Stream' = (path, false, [])
 ```
 
 The `close()` operation MUST be idempotent with the sole attempt to write the

From af423479f26265de6d74ff2204de2fcdfb8d5db9 Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Mon, 27 Jan 2020 12:14:16 +0000
Subject: [PATCH 4/8] HADOOP-13327 output stream spec.

Review with more on 404 caching.

Change-Id: Ib474a84e48556c6b76121427a026fa854b5bd9e0
---
 .../fs/impl/StoreImplementationUtils.java        | 14 +--
 .../site/markdown/filesystem/filesystem.md       |  2 +-
 .../site/markdown/filesystem/outputstream.md     | 86 +++++++++++++------
 3 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
index 68c53b1a1cc77..da47ad326b84c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
@@ -13,7 +13,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License */ package org.apache.hadoop.fs.impl; @@ -51,14 +51,14 @@ public static boolean supportsSyncable(String capability) { /** * Probe for an object having a capability; returns true - * iff the stream implements {@link StreamCapabilities} and its + * if the stream implements {@link StreamCapabilities} and its * {@code hasCapabilities()} method returns true for the capability. * This is a package private method intended to provided a common * implementation for input and output streams. * {@link StreamCapabilities#hasCapability(String)} call is for public use. * @param object object to probe. * @param capability capability to probe for - * @return true iff the object implements stream capabilities and + * @return true if the object implements stream capabilities and * declares that it supports the capability. */ static boolean objectHasCapability(Object object, String capability) { @@ -70,11 +70,11 @@ static boolean objectHasCapability(Object object, String capability) { /** * Probe for an output stream having a capability; returns true - * iff the stream implements {@link StreamCapabilities} and its + * if the stream implements {@link StreamCapabilities} and its * {@code hasCapabilities()} method returns true for the capability. * @param out output stream * @param capability capability to probe for - * @return true iff the stream declares that it supports the capability. + * @return true if the stream declares that it supports the capability. */ public static boolean hasCapability(OutputStream out, String capability) { return objectHasCapability(out, capability); @@ -82,11 +82,11 @@ public static boolean hasCapability(OutputStream out, String capability) { /** * Probe for an input stream having a capability; returns true - * iff the stream implements {@link StreamCapabilities} and its + * if the stream implements {@link StreamCapabilities} and its * {@code hasCapabilities()} method returns true for the capability. * @param out output stream * @param capability capability to probe for - * @return true iff the stream declares that it supports the capability. + * @return true if the stream declares that it supports the capability. */ public static boolean hasCapability(InputStream out, String capability) { return objectHasCapability(out, capability); diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 99912b68a3a32..da6f890310831 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -634,7 +634,7 @@ For instance, HDFS may raise an `InvalidPathException`. result = FSDataOutputStream -A zero byte file must exist at the end of the specified path, visible to all +A zero byte file must exist at the end of the specified path, visible to all. The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`. 
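+
+A hedged, illustrative sketch of this contract in use (not part of the
+specification; the method name `writeThenSync` is invented for this example):
+
+```java
+public void writeThenSync(FileSystem fs, Path path, byte[] data)
+    throws IOException {
+  try (FSDataOutputStream out = fs.create(path, true)) {
+    out.write(data);
+    // probe before relying on durability; not all streams implement Syncable
+    if (out.hasCapability(StreamCapabilities.HSYNC)) {
+      out.hsync();
+    }
+  }
+}
+```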
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md index fc32941bef5e9..4cc5c3c19f898 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md @@ -24,7 +24,7 @@ This document covers the Output Streams within the context of the It uses the filesystem model defined in [A Model of a Hadoop Filesystem](model.html) with the notation defined in [notation](Notation.md). -The target audiences are +The target audiences are: 1. Users of the APIs. While `java.io.OutputStream` is a standard interfaces, this document clarifies how it is implemented in HDFS and elsewhere. The Hadoop-specific interfaces `Syncable` and `StreamCapabilities` are new; @@ -60,8 +60,6 @@ A new interface: `StreamCapabilities`. This allows callers to probe the exact capabilities of a stream, even transitively through a chain of streams. - - ## Output Stream Model For this specification, an output stream can be viewed as a list of bytes @@ -303,9 +301,7 @@ specifications of behaviour. #### Preconditions -```python -Stream.open else raise IOException -``` +None. #### Postconditions @@ -319,9 +315,23 @@ others" FS' = FS where data(FS', path) == buffer ``` -Some applications have been known to call `flush()` on a closed stream -on the assumption that it is harmless. Implementations MAY choose to -support this behaviour. +When a stream is closed, `flush()` SHOULD downgrade to being a no-op, if it was not +one already. This is to work with applications and libraries which can invoke +it in exactly this way. + + +*Issue*: Should `flush()` forward to `hflush()`? + +No. Or at least, make it optional. + +There's a lot of application code which assumes that `flush()` is low cost +and should be invoked after writing every single line of output, after +writing small 4KB blocks or similar. + +Forwarding this to a full flush across a distributed filesystem, or worse, +a distant object store, is very underperformant + +See [HADOOP-16548](https://issues.apache.org/jira/browse/HADOOP-16548) ### `close()` @@ -372,6 +382,9 @@ may hide serious problems. delay in `close()` does not block the thread so long that the heartbeat times out. +And for implementors: have a look at [HADOOP-16785](https://issues.apache.org/jira/browse/HADOOP-16785) +to see examples of complications here. + ### HDFS and `OutputStream.close()` HDFS does not immediately `sync()` the output of a written file to disk on @@ -379,7 +392,7 @@ HDFS does not immediately `sync()` the output of a written file to disk on is true. This has caused [problems in some applications](https://issues.apache.org/jira/browse/ACCUMULO-1364). Applications which absolutely require the guarantee that a file has been persisted -MUST call `Syncable.hsync()` before the file is closed. +MUST call `Syncable.hsync()` *before* the file is closed. ## `org.apache.hadoop.fs.Syncable` @@ -530,7 +543,7 @@ From the javadocs of `DFSOutputStream.hsync(EnumSet syncFlags)` In virtual machines, the notion of "disk hardware" is really that of -another software abstraction: there are guarantees. +another software abstraction: there are few guarantees. ## Interface `StreamCapabilities` @@ -543,7 +556,6 @@ another software abstraction: there are guarantees. The `StreamCapabilities` interface exists to allow callers to dynamically determine the behavior of a stream. 
- The reference implementation of this interface is `org.apache.hadoop.hdfs.DFSOutputStream` @@ -562,7 +574,14 @@ The reference implementation of this interface is Where `HSYNC` and `HFLUSH` are items in the enumeration `org.apache.hadoop.fs.StreamCapabilities.StreamCapability`. -## interface `CanSetDropBehind` +Once a stream has been closed, th `hasCapability()` call MUST do one of + +* return the capabilities of the open stream. +* return false. + +That is: it MUST NOT raise an exception about the file being closed; + +## interface `CanSetDropBehind` ```java @InterfaceAudience.Public @@ -595,7 +614,7 @@ covered in this (very simplistic) filesystem model, but which are visible in production. -### Durability +### Durability 1. `OutputStream.write()` MAY persist the data, synchronously or asynchronously 1. `OutputStream.flush()` flushes data to the destination. There @@ -618,7 +637,7 @@ Thus: `flush()` is often treated at most as a cue to flush data to the network buffers -but not commit to writing any data. It is only the `Syncable` interface which offers guarantees. -### Concurrency +### Concurrency 1. The outcome of more than one process writing to the same file is undefined. @@ -678,7 +697,7 @@ exists(FS''', path) getFileStatus(FS''', path).getLen() = len(data) ``` -HDFS does not do this except when the write crosses a block boundary; to do +*HDFS does not do this except when the write crosses a block boundary*; to do otherwise would overload the Namenode. Other stores MAY copy this behavior. As a result, while a file is being written @@ -710,7 +729,7 @@ which starts at server-side time `t1` and completes at time `t2` with a successf written file, then the last modification time SHOULD be a time `t` where `t1 <= t <= t2` -## Issues with the Hadoop Output Stream model. +## Issues with the Hadoop Output Stream model. There are some known issues with the output stream model as offered by Hadoop, specifically about the guarantees about when data is written and persisted @@ -718,7 +737,7 @@ specifically about the guarantees about when data is written and persisted These are where implementation aspects of HDFS and the "Local" filesystem do not follow the simple model of the filesystem used in this specification. -### HDFS +### HDFS That HDFS file metadata often lags the content of a file being written to is not something everyone expects, nor convenient for any program trying @@ -751,7 +770,6 @@ empty. When an output stream in HDFS is closed; the newly written data is not immediately written to disk unless HDFS is deployed with `dfs.datanode.synconclose` set to true. Otherwise it is cached and written to disk later. - ### Local Filesystem, `file:` @@ -770,7 +788,7 @@ to the stream. For anyone thinking "this is a violation of this specification" —they are correct. The local filesystem was intended for testing, rather than production use. -### Checksummed output streams +### Checksummed output streams Because `org.apache.hadoop.fs.FSOutputSummer` and `org.apache.hadoop.fs.ChecksumFileSystem.ChecksumFSOutputSummer` @@ -787,7 +805,7 @@ to close the stream more than once. Behaviors 1 and 2 really have to be considered bugs to fix, albeit with care. -### Object Stores +### Object Stores Object store streams MAY buffer the entire stream's output until the final `close()` operation triggers a single `PUT` of the data @@ -861,8 +879,26 @@ is present: the act of instantiating the object, while potentially exhibiting create inconsistency, is atomic. 
Applications may be able to use that fact to their advantage.
 
-## Implementors notes.
+There is a special troublespot in AWS S3 where it caches 404 responses returned
+by the service from probes for an object existing _before the file has been created_.
+A 404 record can remain in the load balancer's cache for some time -it seems to expire
+only after a "sufficient" interval of no probes for that path.
+This has been difficult to deal with within the Hadoop S3A code itself
+(HADOOP-16490, HADOOP-16635) -and if applications make their own probes for files
+before creating them, the problem will intermittently surface.
+
+1. If you look for an object on S3 and it is not there - The 404 MAY Be returned even
+after the object has been created.
+1. FS operations triggering such a probe include: `getFileStatus()`, `exists()`, `open()`
+and others.
+1. The S3A connector does not do a probe if a file is created through `create()` overwrite=true;
+it only makes sure that the path does not reference a directory. Applications SHOULD always
+create files with this option except when some form of exclusivity is needed on file
+creation -in which case, be aware that with the non-atomic probe+create sequence which
+some object store connectors implement, the semantics of the creation are not sufficient
+to allow the filesystem to be used as an implicit coordination mechanism between processes.
+``
+## Implementors notes.
 
 ### `StreamCapabilities`
 
@@ -875,8 +911,8 @@ they support the `hflush` and `hsync` capabilities on streams where this is not
 
 Sometimes streams pass their data to store, but the far end may not sync it
 all the way to disk. That is not something the client can determine.
 
-Here: if the client code is making the hflush/hsync calls to the distributed FS,
-it SHOULD declare that it supports them.
+Here: if the client code making the hflush/hsync calls passes these requests
+on to the distributed FS, it SHOULD declare that it supports them.
 
 ### Metadata updates
 
 Implementors MAY NOT update a file's metadata (length, date, ...) after
 every `hsync()` call. HDFS doesn't -so there is no need to feel too guilty.
 
 ### Does `close()` sync data?
 
 By default, HDFS does not sync data to disk when a stream is closed.
 This does not mean that users do not expect it.

From 0271da2190dc84ba56af959537276985b822faa4 Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Fri, 26 Jun 2020 18:54:41 +0100
Subject: [PATCH 5/8] HADOOP-13327 add RawLocalFileSystem Syncable

Doesn't actually get through as there's a BufferedOutputStream in the way...
will need to do something there :)

Change-Id: Ib2e4517e266168f3ad106e55ceac987ed443aeec
---
 .../apache/hadoop/fs/RawLocalFileSystem.java     | 20 ++++++++++++++++-
 .../site/markdown/filesystem/outputstream.md     |  7 ++----
 .../contract/AbstractContractCreateTest.java     | 22 ++++++++++++++-----
 .../contract/AbstractFSContractTestBase.java     | 11 +---------
 .../hadoop/fs/contract/ContractTestUtils.java    |  9 +++++---
 .../src/test/resources/contract/localfs.xml      | 10 +++++++++
 .../src/test/resources/contract/rawlocal.xml     | 10 +++++++++
 7 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index ba29f74cc5ca4..cb8f38a3ba0f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -233,7 +234,8 @@ public FSDataInputStream open(PathHandle fd, int bufferSize)
   /*********************************************************
    * For create()'s FSOutputStream.
    *********************************************************/
-  class LocalFSFileOutputStream extends OutputStream {
+  class LocalFSFileOutputStream extends OutputStream
+      implements Syncable, StreamCapabilities {
     private FileOutputStream fos;
 
     private LocalFSFileOutputStream(Path f, boolean append,
@@ -288,6 +290,22 @@ public void write(int b) throws IOException {
         throw new FSError(e); // assume native fs error
       }
     }
+
+    @Override
+    public void hflush() throws IOException {
+      flush();
+    }
+
+    @Override
+    public void hsync() throws IOException {
+      flush();
+      fos.getFD().sync();
+    }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      return StoreImplementationUtils.supportsSyncable(capability);
+    }
   }
 
   @Override
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 4cc5c3c19f898..59fbf13b1fe48 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -782,11 +782,8 @@ local data as can be written to full checksummed blocks of data.
 That is, the flush operations are not guaranteed to write all the pending
 data until the file is finally closed.
 
-That is, `sync()`, `hsync()` and `hflush()` may not persist all data written
-to the stream.
-
-For anyone thinking "this is a violation of this specification" —they are correct.
-The local filesystem was intended for testing, rather than production use.
+For this reason, the local filesystem accessed via `file://` URLs
+does not support `Syncable`.
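+
+As an illustrative sketch (not part of this patch; `file` and `data` stand for
+any local file and byte payload), this is the durability path the raw local
+stream above relies on:
+
+```java
+// flush() only pushes JVM buffers to the OS; FileDescriptor.sync()
+// asks the OS to persist them -this is what hsync() maps to here.
+try (FileOutputStream fos = new FileOutputStream(file)) {
+  fos.write(data);
+  fos.flush();
+  fos.getFD().sync();
+}
+```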
### Checksummed output streams diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java index 95814b8a15540..4f041eef9cfc3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -31,8 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; @@ -454,15 +457,22 @@ public void testHSync() throws Throwable { try (FSDataOutputStream out = fs.create(path, true)) { - boolean doesHFlush = out.hasCapability( - StreamCapabilities.StreamCapability.HFLUSH.getValue()); + boolean doesHFlush = out.hasCapability(StreamCapabilities.HFLUSH); if (doesHFlush) { out.hflush(); } - assertEquals("hflush support", supportsFlush, doesHFlush); - boolean doesHSync = out.hasCapability( - StreamCapabilities.StreamCapability.HSYNC.getValue()); - assertEquals("hsync support", supportsSync, doesHSync); + String[] hflushCapabilities = { + StreamCapabilities.HFLUSH, + }; + String[] hsyncCapabilities = { + StreamCapabilities.HSYNC, + }; + assertCapabilities(out, + supportsFlush ? hflushCapabilities : null, + supportsFlush ? null : hflushCapabilities); + assertCapabilities(out, + supportsSync ? hsyncCapabilities : null, + supportsSync ? null : hsyncCapabilities); try { out.hflush(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java index 3c285650e9936..ac9de6d7bfe8c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java @@ -295,7 +295,7 @@ protected void handleRelaxedException(String action, if (getContract().isSupported(SUPPORTS_STRICT_EXCEPTIONS, false)) { throw e; } - LOG.warn("The expected exception {} was not the exception class" + + LOG.warn("The expected exception {} was not the exception class" + " raised on {}: {}", action , e.getClass(), expectedException, e); } @@ -398,13 +398,4 @@ protected String generateAndLogErrorListing(Path src, Path dst) throws } return destDirLS; } - - /** - * Get a path from the name of the current test case. - * @return a path based on the test method being executed. 
-   * @throws IOException IO problem
-   */
-  protected Path methodPath() throws IOException {
-    return path(methodName.getMethodName());
-  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 39a41d01c458a..5fd765b2cbcc2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1542,14 +1542,16 @@ public static void assertCapabilities(
     StreamCapabilities source = (StreamCapabilities) stream;
     if (shouldHaveCapabilities != null) {
       for (String shouldHaveCapability : shouldHaveCapabilities) {
-        assertTrue("Should have capability: " + shouldHaveCapability,
+        assertTrue("Should have capability: " + shouldHaveCapability
+                + " in " + source,
             source.hasCapability(shouldHaveCapability));
       }
     }
 
     if (shouldNotHaveCapabilities != null) {
       for (String shouldNotHaveCapability : shouldNotHaveCapabilities) {
-        assertFalse("Should not have capability: " + shouldNotHaveCapability,
+        assertFalse("Should not have capability: " + shouldNotHaveCapability
+                + " in " + source,
             source.hasCapability(shouldNotHaveCapability));
       }
     }
@@ -1569,7 +1571,8 @@ public static void assertHasPathCapabilities(
 
     for (String shouldHaveCapability: capabilities) {
       assertTrue("Should have capability: " + shouldHaveCapability
-          + " under " + path,
+              + " under " + path
+              + " in " + source,
           source.hasPathCapability(path, shouldHaveCapability));
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml
index b261a63be7df7..03bb3e800fba8 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml
@@ -121,4 +121,14 @@ case sensitivity and permission options are determined at run time from OS type
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
index 8cbd4a0abcf38..f57088fa70463 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
@@ -127,4 +127,14 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-hflush</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hsync</name>
+    <value>true</value>
+  </property>
+
 </configuration>

From 16a8b25c74a8f015c3998e11b564a54cc9fa5cf8 Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Mon, 21 Sep 2020 17:39:39 +0100
Subject: [PATCH 6/8] HADOOP-13327 Josh's comments

Change-Id: I95e6b63eda051afae22a9d48c35b5e69e0464852
---
 .../src/site/markdown/filesystem/filesystem.md   |  2 +-
 .../src/site/markdown/filesystem/outputstream.md | 11 +++++++----
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index da6f890310831..43c58da5c19bd 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -654,7 +654,7 @@ The behavior of
the returned stream is covered in [Output](outputstream.html).
 
 * S3A, Swift and potentially other Object Stores do not currently change the `FS` state
 until the output stream `close()` operation is completed. This is a significant difference
 between the behavior of object stores
-and that of filesystems, as it allows >1 client to create a file with `overwrite==false`,
+and that of filesystems, as it allows >1 client to create a file with `overwrite=false`,
 and potentially confuse file/directory logic. In particular, using `create()` to acquire
 an exclusive lock on a file (whoever creates the file without an error is considered the
 holder of the lock) may not be a safe algorithm to use when working with object stores.
 
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 59fbf13b1fe48..3d87c153bab15 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -884,7 +884,7 @@ This has been difficult to deal with within the Hadoop S3A code itself
 (HADOOP-16490, HADOOP-16635) -and if applications make their own probes for files
 before creating them, the problem will intermittently surface.
 
-1. If you look for an object on S3 and it is not there - The 404 MAY Be returned even
+1. If you look for an object on S3 and it is not there - The 404 MAY be returned even
 after the object has been created.
 1. FS operations triggering such a probe include: `getFileStatus()`, `exists()`, `open()`
 and others.
@@ -894,7 +894,7 @@ create files with this option except when some form of exclusivity is needed on
 creation -in which case, be aware that with the non-atomic probe+create sequence which
 some object store connectors implement, the semantics of the creation are not sufficient
 to allow the filesystem to be used as an implicit coordination mechanism between processes.
-``
+
 ## Implementors notes.
 
 ### `StreamCapabilities`
@@ -914,9 +914,12 @@ on to the distributed FS, it SHOULD declare that it supports them.
 ### Metadata updates
 
 Implementors MAY NOT update a file's metadata (length, date, ...) after
-every `hsync()` call. HDFS doesn't -so there is no need to feel too guilty.
+every `hsync()` call. HDFS doesn't, except when the written data crosses
+a block boundary.
 
 ### Does `close()` sync data?
 
-By default, HDFS does not sync data to disk when a stream is closed.
+By default, HDFS does not sync data to disk when a stream is closed; it will
+be asynchronously saved to disk.
+
 This does not mean that users do not expect it.

From 2bf9ab47ad11026c84c8096e558c54f46136c279 Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Mon, 19 Oct 2020 15:01:57 +0100
Subject: [PATCH 7/8] HADOOP-13327. Stream capabilities

* use (renamed) predicate isProbeForSyncable() everywhere appropriate
* HFLUSH marked as deprecated
* outputstream.md lists standard capabilities, declares HFLUSH as deprecated.
* clean up tests probing for the capabilities Change-Id: I2a8c2ddc7119c31a73ee327a181bf700a2c5f21f --- .../apache/hadoop/fs/RawLocalFileSystem.java | 2 +- .../apache/hadoop/fs/StreamCapabilities.java | 4 ++ .../fs/impl/StoreImplementationUtils.java | 5 +- .../site/markdown/filesystem/outputstream.md | 61 +++++++++++++------ .../hadoop/fs/contract/ContractTestUtils.java | 28 +++++++++ .../apache/hadoop/hdfs/DFSOutputStream.java | 9 +-- .../hadoop/fs/adl/AdlFsOutputStream.java | 2 +- .../fs/azure/BlockBlobAppendStream.java | 9 +-- .../azurebfs/services/AbfsOutputStream.java | 11 +--- .../fs/azure/ITestOutputStreamSemantics.java | 42 +++++++------ .../ITestAzureBlobFileSystemFlush.java | 25 +++++--- 11 files changed, 126 insertions(+), 72 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index cb8f38a3ba0f5..7f38c8cf81d7d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -304,7 +304,7 @@ public void hsync() throws IOException { @Override public boolean hasCapability(String capability) { - return StoreImplementationUtils.supportsSyncable(capability); + return StoreImplementationUtils.isProbeForSyncable(capability); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index e68e7b351ed78..4561852fd36cb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -34,7 +34,11 @@ public interface StreamCapabilities { /** * Stream hflush capability implemented by {@link Syncable#hflush()}. + *

+   * Use the {@link #HSYNC} probe to check for the support of Syncable;
+   * it's the presence of {@code hsync()} which matters.
    */
+  @Deprecated
   String HFLUSH = "hflush";
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
index da47ad326b84c..978e57216cd2c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/StoreImplementationUtils.java
@@ -39,12 +39,13 @@ private StoreImplementationUtils() {
   }
 
   /**
-   * Check the supplied capabilities for being those required for full
+   * Check whether the probe capability is for {@link StreamCapabilities#HSYNC}
+   * or {@link StreamCapabilities#HFLUSH}, i.e.
    * {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality.
    * @param capability capability string.
    * @return true if either refers to one of the Syncable operations.
    */
-  public static boolean supportsSyncable(String capability) {
+  public static boolean isProbeForSyncable(String capability) {
     return capability.equalsIgnoreCase(HSYNC) ||
         capability.equalsIgnoreCase(HFLUSH);
   }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 3d87c153bab15..90e2d1a2adf3e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -522,14 +522,14 @@ FS' = FS where data(path) == buffer
 ```
 
 The reference implementation, `DFSOutputStream` will block
-until an acknowledgement is received from the datanodes: That is, all hosts
+until an acknowledgement is received from the datanodes: that is, all hosts
 in the replica write chain have successfully written the file.
 
 That means that the expectation callers may have is that the return of the
 method call contains visibility and durability guarantees which other
 implementations must maintain.
 
-Note, however, that the reference `DFSOutputStream.hsync()` call only actually syncs/
+Note, however, that the reference `DFSOutputStream.hsync()` call only actually syncs
 *the current block*. If there have been a series of writes since the last sync,
 such that a block boundary has been crossed. The `hsync()` call claims only
 to write the most recent.
@@ -553,34 +553,57 @@ another software abstraction: there are few guarantees.
 @InterfaceStability.Evolving
 ```
 
-The `StreamCapabilities` interface exists to allow callers to dynamically
+The `org.apache.hadoop.fs.StreamCapabilities` interface exists to allow callers to dynamically
 determine the behavior of a stream.
 
-The reference implementation of this interface is
- `org.apache.hadoop.hdfs.DFSOutputStream`
-
 ```java
 public boolean hasCapability(String capability) {
-  switch (StringUtils.toLowerCase(capability)) {
-  case StreamCapabilities.HSYNC:
-  case StreamCapabilities.HFLUSH:
-    return true;
-  default:
-    return false;
+  switch (capability.toLowerCase(Locale.ENGLISH)) {
+    case StreamCapabilities.HSYNC:
+    case StreamCapabilities.HFLUSH:
+      return supportFlush;
+    default:
+      return false;
   }
 }
 ```
 
-Where `HSYNC` and `HFLUSH` are items in the enumeration
-`org.apache.hadoop.fs.StreamCapabilities.StreamCapability`.
-
-Once a stream has been closed, th `hasCapability()` call MUST do one of
+Once a stream has been closed, a `hasCapability()` call MUST do one of
 
 * return the capabilities of the open stream.
 * return false.
 
 That is: it MUST NOT raise an exception about the file being closed;
 
+See [pathcapabilities](pathcapabilities.html) for specifics on the `PathCapabilities` API;
+the requirements are similar: a stream MUST NOT return true for a capability
+for which it lacks support, be it because
+
+* The capability is unknown.
+* The capability is known and known to be unsupported.
+
+Standard stream capabilities are defined in `StreamCapabilities`;
+consult the javadocs for the complete set of options.
+
+| Name | Probes for support of |
+|-------|---------|
+| `hsync` | `Syncable.hsync()` |
+| `hflush` | `Syncable.hflush()`. Deprecated: probe for `HSYNC` only. |
+| `in:readahead` | `CanSetReadahead.setReadahead()` |
+| `dropbehind` | `CanSetDropBehind.setDropBehind()` |
+| `in:unbuffer` | `CanUnbuffer.unbuffer()` |
+| `in:readbytebuffer` | `ByteBufferReadable#read(ByteBuffer)` |
+| `in:preadbytebuffer` | `ByteBufferPositionedReadable#read(long, ByteBuffer)` |
+
+Stream implementations MAY add their own custom options.
+These MUST be prefixed with `fs.SCHEMA.`, where `SCHEMA` is the schema of the filesystem.
+
+In particular, S3A output streams add the following capability:
+
+| Name | Probes for support of |
+|-------|---------|
+| `fs.s3a.capability.magic.output.stream` | Is the output stream a delayed-visibility stream? |
+
 ## interface `CanSetDropBehind`
 
 ```java
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 ```
 
 This interface allows callers to change policies used inside HDFS.
-They are currently unimplemented by any stream other than those in HDFS.
+Implementations MUST return `true` for the call
+
+```java
+stream.hasCapability("dropbehind");
+```
 
 ## Durability, Concurrency, Consistency and Visibility of stream output.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 5fd765b2cbcc2..c8cf19758f1dd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -1557,6 +1557,34 @@ public static void assertCapabilities(
     }
   }
 
+
+  /**
+   * Custom assert to verify capabilities supported by
+   * an object through {@link StreamCapabilities}.
+   *
+   * @param source The object to test for StreamCapabilities
+   * @param capabilities The list of expected capabilities
+   */
+  public static void assertHasStreamCapabilities(
+      final Object source,
+      final String... capabilities) {
+    assertCapabilities(source, capabilities, null);
+  }
+
+  /**
+   * Custom assert to verify capabilities NOT supported by
+   * an object through {@link StreamCapabilities}.
+   *
+   * @param source The object to test for StreamCapabilities
+   * @param capabilities The list of capabilities which must not be
+   * supported.
+   */
+  public static void assertLacksStreamCapabilities(
+      final Object source,
+      final String... capabilities) {
+    assertCapabilities(source, null, capabilities);
+  }
+
   /**
    * Custom assert to test {@link PathCapabilities}.
* diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f820e5f42cc67..33dff34cd9651 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -563,13 +564,7 @@ void endBlock() throws IOException { @Override public boolean hasCapability(String capability) { - switch (StringUtils.toLowerCase(capability)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return true; - default: - return false; - } + return StoreImplementationUtils.isProbeForSyncable(capability); } /** diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java index f8bb8e33b1403..dd4495319d670 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java @@ -85,6 +85,6 @@ public synchronized void hsync() throws IOException { @Override public boolean hasCapability(String capability) { - return StoreImplementationUtils.supportsSyncable(capability); + return StoreImplementationUtils.isProbeForSyncable(capability); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index 8fe080dbce750..edca37e4804bc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -551,13 +552,7 @@ public boolean hasCapability(String capability) { if (!compactionEnabled) { return false; } - switch (capability.toLowerCase(Locale.ENGLISH)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return true; - default: - return false; - } + return StoreImplementationUtils.isProbeForSyncable(capability); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index b53b2b2eed954..22d4e30f95cdd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -32,7 +32,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
+This
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -47,6 +47,7 @@
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 
+import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
 import static org.apache.hadoop.io.IOUtils.wrapException;
 
 /**
@@ -144,13 +145,7 @@ public AbfsOutputStream(
    */
   @Override
   public boolean hasCapability(String capability) {
-    switch (capability.toLowerCase(Locale.ENGLISH)) {
-      case StreamCapabilities.HSYNC:
-      case StreamCapabilities.HFLUSH:
-        return supportFlush;
-      default:
-        return false;
-    }
+    return supportFlush && isProbeForSyncable(capability);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
index b8edc4b7d6586..835b82c3c1924 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutputStreamSemantics.java
@@ -27,8 +27,6 @@
 import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,8 +37,9 @@
 import org.hamcrest.core.IsNot;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-import static org.junit.Assume.assumeNotNull;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
 
 /**
  * Test semantics of functions flush, hflush, hsync, and close for block blobs,
@@ -192,11 +191,14 @@ public void testPageBlobClose() throws IOException {
   public void testPageBlobCapabilities() throws IOException {
     Path path = getBlobPathWithTestName(PAGE_BLOB_DIR);
     try (FSDataOutputStream stream = fs.create(path)) {
-      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
-      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
-      assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
-      assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
-      assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
+      assertCapabilities(stream,
+          new String[]{
+              StreamCapabilities.HFLUSH,
+              StreamCapabilities.HSYNC},
+          new String[]{
+              StreamCapabilities.DROPBEHIND,
+              StreamCapabilities.READAHEAD,
+              StreamCapabilities.UNBUFFER});
       stream.write(getRandomBytes());
     }
   }
@@ -285,11 +287,12 @@ public void testBlockBlobClose() throws IOException {
   public void testBlockBlobCapabilities() throws IOException {
     Path path = getBlobPathWithTestName(BLOCK_BLOB_DIR);
     try (FSDataOutputStream stream = fs.create(path)) {
-      assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
-      assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
-      assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
-      assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
-      assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
+      assertLacksStreamCapabilities(stream,
+          StreamCapabilities.HFLUSH,
+          StreamCapabilities.HSYNC,
+          StreamCapabilities.DROPBEHIND,
+          StreamCapabilities.READAHEAD,
+          StreamCapabilities.UNBUFFER);
       stream.write(getRandomBytes());
     }
   }
@@ -381,11 +384,13 @@ public void testBlockBlobCompactionClose() throws IOException {
   public void testBlockBlobCompactionCapabilities() throws IOException {
     Path path = getBlobPathWithTestName(BLOCK_BLOB_COMPACTION_DIR);
    try (FSDataOutputStream stream = fs.create(path)) {
-      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
-      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
-      assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
-      assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
-      assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
+      assertHasStreamCapabilities(stream,
+          StreamCapabilities.HFLUSH,
+          StreamCapabilities.HSYNC);
+      assertLacksStreamCapabilities(stream,
+          StreamCapabilities.DROPBEHIND,
+          StreamCapabilities.READAHEAD,
+          StreamCapabilities.UNBUFFER);
       stream.write(getRandomBytes());
     }
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 92aa5520ee4fd..d8f0dc28dd5ee 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -41,6 +41,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasStreamCapabilities;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksStreamCapabilities;
+
 /**
  * Test flush operation.
 * This class cannot be run in parallel test mode--check comments in
@@ -306,11 +309,12 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
     final Path testFilePath = path(methodName.getMethodName());
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
-      assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
-      assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
-      assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
-      assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
-      assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
+      assertLacksStreamCapabilities(stream,
+          StreamCapabilities.HFLUSH,
+          StreamCapabilities.HSYNC,
+          StreamCapabilities.DROPBEHIND,
+          StreamCapabilities.READAHEAD,
+          StreamCapabilities.UNBUFFER);
     }
   }
 
@@ -320,11 +324,13 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
     byte[] buffer = getRandomBytesArray();
     final Path testFilePath = path(methodName.getMethodName());
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
-      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
-      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
-      assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
-      assertFalse(stream.hasCapability(StreamCapabilities.READAHEAD));
-      assertFalse(stream.hasCapability(StreamCapabilities.UNBUFFER));
+      assertHasStreamCapabilities(stream,
+          StreamCapabilities.HFLUSH,
+          StreamCapabilities.HSYNC);
+      assertLacksStreamCapabilities(stream,
+          StreamCapabilities.DROPBEHIND,
+          StreamCapabilities.READAHEAD,
+          StreamCapabilities.UNBUFFER);
     }
   }

From e7a12123d3064125a53fcbb75870656dd4f65a4b Mon Sep 17 00:00:00 2001
From: Steve Loughran 
Date: Fri, 20 Nov 2020 11:38:19 +0000
Subject: [PATCH 8/8] HADOOP-13327 review feedback and a bit of rereading myself

Change-Id: I00df6431167e04f2210d76d5712d8c16e70f1c70
---
 .../java/org/apache/hadoop/fs/Syncable.java      |  4 +-
 .../site/markdown/filesystem/filesystem.md       |  4 +-
 .../site/markdown/filesystem/outputstream.md     | 58 +++++++++++++------
 .../azurebfs/services/AbfsOutputStream.java      |  3 +-
 4 files changed, 46 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
index e23e8d8aabca1..9cd458592ca22 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java
@@ -24,9 +24,9 @@
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * This is the interface for flush/sync operation.
+ * This is the interface for flush/sync operations.
  * Consult the Hadoop filesystem specification for the definition of the
- * semantics of this operation.
+ * semantics of these operations.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 43c58da5c19bd..3f9dbcfdc6928 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -634,7 +634,7 @@ For instance, HDFS may raise an `InvalidPathException`.
 
     result = FSDataOutputStream
 
-A zero byte file must exist at the end of the specified path, visible to all.
+A zero byte file MUST exist at the end of the specified path, visible to all.
 
-The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`.
+The updated (valid) FileSystem MUST contain all the parent directories of the path, as created by `mkdirs(parent(p))`.
 
 The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
 `FS.Files[p]`
 
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 90e2d1a2adf3e..7921b4e77362e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -63,7 +63,9 @@ through a chain of streams.
 ## Output Stream Model
 
 For this specification, an output stream can be viewed as a list of bytes
-stored in in the client
+stored in the client -the `hsync()` and `hflush()` operations are the actions
+which propagate the data to be visible to other readers of the file and/or
+made durable.
 
 ```python
 buffer: List[byte]
@@ -196,14 +198,14 @@ As the HDFS implementation is considered the de-facto
 specification of the FileSystem APIs, the fact that `write()` is thread-safe is significant.
 
 For compatibility, not only SHOULD other FS clients be thread-safe,
-but new HDFS featues, such as encryption and Erasure Coding SHOULD also
+but new HDFS features, such as encryption and Erasure Coding SHOULD also
 implement consistent behavior with the core HDFS output stream.
 
 Put differently:
 
 *It isn't enough for Output Streams to implement the core semantics
 of `java.io.OutputStream`: they need to implement the extra semantics
-of `HdfsDataOutputStream`, especially for HBase to work correctly.
+of `HdfsDataOutputStream`, especially for HBase to work correctly.*
 
 The concurrent `write()` call is the most significant tightening of
 the Java specification.
@@ -268,10 +270,6 @@ offset < data.length else raise IndexOutOfBoundsException
 offset + len < data.length else raise IndexOutOfBoundsException
 ```
 
-There may be an explicit limit on the size of cached data, or an implicit
-limit based by the available capacity of the destination filesystem.
-When a limit is reached, `write()` SHOULD fail with an `IOException`.
-
 After the operation has returned, the buffer may be re-used. The outcome
 of updates to the buffer while the `write()` operation is in progress is undefined.
@@ -329,9 +327,10 @@ and should be invoked after writing every single line of output, after
 writing small 4KB blocks or similar.
 
 Forwarding this to a full flush across a distributed filesystem, or worse,
-a distant object store, is very underperformant
-
-See [HADOOP-16548](https://issues.apache.org/jira/browse/HADOOP-16548)
+a distant object store, is very inefficient.
+Filesystem clients which do uprate a `flush()` to an `hflush()` will eventually
+have to roll back that feature:
+[HADOOP-16548](https://issues.apache.org/jira/browse/HADOOP-16548).
 
 ### `close()`
@@ -349,8 +349,7 @@ updated).
 After `close()` is invoked, all subsequent `write()` calls on the stream
 MUST fail with an `IOException`.
 
-
-Any locking/leaseholding mechanism is also required to release its lock/lease.
+Any locking/leaseholding mechanism MUST release its lock/lease.

### HDFS and `OutputStream.close()`

@@ -651,7 +660,7 @@ filesystem. After returning to the caller, the data MUST be visible to other rea
 it MAY be durable. That is: it does not have to be persisted, merely guaranteed
 to be consistently visible to all clients attempting to open a new stream reading
 data at the path.
-1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable
+1. `Syncable.hsync()` MUST transmit the data as per `hflush` and persist that data to the underlying durable
 storage.
 1. `close()` The first call to `close()` MUST flush out all remaining data in
 the buffers, and persist it.

@@ -664,6 +673,21 @@ Thus: `flush()` is often treated at most as a cue to flush data to the network
 buffers -but not commit to writing any data.

 It is only the `Syncable` interface which offers guarantees.

+The two `Syncable` operations `hsync()` and `hflush()` differ purely by the
+extra guarantee of `hsync()`: the data must be persisted.
+If `hsync()` is implemented, then `hflush()` can be implemented simply
+by invoking `hsync()`:
+
+```java
+public void hflush() throws IOException {
+  hsync();
+}
+```
+
+This is perfectly acceptable as an implementation: the semantics of `hflush()`
+are satisfied.
+What is not acceptable is downgrading `hsync()` to `hflush()`, as the
+durability guarantee is no longer met.
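+
+Because not every stream implements these semantics, an application which
+requires durability SHOULD probe for it through `StreamCapabilities` rather
+than assume it. A non-normative sketch, given an `FSDataOutputStream` `out`:
+
+```java
+if (out.hasCapability(StreamCapabilities.HSYNC)) {
+  out.hsync();  // the data is durable once this call returns
+} else {
+  // this stream offers no durability guarantee: fail rather than pretend
+  throw new UnsupportedOperationException("hsync not supported by " + out);
+}
+```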


### Concurrency

 1. The outcome of more than one process writing to the same file is undefined.

@@ -858,7 +882,7 @@ be visible: `data(FS', path) = data(FS, path)`.

 1. The check for existing data in a `create()` call with `overwrite=False`, may
 take place in the `create()` call itself, in the `close()` call prior to/during
-the write, or at some point in between. Expect in the special case that the
+the write, or at some point in between. In the special case that the
 object store supports an atomic `PUT` operation, the check for existence of
 existing data and the subsequent creation of data at the path contains a race
 condition: other clients may create data at the path between the existence check
 and the subsequent write.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 22d4e30f95cdd..33568b960e330 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -24,7 +24,6 @@
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.nio.ByteBuffer;
-import java.util.Locale;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ExecutorCompletionService;
@@ -32,7 +31,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-This
+
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.slf4j.Logger;