Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -425,6 +426,7 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
return data;
}


/**
* Vectored read.
* If the file has no checksums: delegate to the underlying stream.
Expand All @@ -438,6 +440,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
@Override
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws IOException {
  // No release function was supplied by the caller: pass a no-op one
  // to the three-argument overload.
  final Consumer<ByteBuffer> noRelease = ignored -> { };
  readVectored(ranges, allocate, noRelease);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
Expand All @@ -462,8 +471,8 @@ public void readVectored(List<? extends FileRange> ranges,
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool;

/**
* Stream that permits positional reading.
Expand Down Expand Up @@ -133,4 +135,30 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}

/**
 * Vectored read with a buffer release function.
 * <p>
 * This is a variant of {@link #readVectored(List, IntFunction)} which also
 * takes a {@code release} function. That function may be invoked if
 * problems surface during reads, to try to return any allocated buffer
 * which has not been read yet.
 * Buffers which have successfully been read and returned to the caller do
 * not get released: this is for failures only.
 * <p>
 * The default implementation ignores {@code release} and calls the
 * two-argument {@link #readVectored(List, IntFunction)}, so that existing
 * stream implementations which do not implement this method keep working.
 * <p>
 * Implementations SHOULD override this method if they can release buffers
 * as part of their error handling.
 * @param ranges the byte ranges to read
 * @param allocate the function to allocate ByteBuffer
 * @param release the function to release a ByteBuffer.
 * @throws IOException any IOE.
 * @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
 */
default void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate,
    Consumer<ByteBuffer> release) throws IOException {
  // release is deliberately unused in this default implementation.
  this.readVectored(ranges, allocate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -68,6 +69,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
Expand Down Expand Up @@ -319,6 +321,13 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
@Override
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws IOException {
  // No release function was supplied by the caller: use the shared
  // releaser, which only logs at debug, with the three-argument overload.
  this.readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.slf4j.Logger;
Expand All @@ -52,6 +53,15 @@
private static final Logger LOG =
LoggerFactory.getLogger(VectoredReadUtils.class);

/**
 * This releaser does not hand the buffer to anything:
 * it just logs at debug that the buffer was released.
 * <p>
 * The buffer itself is passed as the log argument, so its string form
 * is only built by the logging framework when debug logging is enabled,
 * and a null buffer is logged rather than raising a NullPointerException.
 */
public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
    (buffer) -> LOG.debug("release buffer {}", buffer);

/**
* Validate a single range.
* @param range range to validate.
Expand Down Expand Up @@ -98,11 +108,29 @@
public static void readVectored(PositionedReadable stream,
    List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws EOFException {
  // Delegate to the variant which takes a release function, supplying
  // the releaser which only logs at debug.
  VectoredReadUtils.readVectored(stream, ranges, allocate,
      LOG_BYTE_BUFFER_RELEASED);
}

/**
* Variant of {@link #readVectored(PositionedReadable, List, IntFunction)}
* where a release() function is invoked if problems surface during reads.
* @param stream the stream to read from
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @param release the function to release a ByteBuffer.
* @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
* @throws EOFException the range offset is negative
*/
public static void readVectored(PositionedReadable stream,

Check failure on line 123 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java#L123

javadoc: warning: no @param for stream
List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws EOFException {

for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) {
range.setData(readRangeFrom(stream, range, allocate));
range.setData(readRangeFrom(stream, range, allocate, release));
}
}


/**
* Synchronously reads a range from the stream dealing with the combinations
* of ByteBuffers buffers and PositionedReadable streams.
Expand All @@ -118,11 +146,31 @@
PositionedReadable stream,
FileRange range,
IntFunction<ByteBuffer> allocate) throws EOFException {
return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED);
}

/**
* Synchronously reads a range from the stream dealing with the combinations
* of ByteBuffers buffers and PositionedReadable streams.
* @param stream the stream to read from
* @param range the range to read
* @param allocate the function to allocate ByteBuffers
* @param release the function to release a ByteBuffer.
* @return the CompletableFuture that contains the read data or an exception.
* @throws IllegalArgumentException the range is invalid other than by offset or being null.
* @throws EOFException the range offset is negative
* @throws NullPointerException if the range is null.
*/
public static CompletableFuture<ByteBuffer> readRangeFrom(
PositionedReadable stream,
FileRange range,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws EOFException {

validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ByteBuffer buffer = allocate.apply(range.getLength());
try {
ByteBuffer buffer = allocate.apply(range.getLength());
if (stream instanceof ByteBufferPositionedReadable) {
LOG.debug("ByteBufferPositionedReadable.readFully of {}", range);
((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
Expand All @@ -136,6 +184,7 @@
result.complete(buffer);
} catch (IOException ioe) {
LOG.debug("Failed to read {}", range, ioe);
release.accept(buffer);
result.completeExceptionally(ioe);
}
return result;
Expand All @@ -147,6 +196,8 @@
* @param range file range
* @param buffer destination buffer
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
private static void readNonByteBufferPositionedReadable(
PositionedReadable stream,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.io.ByteBufferPool;

import static java.util.Objects.requireNonNull;

/**
 * A ByteBufferPool implementation that uses a pair of functions to allocate
 * and release ByteBuffers; intended for use implementing the VectorIO API
 * as it makes the pair of functions easier to pass around and use in
 * existing code.
 * <p>
 * No matter what kind of buffer is requested, the allocation function
 * is invoked; that is: the direct flag is ignored.
 */
public final class VectorIOBufferPool implements ByteBufferPool {

  /** The function to allocate a buffer. */
  private final IntFunction<ByteBuffer> allocate;

  /** The function to release a buffer. */
  private final Consumer<ByteBuffer> release;

  /**
   * Create a pool backed by an allocate/release pair of functions.
   * @param allocate the function to allocate ByteBuffer
   * @param release the function to release a ByteBuffer.
   * @throws NullPointerException if either function is null; the
   * exception message names the missing argument.
   */
  public VectorIOBufferPool(
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release) {
    this.allocate = requireNonNull(allocate, "allocate");
    this.release = requireNonNull(release, "release");
  }

  /**
   * Get a ByteBuffer by invoking the allocation function.
   * @param direct heap/direct flag. Unused.
   * @param length the length passed to the allocation function.
   * @return whatever buffer the allocation function returned.
   */
  @Override
  public ByteBuffer getBuffer(final boolean direct, final int length) {
    return allocate.apply(length);
  }

  /**
   * Release a buffer by passing it to the release function.
   * Unlike normal ByteBufferPool implementations
   * a null buffer is accepted and ignored.
   * @param buffer buffer to release; may be null.
   */
  @Override
  public void putBuffer(final ByteBuffer buffer) {
    // the release function is never handed a null buffer
    if (buffer != null) {
      release.accept(buffer);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.assertj.core.api.Assertions;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.test.HadoopTestBase;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -823,4 +825,49 @@ public void testReadOverEOFRejected() throws Exception {
asList(createFileRange(length - 1, 2)),
Optional.of(length)));
}

/**
 * Drive a VectorIOBufferPool whose allocate and release functions are
 * bound to an ElasticByteBufferPool, asserting on the backing pool's
 * size after each get/put/release operation.
 */
@Test
public void testVectorIOBufferPool() throws Throwable {
  final ElasticByteBufferPool backingPool = new ElasticByteBufferPool();

  // helper asserting on the value of backingPool.size(false)
  final Consumer<Integer> expectPooled = (expected) ->
      Assertions.assertThat(backingPool.size(false))
          .describedAs("Pool size")
          .isEqualTo(expected);

  // wrap the backing pool's get/put operations as the allocate and
  // release functions of the pool under test
  final VectorIOBufferPool vectorPool = new VectorIOBufferPool(
      length -> backingPool.getBuffer(false, length),
      backingPool::putBuffer);

  expectPooled.accept(0);

  final ByteBuffer first = vectorPool.getBuffer(false, 100);
  final ByteBuffer second = vectorPool.getBuffer(false, 50);

  // handing back the first buffer gives a pool size of 1
  vectorPool.putBuffer(first);
  expectPooled.accept(1);

  // the next get must be served with that same buffer, even though
  // the direct flag is set this time
  final ByteBuffer reused = vectorPool.getBuffer(true, 100);
  Assertions.assertThat(reused)
      .describedAs("buffer returned from a get after a previous one was returned")
      .isSameAs(first);
  expectPooled.accept(0);

  // return them all
  vectorPool.putBuffer(second);
  vectorPool.putBuffer(reused);
  expectPooled.accept(2);

  // release on the vector pool does not propagate to the backing pool
  vectorPool.release();
  expectPooled.accept(2);

  backingPool.release();
}
}
Loading
Loading