From 00bcf8a893c021fa4a949c5ac077a34881870ace Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 11:55:11 -0700 Subject: [PATCH 1/8] Avoid IO operations on empty files in BlockObjectWriter. --- .../apache/spark/storage/BlockObjectWriter.scala | 13 ++++++------- .../spark/util/collection/ExternalSorter.scala | 8 +++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index a33f22ef52687..3a353d2eae886 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -184,13 +184,12 @@ private[spark] class DiskBlockObjectWriter( objOut.flush() bs.flush() close() - } - - val truncateStream = new FileOutputStream(file, true) - try { - truncateStream.getChannel.truncate(initialPosition) - } finally { - truncateStream.close() + val truncateStream = new FileOutputStream(file, true) + try { + truncateStream.getChannel.truncate(initialPosition) + } finally { + truncateStream.close() + } } } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 3b9d14f9372b6..1caf0734c2558 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -359,9 +359,11 @@ private[spark] class ExternalSorter[K, V, C]( // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() - val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, - curWriteMetrics) - writer.open() + // We purposely don't call open() on the disk writer in order to avoid writing compression + // headers into empty files, but we still need to create the file because the read code + // expects it to exist: + file.createNewFile() + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, curWriteMetrics) } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be From 8fd89b47efbef6325e0bc45bad0b74bf8ead4a6d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 12:10:46 -0700 Subject: [PATCH 2/8] Do not create empty files at all. --- .../util/collection/ExternalSorter.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 1caf0734c2558..ba589ed26e17e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -359,10 +359,6 @@ private[spark] class ExternalSorter[K, V, C]( // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. 
val (blockId, file) = diskBlockManager.createTempShuffleBlock() - // We purposely don't call open() on the disk writer in order to avoid writing compression - // headers into empty files, but we still need to create the file because the read code - // expects it to exist: - file.createNewFile() blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, curWriteMetrics) } // Creating the file to write to and creating a disk writer both involve interacting with @@ -735,11 +731,16 @@ private[spark] class ExternalSorter[K, V, C]( val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { - val in = new FileInputStream(partitionWriters(i).fileSegment().file) - util.Utils.tryWithSafeFinally { - lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) - } { - in.close() + val file = partitionWriters(i).fileSegment().file + if (!file.exists()) { + lengths(i) = 0 + } else { + val in = new FileInputStream(file) + util.Utils.tryWithSafeFinally { + lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) + } { + in.close() + } } } } { From 0db87c341686e7b24e760583bcc9fe9054d3095a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 13:30:00 -0700 Subject: [PATCH 3/8] Reduce scope of FileOutputStream in ExternalSorter --- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index ba589ed26e17e..2603adf01c4d5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -727,7 +727,6 @@ private[spark] class ExternalSorter[K, V, C]( // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) - val out = new FileOutputStream(outputFile, true) val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { @@ -735,16 +734,17 @@ private[spark] class ExternalSorter[K, V, C]( if (!file.exists()) { lengths(i) = 0 } else { + val out = new FileOutputStream(outputFile, true) val in = new FileInputStream(file) util.Utils.tryWithSafeFinally { lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) } { in.close() + out.close() } } } } { - out.close() context.taskMetrics.shuffleWriteMetrics.foreach( _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } From 7e2340d05721d6374e78069baa5870e87cd0cfb1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 21 Apr 2015 17:45:45 -0700 Subject: [PATCH 4/8] Revert "Reduce scope of FileOutputStream in ExternalSorter" This reverts commit 3c9c9447d4d4e8ddeb036167390073e3b67fb621. 
--- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 2603adf01c4d5..ba589ed26e17e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -727,6 +727,7 @@ private[spark] class ExternalSorter[K, V, C]( // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) + val out = new FileOutputStream(outputFile, true) val writeStartTime = System.nanoTime util.Utils.tryWithSafeFinally { for (i <- 0 until numPartitions) { @@ -734,17 +735,16 @@ private[spark] class ExternalSorter[K, V, C]( if (!file.exists()) { lengths(i) = 0 } else { - val out = new FileOutputStream(outputFile, true) val in = new FileInputStream(file) util.Utils.tryWithSafeFinally { lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled) } { in.close() - out.close() } } } } { + out.close() context.taskMetrics.shuffleWriteMetrics.foreach( _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } From 5c777cf40ee1f70092639a8abfe8b9598d6d3636 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Jun 2015 14:35:53 -0700 Subject: [PATCH 5/8] Rework SPARK-7041 for BypassMergeSort split --- .../sort/BypassMergeSortShuffleWriter.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d3d6280284beb..a921ce64ad75a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -107,6 +107,14 @@ public void insertAll(Iterator> records) throws IOException { blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); + // Note that we purposely do not call open() on the disk writers here; DiskBlockObjectWriter + // will automatically open() itself if necessary. This is an optimization to avoid file + // creation and truncation for empty partitions; this optimization probably doesn't make sense + // for most realistic production workloads, but it can make a large difference when playing + // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over + // an extremely small number of records then Spark SQL's default parallelism of 200 would + // result in slower out-of-the-box performance due to these constant-factor overheads. This + // optimization speeds up local microbenchmarking and SQL unit tests. partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); } @@ -143,6 +151,13 @@ public long[] writePartitionedFile( boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { + if (partitionWriters[i].fileSegment().length() == 0) { + // In insertAll(), we didn't create empty files for empty reduce partitions; this branch + // handles that case. 
Since we'll be skipping deletion of these files, verify that they + // don't exist: + assert(!partitionWriters[i].fileSegment().file().exists()); + continue; + } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); boolean copyThrewException = true; try { @@ -172,7 +187,8 @@ public void stop() throws IOException { for (BlockObjectWriter writer : partitionWriters) { // This method explicitly does _not_ throw exceptions: writer.revertPartialWritesAndClose(); - if (!diskBlockManager.getFile(writer.blockId()).delete()) { + final File file = diskBlockManager.getFile(writer.blockId()); + if (file.exists() && !file.delete()) { logger.error("Error while deleting file for block {}", writer.blockId()); } } From aaa51bf58f286f0c1dbb0a38afb514e7a38b1183 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 9 Jun 2015 10:50:35 -0700 Subject: [PATCH 6/8] Actually avoid calling open() :) --- .../apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index a921ce64ad75a..ae46ba96ca295 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -116,7 +116,7 @@ public void insertAll(Iterator> records) throws IOException { // result in slower out-of-the-box performance due to these constant-factor overheads. This // optimization speeds up local microbenchmarking and SQL unit tests. partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be From 618ac7e8b810fe6f9ebd7835a2e28e181003d824 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 23 Nov 2015 14:52:58 -0800 Subject: [PATCH 7/8] Still create empty file, for simplicity --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 587ec77a0ff13..f379a5fb93018 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -145,6 +145,8 @@ public void write(Iterator> records) throws IOException { // an extremely small number of records then Spark SQL's default parallelism of 200 would // result in slower out-of-the-box performance due to these constant-factor overheads. This // optimization speeds up local microbenchmarking and SQL unit tests. 
+ // However, we still create the file because the read code expects it to exist: + file.createNewFile(); partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } @@ -193,13 +195,6 @@ private long[] writePartitionedFile(File outputFile) throws IOException { boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { - if (partitionWriters[i].fileSegment().length() == 0) { - // In insertAll(), we didn't create empty files for empty reduce partitions; this branch - // handles that case. Since we'll be skipping deletion of these files, verify that they - // don't exist: - assert(!partitionWriters[i].fileSegment().file().exists()); - continue; - } final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); boolean copyThrewException = true; try { From 5fdfcda9a819c54ae469f6d6edf66247f8207bf1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 23 Nov 2015 17:18:22 -0800 Subject: [PATCH 8/8] Update HiveCompatibilitySuite.scala --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 2d0d7b8af3581..71ac720e29b76 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLConf import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.tags.ExtendedHiveTest +// Test comment to trigger Hive tests. + /** * Runs the test cases that are included in the hive distribution. */
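
As a reading aid, the sketch below (plain Scala, not Spark code; LazyPartitionWriter and concatPartitions are hypothetical names invented for this illustration) shows the pattern the series converges on: create each per-partition file eagerly so downstream code can assume it exists, open the underlying stream only when the first record is written so empty partitions never receive compression or serialization headers, and then concatenate whatever ended up on disk, recording a length of zero for partitions that stayed empty.

import java.io.{File, FileInputStream, FileOutputStream}

// Hypothetical illustration only; the real logic lives in DiskBlockObjectWriter,
// ExternalSorter and BypassMergeSortShuffleWriter.
class LazyPartitionWriter(val file: File) {
  // Create the file eagerly so readers can assume it exists, but don't open a
  // stream (and so don't emit any header bytes) until the first record arrives.
  file.createNewFile()
  private var out: FileOutputStream = null

  def write(bytes: Array[Byte]): Unit = {
    if (out == null) {
      out = new FileOutputStream(file)  // opened lazily, on first write
    }
    out.write(bytes)
  }

  def close(): Unit = {
    if (out != null) {
      out.close()
    }
  }
}

object LazyPartitionWriter {
  // Concatenate the per-partition files into one output file, recording the
  // number of bytes contributed by each partition. Empty partitions contribute
  // zero bytes and never paid the open()/truncate() cost.
  def concatPartitions(parts: Seq[File], output: File): Array[Long] = {
    val out = new FileOutputStream(output, true)
    val buffer = new Array[Byte](8192)
    try {
      parts.map { part =>
        val in = new FileInputStream(part)
        var copied = 0L
        try {
          var read = in.read(buffer)
          while (read != -1) {
            out.write(buffer, 0, read)
            copied += read
            read = in.read(buffer)
          }
        } finally {
          in.close()
        }
        copied
      }.toArray
    } finally {
      out.close()
    }
  }
}

Keeping file creation eager (as patch 7 reinstates) trades a small amount of extra I/O for simpler downstream code: the concatenation loop and the cleanup path never need to special-case missing files.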