diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index bd8d9486acde..771a9541bb34 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -54,11 +54,8 @@ public final byte[] serialize(Object o) throws Exception {
       return ((String) o).getBytes(UTF_8);
     } else {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-      GZIPOutputStream out = new GZIPOutputStream(bytes);
-      try {
+      try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
         mapper.writeValue(out, o);
-      } finally {
-        out.close();
       }
       return bytes.toByteArray();
     }
@@ -69,11 +66,8 @@ public final T deserialize(byte[] data, Class klass) throws Exception {
     if (klass.equals(String.class)) {
       return (T) new String(data, UTF_8);
     } else {
-      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
-      try {
+      try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
         return mapper.readValue(in, klass);
-      } finally {
-        in.close();
       }
     }
   }
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index 205f7df87c5b..39a952f2b0df 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -217,7 +217,7 @@ public void testSkip() throws Exception {
   public void testNegativeIndexValues() throws Exception {
     List expected = Arrays.asList(-100, -50, 0, 50, 100);
 
-    expected.stream().forEach(i -> {
+    expected.forEach(i -> {
       try {
         db.write(createCustomType1(i));
       } catch (Exception e) {
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index 824482af08dd..37a8664a5266 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -143,37 +143,39 @@ public void releaseBuffers() {
   }
 
   private FetchResult fetchChunks(List chunkIndices) throws Exception {
-    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
-    final Semaphore sem = new Semaphore(0);
-
     final FetchResult res = new FetchResult();
-    res.successChunks = Collections.synchronizedSet(new HashSet());
-    res.failedChunks = Collections.synchronizedSet(new HashSet());
-    res.buffers = Collections.synchronizedList(new LinkedList());
-    ChunkReceivedCallback callback = new ChunkReceivedCallback() {
-      @Override
-      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
-        buffer.retain();
-        res.successChunks.add(chunkIndex);
-        res.buffers.add(buffer);
-        sem.release();
-      }
+    try (TransportClient client =
+        clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) {
+      final Semaphore sem = new Semaphore(0);
+
+      res.successChunks = Collections.synchronizedSet(new HashSet());
+      res.failedChunks = Collections.synchronizedSet(new HashSet());
+      res.buffers = Collections.synchronizedList(new LinkedList());
+
+      ChunkReceivedCallback callback = new ChunkReceivedCallback() {
+        @Override
+        public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+          buffer.retain();
+          res.successChunks.add(chunkIndex);
+          res.buffers.add(buffer);
+          sem.release();
+        }
 
-      @Override
-      public void onFailure(int chunkIndex, Throwable e) {
-        res.failedChunks.add(chunkIndex);
-        sem.release();
-      }
-    };
+        @Override
+        public void onFailure(int chunkIndex, Throwable e) {
+          res.failedChunks.add(chunkIndex);
+          sem.release();
+        }
+      };
 
-    for (int chunkIndex : chunkIndices) {
-      client.fetchChunk(STREAM_ID, chunkIndex, callback);
-    }
-    if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
-      fail("Timeout getting response from the server");
+      for (int chunkIndex : chunkIndices) {
+        client.fetchChunk(STREAM_ID, chunkIndex, callback);
+      }
+      if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
+        fail("Timeout getting response from the server");
+      }
     }
-    client.close();
     return res;
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 386738ece51a..371149bef397 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -37,14 +37,8 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
-    DataInputStream dis = null;
-    try {
-      dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+    try (DataInputStream dis = new DataInputStream(Files.newInputStream(indexFile.toPath()))) {
       dis.readFully(buffer.array());
-    } finally {
-      if (dis != null) {
-        dis.close();
-      }
     }
   }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d2072a54fa41..459629c5f05f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException {
     resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
 
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream();
-    String block0 = CharStreams.toString(
-        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
-    block0Stream.close();
-    assertEquals(sortBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream();
-    String block1 = CharStreams.toString(
-        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
-    block1Stream.close();
-    assertEquals(sortBlock1, block1);
+    try (InputStream block0Stream = resolver.getBlockData(
+        "app0", "exec0", 0, 0, 0).createInputStream()) {
+      String block0 =
+        CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
+      assertEquals(sortBlock0, block0);
+    }
+
+    try (InputStream block1Stream = resolver.getBlockData(
+        "app0", "exec0", 0, 0, 1).createInputStream()) {
+      String block1 =
+        CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
+      assertEquals(sortBlock1, block1);
+    }
   }
 
   @Test
@@ -149,7 +149,7 @@ public void testNormalizeAndInternPathname() {
 
   private void assertPathsMatch(String p1, String p2, String p3, String expectedPathname) {
     String normPathname =
-        ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
+      ExternalShuffleBlockResolver.createNormalizedInternedPathname(p1, p2, p3);
     assertEquals(expectedPathname, normPathname);
     File file = new File(normPathname);
     String returnedPath = file.getPath();
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index a6a1b8d0ac3f..526b96b36447 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -133,37 +133,37 @@ private FetchResult fetchBlocks(
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
-    client.init(APP_ID);
-    client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
-      new BlockFetchingListener() {
-        @Override
-        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
-          synchronized (this) {
-            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
-              data.retain();
-              res.successBlocks.add(blockId);
-              res.buffers.add(data);
-              requestsRemaining.release();
+    try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
+      client.init(APP_ID);
+      client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+        new BlockFetchingListener() {
+          @Override
+          public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+            synchronized (this) {
+              if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+                data.retain();
+                res.successBlocks.add(blockId);
+                res.buffers.add(data);
+                requestsRemaining.release();
+              }
             }
           }
-        }
-
-        @Override
-        public void onBlockFetchFailure(String blockId, Throwable exception) {
-          synchronized (this) {
-            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
-              res.failedBlocks.add(blockId);
-              requestsRemaining.release();
+
+          @Override
+          public void onBlockFetchFailure(String blockId, Throwable exception) {
+            synchronized (this) {
+              if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+                res.failedBlocks.add(blockId);
+                requestsRemaining.release();
+              }
             }
           }
-        }
-      }, null);
+        }, null);
 
-    if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
-      fail("Timeout getting response from the server");
+      if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
+        fail("Timeout getting response from the server");
+      }
     }
-    client.close();
     return res;
   }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 16bad9f1b319..82caf392b821 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -96,14 +96,16 @@ private void validate(String appId, String secretKey, boolean encrypt)
         ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
     }
 
-    ExternalShuffleClient client =
-      new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
-    client.init(appId);
-    // Registration either succeeds or throws an exception.
-    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
-      new ExecutorShuffleInfo(new String[0], 0,
-        "org.apache.spark.shuffle.sort.SortShuffleManager"));
-    client.close();
+    try (ExternalShuffleClient client =
+        new ExternalShuffleClient(
+          testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
+      client.init(appId);
+      // Registration either succeeds or throws an exception.
+      client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+        new ExecutorShuffleInfo(
+          new String[0], 0, "org.apache.spark.shuffle.sort.SortShuffleManager")
+      );
+    }
   }
 
   /** Provides a secret key holder which always returns the given secret key, for a single appId. */
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
index f7c22dddb8cc..06a248c9a27c 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
@@ -191,10 +191,9 @@ public static CountMinSketch readFrom(InputStream in) throws IOException {
    * Reads in a {@link CountMinSketch} from a byte array.
    */
   public static CountMinSketch readFrom(byte[] bytes) throws IOException {
-    InputStream in = new ByteArrayInputStream(bytes);
-    CountMinSketch cms = readFrom(in);
-    in.close();
-    return cms;
+    try (InputStream in = new ByteArrayInputStream(bytes)) {
+      return readFrom(in);
+    }
   }
 
   /**
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
index fd1906d2e5ae..b78c1677a121 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
@@ -322,10 +322,10 @@ public void writeTo(OutputStream out) throws IOException {
 
   @Override
   public byte[] toByteArray() throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    writeTo(out);
-    out.close();
-    return out.toByteArray();
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      writeTo(out);
+      return out.toByteArray();
+    }
   }
 
   public static CountMinSketchImpl readFrom(InputStream in) throws IOException {
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 0cced9e22295..2e18715b600e 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -135,62 +135,58 @@ private void readAsync() throws IOException {
     } finally {
       stateChangeLock.unlock();
     }
-    executorService.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        stateChangeLock.lock();
-        try {
-          if (isClosed) {
-            readInProgress = false;
-            return;
-          }
-          // Flip this so that the close method will not close the underlying input stream when we
-          // are reading.
-          isReading = true;
-        } finally {
-          stateChangeLock.unlock();
+    executorService.execute(() -> {
+      stateChangeLock.lock();
+      try {
+        if (isClosed) {
+          readInProgress = false;
+          return;
         }
+        // Flip this so that the close method will not close the underlying input stream when we
+        // are reading.
+        isReading = true;
+      } finally {
+        stateChangeLock.unlock();
+      }
 
-        // Please note that it is safe to release the lock and read into the read ahead buffer
-        // because either of following two conditions will hold - 1. The active buffer has
-        // data available to read so the reader will not read from the read ahead buffer.
-        // 2. This is the first time read is called or the active buffer is exhausted,
-        // in that case the reader waits for this async read to complete.
-        // So there is no race condition in both the situations.
-        int read = 0;
-        int off = 0, len = arr.length;
-        Throwable exception = null;
-        try {
-          // try to fill the read ahead buffer.
-          // if a reader is waiting, possibly return early.
-          do {
-            read = underlyingInputStream.read(arr, off, len);
-            if (read <= 0) break;
-            off += read;
-            len -= read;
-          } while (len > 0 && !isWaiting.get());
-        } catch (Throwable ex) {
-          exception = ex;
-          if (ex instanceof Error) {
-            // `readException` may not be reported to the user. Rethrow Error to make sure at least
-            // The user can see Error in UncaughtExceptionHandler.
-            throw (Error) ex;
-          }
-        } finally {
-          stateChangeLock.lock();
-          readAheadBuffer.limit(off);
-          if (read < 0 || (exception instanceof EOFException)) {
-            endOfStream = true;
-          } else if (exception != null) {
-            readAborted = true;
-            readException = exception;
-          }
-          readInProgress = false;
-          signalAsyncReadComplete();
-          stateChangeLock.unlock();
-          closeUnderlyingInputStreamIfNecessary();
+      // Please note that it is safe to release the lock and read into the read ahead buffer
+      // because either of following two conditions will hold - 1. The active buffer has
+      // data available to read so the reader will not read from the read ahead buffer.
+      // 2. This is the first time read is called or the active buffer is exhausted,
+      // in that case the reader waits for this async read to complete.
+      // So there is no race condition in both the situations.
+      int read = 0;
+      int off = 0, len = arr.length;
+      Throwable exception = null;
+      try {
+        // try to fill the read ahead buffer.
+        // if a reader is waiting, possibly return early.
+        do {
+          read = underlyingInputStream.read(arr, off, len);
+          if (read <= 0) break;
+          off += read;
+          len -= read;
+        } while (len > 0 && !isWaiting.get());
+      } catch (Throwable ex) {
+        exception = ex;
+        if (ex instanceof Error) {
+          // `readException` may not be reported to the user. Rethrow Error to make sure at least
+          // The user can see Error in UncaughtExceptionHandler.
+          throw (Error) ex;
        }
+      } finally {
+        stateChangeLock.lock();
+        readAheadBuffer.limit(off);
+        if (read < 0 || (exception instanceof EOFException)) {
+          endOfStream = true;
+        } else if (exception != null) {
+          readAborted = true;
+          readException = exception;
+        }
+        readInProgress = false;
+        signalAsyncReadComplete();
+        stateChangeLock.unlock();
+        closeUnderlyingInputStreamIfNecessary();
       }
     });
   }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 323a5d3c5283..b020a6d99247 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -152,9 +152,9 @@ public void write(Iterator> records) throws IOException {
     }
 
     for (int i = 0; i < numPartitions; i++) {
-      final DiskBlockObjectWriter writer = partitionWriters[i];
-      partitionWriterSegments[i] = writer.commitAndGet();
-      writer.close();
+      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
+        partitionWriterSegments[i] = writer.commitAndGet();
+      }
     }
 
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index c7d2db4217d9..1c0d664afb13 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -181,42 +181,43 @@ private void writeSortedFile(boolean isLastFile) {
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = DummySerializerInstance.INSTANCE;
 
-    final DiskBlockObjectWriter writer =
-      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
-
     int currentPartition = -1;
-    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
-    while (sortedRecords.hasNext()) {
-      sortedRecords.loadNext();
-      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
-      assert (partition >= currentPartition);
-      if (partition != currentPartition) {
-        // Switch to the new partition
-        if (currentPartition != -1) {
-          final FileSegment fileSegment = writer.commitAndGet();
-          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
+
+    final FileSegment committedSegment;
+    try (DiskBlockObjectWriter writer =
+        blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {
+
+      final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+      while (sortedRecords.hasNext()) {
+        sortedRecords.loadNext();
+        final int partition = sortedRecords.packedRecordPointer.getPartitionId();
+        assert (partition >= currentPartition);
+        if (partition != currentPartition) {
+          // Switch to the new partition
+          if (currentPartition != -1) {
+            final FileSegment fileSegment = writer.commitAndGet();
+            spillInfo.partitionLengths[currentPartition] = fileSegment.length();
+          }
+          currentPartition = partition;
        }
-        currentPartition = partition;
-      }
 
-      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
-      final Object recordPage = taskMemoryManager.getPage(recordPointer);
-      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
-      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
-      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
-      while (dataRemaining > 0) {
-        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
-        Platform.copyMemory(
-          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
-        writer.write(writeBuffer, 0, toTransfer);
-        recordReadPosition += toTransfer;
-        dataRemaining -= toTransfer;
+        final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+        final Object recordPage = taskMemoryManager.getPage(recordPointer);
+        final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
+        int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
+        long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
+        while (dataRemaining > 0) {
+          final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
+          Platform.copyMemory(
+            recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
+          writer.write(writeBuffer, 0, toTransfer);
+          recordReadPosition += toTransfer;
+          dataRemaining -= toTransfer;
+        }
+        writer.recordWritten();
       }
-      writer.recordWritten();
-    }
 
-    final FileSegment committedSegment = writer.commitAndGet();
-    writer.close();
+      committedSegment = writer.commitAndGet();
+    }
 
     // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
     // then the file might be empty. Note that it might be better to avoid calling
     // writeSortedFile() in that case.
diff --git a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
index a6589d289814..40a7c9486ae5 100644
--- a/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
+++ b/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
@@ -39,30 +39,28 @@ public void setUp() throws ClassNotFoundException, SQLException {
     sc = new JavaSparkContext("local", "JavaAPISuite");
 
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-    Connection connection =
-      DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");
-    try {
-      Statement create = connection.createStatement();
-      create.execute(
-        "CREATE TABLE FOO(" +
-        "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
-        "DATA INTEGER)");
-      create.close();
+    try (Connection connection = DriverManager.getConnection(
+        "jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) {
+
+      try (Statement create = connection.createStatement()) {
+        create.execute(
+          "CREATE TABLE FOO(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY" +
+          " (START WITH 1, INCREMENT BY 1), DATA INTEGER)");
+      }
 
-      PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
-      for (int i = 1; i <= 100; i++) {
-        insert.setInt(1, i * 2);
-        insert.executeUpdate();
+      try (PreparedStatement insert = connection.prepareStatement(
+          "INSERT INTO FOO(DATA) VALUES(?)")) {
+        for (int i = 1; i <= 100; i++) {
+          insert.setInt(1, i * 2);
+          insert.executeUpdate();
+        }
       }
-      insert.close();
     } catch (SQLException e) {
       // If table doesn't exist...
       if (e.getSQLState().compareTo("X0Y32") != 0) {
         throw e;
       }
-    } finally {
-      connection.close();
     }
   }
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 0d5c5ea7903e..a07d0e84ea85 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -186,14 +186,14 @@ private List> readRecordsFromFile() throws IOException {
         if (conf.getBoolean("spark.shuffle.compress", true)) {
           in = CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(in);
         }
-        DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in);
-        Iterator> records = recordsStream.asKeyValueIterator();
-        while (records.hasNext()) {
-          Tuple2 record = records.next();
-          assertEquals(i, hashPartitioner.getPartition(record._1()));
-          recordsList.add(record);
+        try (DeserializationStream recordsStream = serializer.newInstance().deserializeStream(in)) {
+          Iterator> records = recordsStream.asKeyValueIterator();
+          while (records.hasNext()) {
+            Tuple2 record = records.next();
+            assertEquals(i, hashPartitioner.getPartition(record._1()));
+            recordsList.add(record);
+          }
         }
-        recordsStream.close();
         startOffset += partitionSize;
       }
     }
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 01b5fb7b4668..3992ab7049bd 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -997,10 +997,10 @@ public void binaryFiles() throws Exception {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
-    FileChannel channel1 = fos1.getChannel();
-    ByteBuffer bbuf = ByteBuffer.wrap(content1);
-    channel1.write(bbuf);
-    channel1.close();
+    try (FileChannel channel1 = fos1.getChannel()) {
+      ByteBuffer bbuf = ByteBuffer.wrap(content1);
+      channel1.write(bbuf);
+    }
 
     JavaPairRDD readRDD = sc.binaryFiles(tempDirName, 3);
     List> result = readRDD.collect();
     for (Tuple2 res : result) {
@@ -1018,10 +1018,10 @@ public void binaryFilesCaching() throws Exception {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
-    FileChannel channel1 = fos1.getChannel();
-    ByteBuffer bbuf = ByteBuffer.wrap(content1);
-    channel1.write(bbuf);
-    channel1.close();
+    try (FileChannel channel1 = fos1.getChannel()) {
+      ByteBuffer bbuf = ByteBuffer.wrap(content1);
+      channel1.write(bbuf);
+    }
 
     JavaPairRDD readRDD = sc.binaryFiles(tempDirName).cache();
     readRDD.foreach(pair -> pair._2().toArray()); // force the file to read
@@ -1042,13 +1042,12 @@ public void binaryRecords() throws Exception {
 
     FileOutputStream fos1 = new FileOutputStream(file1);
-    FileChannel channel1 = fos1.getChannel();
-
-    for (int i = 0; i < numOfCopies; i++) {
-      ByteBuffer bbuf = ByteBuffer.wrap(content1);
-      channel1.write(bbuf);
+    try (FileChannel channel1 = fos1.getChannel()) {
+      for (int i = 0; i < numOfCopies; i++) {
+        ByteBuffer bbuf = ByteBuffer.wrap(content1);
+        channel1.write(bbuf);
+      }
     }
-    channel1.close();
 
     JavaRDD readRDD = sc.binaryRecords(tempDirName, content1.length);
     assertEquals(numOfCopies,readRDD.count());
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
index 551443a11298..460513816dfd 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.expressions;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.memory.MemoryConsumer;
@@ -45,7 +46,7 @@
  * page requires an average size for key value pairs to be larger than 1024 bytes.
  *
  */
-public abstract class RowBasedKeyValueBatch extends MemoryConsumer {
+public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable {
   protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
 
   private static final int DEFAULT_CAPACITY = 1 << 16;
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
index 2da87113c622..8da778800bb9 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java
@@ -123,9 +123,8 @@ public void tearDown() {
 
   @Test
   public void emptyBatch() throws Exception {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       Assert.assertEquals(0, batch.numRows());
       try {
         batch.getKeyRow(-1);
@@ -152,31 +151,24 @@ public void emptyBatch() throws Exception {
         // Expected exception; do nothing.
       }
       Assert.assertFalse(batch.rowIterator().next());
-    } finally {
-      batch.close();
     }
   }
 
   @Test
-  public void batchType() throws Exception {
-    RowBasedKeyValueBatch batch1 = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    RowBasedKeyValueBatch batch2 = RowBasedKeyValueBatch.allocate(fixedKeySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+  public void batchType() {
+    try (RowBasedKeyValueBatch batch1 = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
+        RowBasedKeyValueBatch batch2 = RowBasedKeyValueBatch.allocate(fixedKeySchema,
+          valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       Assert.assertEquals(batch1.getClass(), VariableLengthRowBasedKeyValueBatch.class);
       Assert.assertEquals(batch2.getClass(), FixedLengthRowBasedKeyValueBatch.class);
-    } finally {
-      batch1.close();
-      batch2.close();
     }
   }
 
   @Test
   public void setAndRetrieve() {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       UnsafeRow ret1 = appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1));
       Assert.assertTrue(checkValue(ret1, 1, 1));
       UnsafeRow ret2 = appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 2));
@@ -204,33 +196,27 @@ public void setAndRetrieve() {
       } catch (AssertionError e) {
         // Expected exception; do nothing.
       }
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void setUpdateAndRetrieve() {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1));
       Assert.assertEquals(1, batch.numRows());
       UnsafeRow retrievedValue = batch.getValueRow(0);
       updateValueRow(retrievedValue, 2, 2);
       UnsafeRow retrievedValue2 = batch.getValueRow(0);
       Assert.assertTrue(checkValue(retrievedValue2, 2, 2));
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void iteratorTest() throws Exception {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       appendRow(batch, makeKeyRow(1, "A"), makeValueRow(1, 1));
       appendRow(batch, makeKeyRow(2, "B"), makeValueRow(2, 2));
       appendRow(batch, makeKeyRow(3, "C"), makeValueRow(3, 3));
@@ -253,16 +239,13 @@ public void iteratorTest() throws Exception {
       Assert.assertTrue(checkKey(key3, 3, "C"));
       Assert.assertTrue(checkValue(value3, 3, 3));
       Assert.assertFalse(iterator.next());
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void fixedLengthTest() throws Exception {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(fixedKeySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(fixedKeySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       appendRow(batch, makeKeyRow(11, 11), makeValueRow(1, 1));
       appendRow(batch, makeKeyRow(22, 22), makeValueRow(2, 2));
       appendRow(batch, makeKeyRow(33, 33), makeValueRow(3, 3));
@@ -293,16 +276,13 @@ public void fixedLengthTest() throws Exception {
       Assert.assertTrue(checkKey(key3, 33, 33));
       Assert.assertTrue(checkValue(value3, 3, 3));
       Assert.assertFalse(iterator.next());
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void appendRowUntilExceedingCapacity() throws Exception {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, 10);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, 10)) {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(1, 1);
       for (int i = 0; i < 10; i++) {
@@ -321,8 +301,6 @@ public void appendRowUntilExceedingCapacity() throws Exception {
       Assert.assertTrue(checkValue(value1, 1, 1));
     }
     Assert.assertFalse(iterator.next());
-    } finally {
-      batch.close();
     }
   }
 
@@ -330,9 +308,8 @@ public void appendRowUntilExceedingPageSize() throws Exception {
     // Use default size or spark.buffer.pageSize if specified
     int pageSizeToUse = (int) memoryManager.pageSizeBytes();
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, pageSizeToUse)) {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(1, 1);
       int recordLength = 8 + key.getSizeInBytes() + value.getSizeInBytes() + 8;
@@ -356,49 +333,44 @@ public void appendRowUntilExceedingPageSize() throws Exception {
       Assert.assertTrue(checkValue(value1, 1, 1));
     }
     Assert.assertFalse(iterator.next());
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void failureToAllocateFirstPage() throws Exception {
     memoryManager.limit(1024);
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
       UnsafeRow key = makeKeyRow(1, "A");
       UnsafeRow value = makeValueRow(11, 11);
       UnsafeRow ret = appendRow(batch, key, value);
       Assert.assertNull(ret);
       Assert.assertFalse(batch.rowIterator().next());
-    } finally {
-      batch.close();
     }
   }
 
   @Test
   public void randomizedTest() {
-    RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
-      valueSchema, taskMemoryManager, DEFAULT_CAPACITY);
-    int numEntry = 100;
-    long[] expectedK1 = new long[numEntry];
-    String[] expectedK2 = new String[numEntry];
-    long[] expectedV1 = new long[numEntry];
-    long[] expectedV2 = new long[numEntry];
-
-    for (int i = 0; i < numEntry; i++) {
-      long k1 = rand.nextLong();
-      String k2 = getRandomString(rand.nextInt(256));
-      long v1 = rand.nextLong();
-      long v2 = rand.nextLong();
-      appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2));
-      expectedK1[i] = k1;
-      expectedK2[i] = k2;
-      expectedV1[i] = v1;
-      expectedV2[i] = v2;
-    }
-    try {
+    try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema,
+        valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) {
+      int numEntry = 100;
+      long[] expectedK1 = new long[numEntry];
+      String[] expectedK2 = new String[numEntry];
+      long[] expectedV1 = new long[numEntry];
+      long[] expectedV2 = new long[numEntry];
+
+      for (int i = 0; i < numEntry; i++) {
+        long k1 = rand.nextLong();
+        String k2 = getRandomString(rand.nextInt(256));
+        long v1 = rand.nextLong();
+        long v2 = rand.nextLong();
+        appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2));
+        expectedK1[i] = k1;
+        expectedK2[i] = k2;
+        expectedV1[i] = v1;
+        expectedV2[i] = v2;
+      }
+
       for (int j = 0; j < 10000; j++) {
         int rowId = rand.nextInt(numEntry);
         if (rand.nextBoolean()) {
@@ -410,8 +382,6 @@ public void randomizedTest() {
           Assert.assertTrue(checkValue(value, expectedV1[rowId], expectedV2[rowId]));
         }
       }
-    } finally {
-      batch.close();
     }
   }
 }
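
Not part of the patch above: nearly every hunk in this diff replaces an explicit close() call, usually wrapped in try/finally, with a try-with-resources statement. Below is a minimal, self-contained sketch of that pattern; the class name TryWithResourcesSketch and the sample payload are invented for illustration. A resource declared in the try header is closed automatically when the block exits, multiple resources are closed in reverse declaration order, and an exception thrown by close() is attached to the primary exception as a suppressed exception (retrievable via Throwable.getSuppressed()) rather than masking it, which is why the hand-written finally { out.close(); } blocks can simply be dropped.

// Illustrative sketch only -- not part of the patch; names are invented for the example.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TryWithResourcesSketch {

  public static void main(String[] args) throws IOException {
    byte[] compressed = gzip("hello".getBytes(StandardCharsets.UTF_8));

    // Two resources in one header; they are closed in reverse order (gzin first, then in).
    try (InputStream in = new ByteArrayInputStream(compressed);
         GZIPInputStream gzin = new GZIPInputStream(in)) {
      System.out.println("first decompressed byte: " + gzin.read());
    }
  }

  // Same shape as the KVStoreSerializer.serialize() hunk: the GZIPOutputStream is closed
  // automatically even if the write throws, with no explicit finally block.
  private static byte[] gzip(byte[] data) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      out.write(data);
    }
    return bytes.toByteArray();
  }
}

For an in-memory stream such as ByteArrayOutputStream (the CountMinSketchImpl.toByteArray() hunk), close() is documented as a no-op, so the change there is about consistency rather than correctness.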
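
The RowBasedKeyValueBatch hunks rely on a second detail: try-with-resources accepts any java.lang.AutoCloseable, so an existing class that already has a close() method only needs to declare the interface (here java.io.Closeable) to become usable in a try header; that is all the "implements Closeable" change does. A hedged sketch follows, with an invented SimpleBatch class standing in for the real one.

// Illustrative sketch only -- SimpleBatch is a made-up stand-in, not Spark's RowBasedKeyValueBatch.
import java.io.Closeable;

public class CloseableSketch {

  static final class SimpleBatch implements Closeable {
    private boolean closed = false;

    int numRows() {
      if (closed) {
        throw new IllegalStateException("batch already closed");
      }
      return 0;
    }

    @Override
    public void close() {
      // Idempotent close, mirroring the usual contract for Closeable.
      closed = true;
    }
  }

  public static void main(String[] args) {
    // The batch is closed automatically at the end of the block, even if numRows() throws.
    try (SimpleBatch batch = new SimpleBatch()) {
      System.out.println("rows: " + batch.numRows());
    }
  }
}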
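
Two smaller cleanups ride along in the diff: the anonymous Runnable in ReadAheadInputStream.readAsync() becomes a lambda, and LevelDBSuite drops the redundant stream() before forEach. A short sketch of both shapes, with invented names; it is not taken from the Spark sources.

// Illustrative sketch only -- names are made up; this just shows the two cleanup shapes.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LambdaCleanupSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // Before: executor.execute(new Runnable() { @Override public void run() { ... } });
    // After: a lambda suffices because Runnable has a single abstract method.
    executor.execute(() -> System.out.println("async work"));
    executor.shutdown();

    // Iterable.forEach works directly on the collection; opening a stream first adds nothing.
    List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
    expected.forEach(i -> System.out.println("value: " + i));
  }
}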