Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,8 @@ public final byte[] serialize(Object o) throws Exception {
return ((String) o).getBytes(UTF_8);
} else {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
GZIPOutputStream out = new GZIPOutputStream(bytes);
try {
try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
mapper.writeValue(out, o);
} finally {
out.close();
}
return bytes.toByteArray();
}
Expand All @@ -69,11 +66,8 @@ public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
if (klass.equals(String.class)) {
return (T) new String(data, UTF_8);
} else {
GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
try {
try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
return mapper.readValue(in, klass);
} finally {
in.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testSkip() throws Exception {
public void testNegativeIndexValues() throws Exception {
List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);

expected.stream().forEach(i -> {
expected.forEach(i -> {
try {
db.write(createCustomType1(i));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,37 +143,39 @@ public void releaseBuffers() {
}

private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception {
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
final Semaphore sem = new Semaphore(0);

final FetchResult res = new FetchResult();
res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());

ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
buffer.retain();
res.successChunks.add(chunkIndex);
res.buffers.add(buffer);
sem.release();
}
try (TransportClient client =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) {
final Semaphore sem = new Semaphore(0);

res.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
res.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());

ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
buffer.retain();
res.successChunks.add(chunkIndex);
res.buffers.add(buffer);
sem.release();
}

@Override
public void onFailure(int chunkIndex, Throwable e) {
res.failedChunks.add(chunkIndex);
sem.release();
}
};
@Override
public void onFailure(int chunkIndex, Throwable e) {
res.failedChunks.add(chunkIndex);
sem.release();
}
};

for (int chunkIndex : chunkIndices) {
client.fetchChunk(STREAM_ID, chunkIndex, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
for (int chunkIndex : chunkIndices) {
client.fetchChunk(STREAM_ID, chunkIndex, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
}
}
client.close();
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
size = (int)indexFile.length();
ByteBuffer buffer = ByteBuffer.allocate(size);
offsets = buffer.asLongBuffer();
DataInputStream dis = null;
try {
dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
dis.readFully(buffer.array());
} finally {
if (dis != null) {
dis.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException {
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo(SORT_MANAGER));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
String block0 = CharStreams.toString(
new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
block0Stream.close();
assertEquals(sortBlock0, block0);

InputStream block1Stream =
resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
String block1 = CharStreams.toString(
new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
block1Stream.close();
assertEquals(sortBlock1, block1);
try (InputStream block0Stream = resolver.getBlockData(
"app0", "exec0", 0, 0, 0).createInputStream()) {
String block0 =
CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
assertEquals(sortBlock0, block0);
}

try (InputStream block1Stream = resolver.getBlockData(
"app0", "exec0", 0, 0, 1).createInputStream()) {
String block1 =
CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
assertEquals(sortBlock1, block1);
}
}

@Test
Expand Down Expand Up @@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() {

private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
String normPathname =
ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
assertEquals(expectedPathname, normPathname);
File file = new File(normPathname);
String returnedPath = file.getPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,37 +133,37 @@ private FetchResult fetchBlocks(

final Semaphore requestsRemaining = new Semaphore(0);

ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
client.init(APP_ID);
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
new BlockFetchingListener() {
@Override
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
synchronized (this) {
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
data.retain();
res.successBlocks.add(blockId);
res.buffers.add(data);
requestsRemaining.release();
try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
client.init(APP_ID);
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
new BlockFetchingListener() {
@Override
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
synchronized (this) {
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
data.retain();
res.successBlocks.add(blockId);
res.buffers.add(data);
requestsRemaining.release();
}
}
}
}

@Override
public void onBlockFetchFailure(String blockId, Throwable exception) {
synchronized (this) {
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
res.failedBlocks.add(blockId);
requestsRemaining.release();

@Override
public void onBlockFetchFailure(String blockId, Throwable exception) {
synchronized (this) {
if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
res.failedBlocks.add(blockId);
requestsRemaining.release();
}
}
}
}
}, null);
}, null);

if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
}
}
client.close();
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ private void validate(String appId, String secretKey, boolean encrypt)
ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
}

ExternalShuffleClient client =
new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
client.init(appId);
// Registration either succeeds or throws an exception.
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
new ExecutorShuffleInfo(new String[0], 0,
"org.apache.spark.shuffle.sort.SortShuffleManager"));
client.close();
try (ExternalShuffleClient client =
new ExternalShuffleClient(
testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
client.init(appId);
// Registration either succeeds or throws an exception.
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
new ExecutorShuffleInfo(
new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")
);
}
}

/** Provides a secret key holder which always returns the given secret key, for a single appId. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,9 @@ public static CountMinSketch readFrom(InputStream in) throws IOException {
* Reads in a {@link CountMinSketch} from a byte array.
*/
public static CountMinSketch readFrom(byte[] bytes) throws IOException {
InputStream in = new ByteArrayInputStream(bytes);
CountMinSketch cms = readFrom(in);
in.close();
return cms;
try (InputStream in = new ByteArrayInputStream(bytes)) {
return readFrom(in);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ public void writeTo(OutputStream out) throws IOException {

@Override
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
writeTo(out);
out.close();
return out.toByteArray();
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
writeTo(out);
return out.toByteArray();
}
}

public static CountMinSketchImpl readFrom(InputStream in) throws IOException {
Expand Down
102 changes: 49 additions & 53 deletions core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,62 +135,58 @@ private void readAsync() throws IOException {
} finally {
stateChangeLock.unlock();
}
executorService.execute(new Runnable() {

@Override
public void run() {
stateChangeLock.lock();
try {
if (isClosed) {
readInProgress = false;
return;
}
// Flip this so that the close method will not close the underlying input stream when we
// are reading.
isReading = true;
} finally {
stateChangeLock.unlock();
executorService.execute(() -> {
stateChangeLock.lock();
try {
if (isClosed) {
readInProgress = false;
return;
}
// Flip this so that the close method will not close the underlying input stream when we
// are reading.
isReading = true;
} finally {
stateChangeLock.unlock();
}

// Please note that it is safe to release the lock and read into the read ahead buffer
// because either of following two conditions will hold - 1. The active buffer has
// data available to read so the reader will not read from the read ahead buffer.
// 2. This is the first time read is called or the active buffer is exhausted,
// in that case the reader waits for this async read to complete.
// So there is no race condition in both the situations.
int read = 0;
int off = 0, len = arr.length;
Throwable exception = null;
try {
// try to fill the read ahead buffer.
// if a reader is waiting, possibly return early.
do {
read = underlyingInputStream.read(arr, off, len);
if (read <= 0) break;
off += read;
len -= read;
} while (len > 0 && !isWaiting.get());
} catch (Throwable ex) {
exception = ex;
if (ex instanceof Error) {
// `readException` may not be reported to the user. Rethrow Error to make sure at least
// The user can see Error in UncaughtExceptionHandler.
throw (Error) ex;
}
} finally {
stateChangeLock.lock();
readAheadBuffer.limit(off);
if (read < 0 || (exception instanceof EOFException)) {
endOfStream = true;
} else if (exception != null) {
readAborted = true;
readException = exception;
}
readInProgress = false;
signalAsyncReadComplete();
stateChangeLock.unlock();
closeUnderlyingInputStreamIfNecessary();
// Please note that it is safe to release the lock and read into the read ahead buffer
// because either of following two conditions will hold - 1. The active buffer has
// data available to read so the reader will not read from the read ahead buffer.
// 2. This is the first time read is called or the active buffer is exhausted,
// in that case the reader waits for this async read to complete.
// So there is no race condition in both the situations.
int read = 0;
int off = 0, len = arr.length;
Throwable exception = null;
try {
// try to fill the read ahead buffer.
// if a reader is waiting, possibly return early.
do {
read = underlyingInputStream.read(arr, off, len);
if (read <= 0) break;
off += read;
len -= read;
} while (len > 0 && !isWaiting.get());
} catch (Throwable ex) {
exception = ex;
if (ex instanceof Error) {
// `readException` may not be reported to the user. Rethrow Error to make sure at least
// The user can see Error in UncaughtExceptionHandler.
throw (Error) ex;
}
} finally {
stateChangeLock.lock();
readAheadBuffer.limit(off);
if (read < 0 || (exception instanceof EOFException)) {
endOfStream = true;
} else if (exception != null) {
readAborted = true;
readException = exception;
}
readInProgress = false;
signalAsyncReadComplete();
stateChangeLock.unlock();
closeUnderlyingInputStreamIfNecessary();
}
});
}
Expand Down
Loading