diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb01426a0..f379a5fb93018 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -137,8 +137,18 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         blockManager.diskBlockManager().createTempShuffleBlock();
       final File file = tempShuffleBlockIdPlusFile._2();
       final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+      // Note that we purposely do not call open() on the disk writers here; DiskBlockObjectWriter
+      // will automatically open() itself if necessary. This is an optimization to avoid file
+      // creation and truncation for empty partitions; this optimization probably doesn't make sense
+      // for most realistic production workloads, but it can make a large difference when playing
+      // around with Spark SQL queries in spark-shell on toy datasets: if you performed a query over
+      // an extremely small number of records then Spark SQL's default parallelism of 200 would
+      // result in slower out-of-the-box performance due to these constant-factor overheads. This
+      // optimization speeds up local microbenchmarking and SQL unit tests.
+      // However, we still create the file because the read code expects it to exist:
+      file.createNewFile();
       partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
     }
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index e2dd80f243930..fc8c12ce7e99f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -157,14 +157,16 @@ private[spark] class DiskBlockObjectWriter(
         objOut.flush()
         bs.flush()
         close()
-      }
 
-      val truncateStream = new FileOutputStream(file, true)
-      try {
-        truncateStream.getChannel.truncate(initialPosition)
+        val truncateStream = new FileOutputStream(file, true)
+        try {
+          truncateStream.getChannel.truncate(initialPosition)
+          file
+        } finally {
+          truncateStream.close()
+        }
+      } else {
         file
-      } finally {
-        truncateStream.close()
       }
     } catch {
       case e: Exception =>
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2d0d7b8af3581..71ac720e29b76 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.tags.ExtendedHiveTest
 
+// Test comment to trigger Hive tests.
+
 /**
  * Runs the test cases that are included in the hive distribution.
  */
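
For readers skimming the patch, here is a minimal, self-contained Scala sketch of the lazy-open pattern the Java-side comment describes. It is not Spark's actual DiskBlockObjectWriter API; the LazyDiskWriter class, its method names, and the demo object are invented for illustration. The point is the control flow: the underlying stream is opened only on the first write, and the revert path truncates only when the stream was actually opened, so writers created for empty partitions never pay the open/truncate cost.

import java.io.{File, FileOutputStream}

// Simplified stand-in for DiskBlockObjectWriter (illustration only, not Spark's API).
// The underlying stream is opened lazily on the first write, so writers created for
// empty partitions never open a stream and never need truncation on revert.
class LazyDiskWriter(file: File) {
  private var out: FileOutputStream = null
  private var initialized = false
  private val initialPosition: Long = file.length()

  def write(bytes: Array[Byte]): Unit = {
    if (!initialized) {
      // Lazy open, mirroring what DiskBlockObjectWriter does when open() is not called eagerly.
      out = new FileOutputStream(file, true)
      initialized = true
    }
    out.write(bytes)
  }

  // Mirrors the restructured revertPartialWritesAndClose(): truncate only if the stream
  // was actually opened; otherwise return the (still empty) file untouched.
  def revertPartialWritesAndClose(): File = {
    if (initialized) {
      out.close()
      val truncateStream = new FileOutputStream(file, true)
      try {
        truncateStream.getChannel.truncate(initialPosition)
        file
      } finally {
        truncateStream.close()
      }
    } else {
      file
    }
  }
}

object LazyDiskWriterDemo {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("shuffle-partition-", ".tmp")
    file.createNewFile() // the read path still expects the file to exist, as in the patch
    val writer = new LazyDiskWriter(file) // note: no eager open()
    val reverted = writer.revertPartialWritesAndClose()
    println(s"empty partition file length = ${reverted.length()}") // 0; no truncation was needed
    file.delete()
  }
}

The reshuffled revertPartialWritesAndClose() in the Scala hunk has the same shape: the truncate logic moves inside the if (initialized) block, and the new else branch simply returns the file, so a writer that never wrote anything never opens a FileOutputStream (which would otherwise create and truncate the file).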