Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
Expand Down Expand Up @@ -99,14 +100,18 @@ public void testSeekZeroByteFile() throws Throwable {
describe("seek and read a 0 byte file");
instream = getFileSystem().open(zeroByteFile);
assertEquals(0, instream.getPos());
assertNotAvailable(instream);
//expect initial read to fai;
int result = instream.read();
assertMinusOne("initial byte read", result);
assertNotAvailable(instream);
byte[] buffer = new byte[1];
//expect that seek to 0 works
instream.seek(0);
assertNotAvailable(instream);
//reread, expect same exception
result = instream.read();
assertNotAvailable(instream);
assertMinusOne("post-seek byte read", result);
result = instream.read(buffer, 0, 1);
assertMinusOne("post-seek buffer read", result);
Expand Down Expand Up @@ -172,6 +177,11 @@ public void testSeekReadClosedFile() throws Throwable {
// sure there's no other exception like an NPE.

}
try{
instream.available();
}catch (IOException | IllegalStateException e) {
// expected a closed file
}
//and close again
instream.close();
}
Expand All @@ -194,6 +204,7 @@ public void testNegativeSeek() throws Throwable {
//bad seek -expected, but not as preferred as an EOFException
handleRelaxedException("a negative seek", "EOFException", e);
}
assertTrue("The available should be zero",instream.available() >= 0);
assertEquals(0, instream.getPos());
}

Expand All @@ -205,6 +216,7 @@ public void testSeekFile() throws Throwable {
//expect that seek to 0 works
instream.seek(0);
int result = instream.read();
assertAvailable(instream);
assertEquals(0, result);
assertEquals(1, instream.read());
assertEquals(2, instream.getPos());
Expand All @@ -226,7 +238,9 @@ public void testSeekAndReadPastEndOfFile() throws Throwable {
//go just before the end
instream.seek(TEST_FILE_LEN - 2);
assertTrue("Premature EOF", instream.read() != -1);
assertAvailable(instream);
assertTrue("Premature EOF", instream.read() != -1);
assertNotAvailable(instream);
assertMinusOne("read past end of file", instream.read());
}

Expand Down Expand Up @@ -261,6 +275,7 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
//now go back and try to read from a valid point in the file
instream.seek(1);
assertTrue("Premature EOF", instream.read() != -1);
assertAvailable(instream);
}

/**
Expand All @@ -278,6 +293,7 @@ public void testSeekBigFile() throws Throwable {
//expect that seek to 0 works
instream.seek(0);
int result = instream.read();
assertAvailable(instream);
assertEquals(0, result);
assertEquals(1, instream.read());
assertEquals(2, instream.read());
Expand All @@ -296,6 +312,7 @@ public void testSeekBigFile() throws Throwable {
instream.seek(0);
assertEquals(0, instream.getPos());
instream.read();
assertAvailable(instream);
assertEquals(1, instream.getPos());
byte[] buf = new byte[80 * 1024];
instream.readFully(1, buf, 0, buf.length);
Expand All @@ -314,14 +331,15 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
instream.seek(39999);
assertTrue(-1 != instream.read());
assertEquals(40000, instream.getPos());

assertAvailable(instream);
int v = 256;
byte[] readBuffer = new byte[v];
assertEquals(v, instream.read(128, readBuffer, 0, v));
//have gone back
assertEquals(40000, instream.getPos());
//content is the same too
assertEquals("@40000", block[40000], (byte) instream.read());
assertAvailable(instream);
//now verify the picked up data
for (int i = 0; i < 256; i++) {
assertEquals("@" + i, block[i + 128], readBuffer[i]);
Expand Down Expand Up @@ -376,6 +394,7 @@ public void testReadFullyZeroByteFile() throws Throwable {
assertEquals(0, instream.getPos());
byte[] buffer = new byte[1];
instream.readFully(0, buffer, 0, 0);
assertNotAvailable(instream);
assertEquals(0, instream.getPos());
// seek to 0 read 0 bytes from it
instream.seek(0);
Expand Down Expand Up @@ -587,6 +606,16 @@ public void testReadAtExactEOF() throws Throwable {
instream = getFileSystem().open(smallSeekFile);
instream.seek(TEST_FILE_LEN -1);
assertTrue("read at last byte", instream.read() > 0);
assertNotAvailable(instream);
assertEquals("read just past EOF", -1, instream.read());
}

private void assertAvailable(InputStream inputStream) throws IOException {
assertTrue("Data available in " + instream, inputStream.available() >0 );
}

private void assertNotAvailable(InputStream inputStream) throws IOException {
assertTrue("The steam is not available", inputStream.available() == 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;

import static java.lang.Math.min;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;

/**
Expand Down Expand Up @@ -192,7 +193,7 @@ private synchronized void reopen(String reason, long targetPos, long length,
}

@Override
public synchronized long getPos() throws IOException {
public synchronized long getPos() {
return (nextReadPos < 0) ? 0 : nextReadPos;
}

Expand Down Expand Up @@ -253,7 +254,7 @@ private void seekInStream(long targetPos, long length) throws IOException {
// then choose whichever comes first: the range or the EOF
long remainingInCurrentRequest = remainingInCurrentRequest();

long forwardSeekLimit = Math.min(remainingInCurrentRequest,
long forwardSeekLimit = min(remainingInCurrentRequest,
forwardSeekRange);
boolean skipForward = remainingInCurrentRequest > 0
&& diff <= forwardSeekLimit;
Expand Down Expand Up @@ -593,12 +594,13 @@ public synchronized boolean resetConnection() throws IOException {
@Override
public synchronized int available() throws IOException {
checkNotClosed();

int availableSize = this.wrappedStream == null ? 0 : this.wrappedStream.available();
long remaining = remainingInFile();
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
long validatedSize = min(availableSize, remaining);
return (int)validatedSize;
}

/**
Expand All @@ -607,8 +609,8 @@ public synchronized int available() throws IOException {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInFile() {
return this.contentLength - this.pos;
public synchronized long remainingInFile() throws IOException {
return this.contentLength - this.getPos();
}

/**
Expand All @@ -619,7 +621,7 @@ public synchronized long remainingInFile() {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInCurrentRequest() {
return this.contentRangeFinish - this.pos;
return this.contentRangeFinish - this.getPos();
}

@InterfaceAudience.Private
Expand Down Expand Up @@ -776,7 +778,7 @@ static long calculateRequestLimit(

}
// cannot read past the end of the object
rangeLimit = Math.min(contentLength, rangeLimit);
rangeLimit = min(contentLength, rangeLimit);
return rangeLimit;
}

Expand Down