From cc6d98a17f6fa4249951802f981c2224d354e651 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Tue, 5 Apr 2016 13:36:34 -0700
Subject: [PATCH 1/2] [SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD.

This patch updates FileScanRDD to start reading from the next file while the
current file is being processed. The goal is to get better interleaving of CPU
and IO. It does this by launching a future that asynchronously starts preparing
the next file to be read. The expectation is that the async task is IO
intensive, while processing the current file (which includes all the
computation for the query plan) is CPU intensive. For some file formats, this
just means opening the file and doing the initial setup. For file formats like
Parquet, it means doing all the IO for all the columns.

---
 .../execution/datasources/FileScanRDD.scala   | 70 ++++++++++++++++---
 .../apache/spark/sql/internal/SQLConf.scala   |  9 ++-
 .../datasources/parquet/ParquetIOSuite.scala  | 10 ++-
 3 files changed, 77 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 988c785dbe61..25cb26237153 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,10 +17,14 @@ package org.apache.spark.sql.execution.datasources
 
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.util.ThreadUtils
 
 /**
  * A single file that should be read, along with partition column values that
@@ -46,37 +50,87 @@ case class PartitionedFile(
  */
 case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
 
+object FileScanRDD {
+  private val ioExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))
+}
+
 class FileScanRDD(
     @transient val sqlContext: SQLContext,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
   extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
+  /**
+   * To get better interleaving of CPU and IO, this RDD will create a future to prepare the next
+   * file while the current one is being processed. `currentIterator` is the current file and
+   * `nextFile` is the future that will initialize the next file to be read. This includes things
+   * such as starting up connections to open the file and any initial buffering. The expectation
+   * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive.
+   */
+  val asyncIO = sqlContext.conf.filesAsyncIO
+
+  case class NextFile(file: PartitionedFile, iter: Iterator[Object])
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
       private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      // TODO: do we need to close this?
       private[this] var currentIterator: Iterator[Object] = null
+      private[this] var nextFile: Future[NextFile] = if (asyncIO) prepareNextFile() else null
+
       def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
       def next() = currentIterator.next()
 
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
-        if (files.hasNext) {
-          val nextFile = files.next()
-          logInfo(s"Reading File $nextFile")
-          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
-          currentIterator = readFunction(nextFile)
-          hasNext
+        if (asyncIO) {
+          if (nextFile == null) return false
+        } else {
+          if (!files.hasNext) return false
+        }
+
+        // Wait for the async task to complete
+        val file = if (asyncIO) {
+          Await.result(nextFile, Duration.Inf)
+        } else {
+          val f = files.next()
+          val it = readFunction(f)
+          NextFile(f, it)
+        }
+
+        // This is only used to evaluate the rest of the execution so we can safely set it here.
+        SqlNewHadoopRDDState.setInputFileName(file.file.filePath)
+        currentIterator = file.iter
+
+        if (asyncIO && files.hasNext) {
+          // Asynchronously start the next file.
+          nextFile = prepareNextFile()
         } else {
-          SqlNewHadoopRDDState.unsetInputFileName()
-          false
+          nextFile = null
         }
+
+        hasNext
       }
 
       override def close() = {
         SqlNewHadoopRDDState.unsetInputFileName()
       }
+
+      def prepareNextFile() = {
+        Future {
+          if (files.hasNext) {
+            val file = files.next()
+            val it = readFunction(file)
+            // Read something from the file to trigger some initial IO.
+            it.hasNext
+            NextFile(file, it)
+          } else {
+            null
+          }
+        }(FileScanRDD.ioExecutionContext)
+      }
     }
 
     // Register an on-task-completion callback to close the input stream.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc6ba1bcfb6d..fcc251a98d92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -417,6 +416,12 @@ object SQLConf {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  val FILES_ASYNC_IO = SQLConfigBuilder("spark.sql.files.asyncIO")
+    .internal()
+    .doc("If true, attempts to asynchronously do IO when reading data.")
+    .booleanConf
+    .createWithDefault(true)
+
   val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
     .internal()
     .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
@@ -479,6 +484,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
+  def filesAsyncIO: Boolean = getConf(FILES_ASYNC_IO)
+
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   def useFileScan: Boolean = getConf(USE_FILE_SCAN)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 581095d3dc1c..a8d898bdf426 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -78,10 +78,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("basic data types (without binary)") {
-    val data = (1 to 4).map { i =>
-      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+    (true :: false :: Nil).foreach { v =>
+      withSQLConf(SQLConf.FILES_ASYNC_IO.key -> v.toString) {
+        val data = (1 to 4).map { i =>
+          (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+        }
+        checkParquetFile(data)
+      }
     }
-    checkParquetFile(data)
   }
 
   test("raw binary") {

From bc11dd580a751b2e39694223ecbf1fa2b4a7bdc0 Mon Sep 17 00:00:00 2001
From: Nong Li
Date: Thu, 7 Apr 2016 16:26:50 -0700
Subject: [PATCH 2/2] Simplify and fix tests.

---
 .../execution/datasources/FileScanRDD.scala | 29 +++++++------------
 1 file changed, 11 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 25cb26237153..e06fcd64e57e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -85,30 +85,23 @@ class FileScanRDD(
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
-        if (asyncIO) {
-          if (nextFile == null) return false
-        } else {
-          if (!files.hasNext) return false
-        }
-
-        // Wait for the async task to complete
         val file = if (asyncIO) {
+          if (nextFile == null) return false
+          // Wait for the async task to complete
          Await.result(nextFile, Duration.Inf)
         } else {
+          if (!files.hasNext) return false
           val f = files.next()
-          val it = readFunction(f)
-          NextFile(f, it)
+          NextFile(f, readFunction(f))
         }
 
         // This is only used to evaluate the rest of the execution so we can safely set it here.
         SqlNewHadoopRDDState.setInputFileName(file.file.filePath)
         currentIterator = file.iter
 
-        if (asyncIO && files.hasNext) {
+        if (asyncIO) {
           // Asynchronously start the next file.
           nextFile = prepareNextFile()
-        } else {
-          nextFile = null
         }
 
         hasNext
@@ -119,17 +112,17 @@ class FileScanRDD(
       }
 
       def prepareNextFile() = {
-        Future {
-          if (files.hasNext) {
+        if (files.hasNext) {
+          Future {
             val file = files.next()
             val it = readFunction(file)
             // Read something from the file to trigger some initial IO.
             it.hasNext
             NextFile(file, it)
-          } else {
-            null
-          }
-        }(FileScanRDD.ioExecutionContext)
+          }(FileScanRDD.ioExecutionContext)
+        } else {
+          null
+        }
       }
     }
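
For readers who want the control flow without walking the diff hunks, below is a minimal,
self-contained sketch of the prefetching pattern the two patches implement: the calling
thread drains the current iterator (the CPU-heavy part) while a Future opens and touches
the next source on a dedicated IO pool. The names here (PrefetchSketch, prefetched, the
fake sources in main) are illustrative assumptions and are not part of the patch; the real
FileScanRDD drives the same idea through prepareNextFile() and FileScanRDD.ioExecutionContext.

// Sketch only: illustrates the prefetching pattern, not the actual FileScanRDD code.
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PrefetchSketch {
  // Dedicated pool for IO, mirroring FileScanRDD.ioExecutionContext, so prefetching does
  // not compete with the CPU-bound work done on the calling thread.
  private val io = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  /** Chains per-source iterators, opening the next source while the current one is consumed. */
  def prefetched[T](sources: Iterator[() => Iterator[T]]): Iterator[T] = new Iterator[T] {
    private var current: Iterator[T] = Iterator.empty
    private var pending: Future[Iterator[T]] = prepareNext()

    private def prepareNext(): Future[Iterator[T]] =
      if (sources.hasNext) {
        val open = sources.next() // advance the shared iterator on the calling thread
        Future {
          val it = open() // IO-heavy: open the source and do any initial buffering
          it.hasNext      // touch it so the first read happens off the consumer's hot path
          it
        }(io)
      } else {
        null
      }

    def hasNext: Boolean = current.hasNext || advance()
    def next(): T = current.next()

    private def advance(): Boolean = {
      if (pending == null) return false
      current = Await.result(pending, Duration.Inf) // blocks only if IO is slower than CPU
      pending = prepareNext()                       // immediately start preparing the next source
      hasNext
    }
  }

  def main(args: Array[String]): Unit = {
    // Three fake "files", each producing four rows; opening and consuming are interleaved.
    val sources = Iterator.tabulate(3)(i => () => (0 until 4).iterator.map(j => s"file$i-row$j"))
    prefetched(sources).foreach(println)
    io.shutdown()
  }
}

As in the patch, only one prepare task is outstanding at a time, so the wait in Await.result
only blocks when IO is slower than the per-file CPU work; when CPU dominates, the next
source is already open by the time the consumer asks for it.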