From cc6d98a17f6fa4249951802f981c2224d354e651 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Tue, 5 Apr 2016 13:36:34 -0700 Subject: [PATCH 1/6] [SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD. This patch updates FileScanRDD to start reading from the next file while the current file is being processed. The goal is to have better interleaving of CPU and IO. It does this by launching a future which will asynchronously start preparing the next file to be read. The expectation is that the async task is IO intensive and the current file (which includes all the computation for the query plan) is CPU intensive. For some file formats, this would just mean opening the file and the initial setup. For file formats like parquet, this would mean doing all the IO for all the columns. --- .../execution/datasources/FileScanRDD.scala | 70 ++++++++++++++++--- .../apache/spark/sql/internal/SQLConf.scala | 9 ++- .../datasources/parquet/ParquetIOSuite.scala | 10 ++- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 988c785dbe61..25cb26237153 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -17,10 +17,14 @@ package org.apache.spark.sql.execution.datasources +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.Duration + import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.ThreadUtils /** * A single file that should be read, along with partition column values that @@ -46,37 +50,87 @@ case class PartitionedFile( */ case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition +object FileScanRDD { + private val ioExecutionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16)) +} + class FileScanRDD( @transient val sqlContext: SQLContext, readFunction: (PartitionedFile) => Iterator[InternalRow], @transient val filePartitions: Seq[FilePartition]) extends RDD[InternalRow](sqlContext.sparkContext, Nil) { + /** + * To get better interleaving of CPU and IO, this RDD will create a future to prepare the next + * file while the current one is being processed. `currentIterator` is the current file and + * `nextFile` is the future that will initialize the next file to be read. This includes things + * such as starting up connections to open the file and any initial buffering. The expectation + * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive. + */ + val asyncIO = sqlContext.conf.filesAsyncIO + + case class NextFile(file: PartitionedFile, iter: Iterator[Object]) + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val iterator = new Iterator[Object] with AutoCloseable { private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + // TODO: do we need to close this? 
private[this] var currentIterator: Iterator[Object] = null + private[this] var nextFile: Future[NextFile] = if (asyncIO) prepareNextFile() else null + def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator() def next() = currentIterator.next() /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator(): Boolean = { - if (files.hasNext) { - val nextFile = files.next() - logInfo(s"Reading File $nextFile") - SqlNewHadoopRDDState.setInputFileName(nextFile.filePath) - currentIterator = readFunction(nextFile) - hasNext + if (asyncIO) { + if (nextFile == null) return false + } else { + if (!files.hasNext) return false + } + + // Wait for the async task to complete + val file = if (asyncIO) { + Await.result(nextFile, Duration.Inf) + } else { + val f = files.next() + val it = readFunction(f) + NextFile(f, it) + } + + // This is only used to evaluate the rest of the execution so we can safely set it here. + SqlNewHadoopRDDState.setInputFileName(file.file.filePath) + currentIterator = file.iter + + if (asyncIO && files.hasNext) { + // Asynchronously start the next file. + nextFile = prepareNextFile() } else { - SqlNewHadoopRDDState.unsetInputFileName() - false + nextFile = null } + + hasNext } override def close() = { SqlNewHadoopRDDState.unsetInputFileName() } + + def prepareNextFile() = { + Future { + if (files.hasNext) { + val file = files.next() + val it = readFunction(file) + // Read something from the file to trigger some initial IO. + it.hasNext + NextFile(file, it) + } else { + null + } + }(FileScanRDD.ioExecutionContext) + } } // Register an on-task-completion callback to close the input stream. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc6ba1bcfb6d..fcc251a98d92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the configuration options for Spark SQL. 
@@ -417,6 +416,12 @@ object SQLConf { .longConf .createWithDefault(4 * 1024 * 1024) + val FILES_ASYNC_IO = SQLConfigBuilder("spark.sql.files.asyncIO") + .internal() + .doc("If true, attempts to asynchronously do IO when reading data.") + .booleanConf + .createWithDefault(true) + val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse") .internal() .doc("When true, the planner will try to find out duplicated exchanges and re-use them.") @@ -479,6 +484,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES) + def filesAsyncIO: Boolean = getConf(FILES_ASYNC_IO) + def useCompression: Boolean = getConf(COMPRESS_CACHED) def useFileScan: Boolean = getConf(USE_FILE_SCAN) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 581095d3dc1c..a8d898bdf426 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -78,10 +78,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("basic data types (without binary)") { - val data = (1 to 4).map { i => - (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + (true :: false :: Nil).foreach { v => + withSQLConf(SQLConf.FILES_ASYNC_IO.key -> v.toString) { + val data = (1 to 4).map { i => + (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) + } + checkParquetFile(data) + } } - checkParquetFile(data) } test("raw binary") { From bc11dd580a751b2e39694223ecbf1fa2b4a7bdc0 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 7 Apr 2016 16:26:50 -0700 Subject: [PATCH 2/6] Simplify and fix tests. --- .../execution/datasources/FileScanRDD.scala | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 25cb26237153..e06fcd64e57e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -85,30 +85,23 @@ class FileScanRDD( /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator(): Boolean = { - if (asyncIO) { - if (nextFile == null) return false - } else { - if (!files.hasNext) return false - } - - // Wait for the async task to complete val file = if (asyncIO) { + if (nextFile == null) return false + // Wait for the async task to complete Await.result(nextFile, Duration.Inf) } else { + if (!files.hasNext) return false val f = files.next() - val it = readFunction(f) - NextFile(f, it) + NextFile(f, readFunction(f)) } // This is only used to evaluate the rest of the execution so we can safely set it here. SqlNewHadoopRDDState.setInputFileName(file.file.filePath) currentIterator = file.iter - if (asyncIO && files.hasNext) { + if (asyncIO) { // Asynchronously start the next file. 
nextFile = prepareNextFile() - } else { - nextFile = null } hasNext @@ -119,17 +112,17 @@ class FileScanRDD( } def prepareNextFile() = { - Future { - if (files.hasNext) { + if (files.hasNext) { + Future { val file = files.next() val it = readFunction(file) // Read something from the file to trigger some initial IO. it.hasNext NextFile(file, it) - } else { - null - } - }(FileScanRDD.ioExecutionContext) + }(FileScanRDD.ioExecutionContext) + } else { + null + } } } From 8aebf9427e6630046ae297b38f964d0809c3d348 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Mon, 25 Apr 2016 12:07:39 -0700 Subject: [PATCH 3/6] restructure --- .../spark/sql/execution/datasources/FileScanRDD.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 7f068f39082f..7c62d29e6206 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.execution.datasources -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration import org.apache.spark.{Partition => RDDPartition, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil @@ -148,7 +148,7 @@ class FileScanRDD( val file = if (asyncIO) { if (nextFile == null) return false // Wait for the async task to complete - Await.result(nextFile, Duration.Inf) + ThreadUtils.awaitResult(nextFile, Duration.Inf) } else { if (!files.hasNext) return false val f = files.next() @@ -156,7 +156,7 @@ class FileScanRDD( } // This is only used to evaluate the rest of the execution so we can safely set it here. - SqlNewHadoopRDDState.setInputFileName(file.file.filePath) + InputFileNameHolder.setInputFileName(file.file.filePath) currentIterator = file.iter if (asyncIO) { From 8799cc873900cf9e4c37012e7a6d607eeabfbdd5 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Mon, 25 Apr 2016 13:08:47 -0700 Subject: [PATCH 4/6] add nextIterator --- .../execution/datasources/FileScanRDD.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 7c62d29e6206..e06cb6457762 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -71,7 +71,7 @@ class FileScanRDD( * such as starting up connections to open the file and any initial buffering. The expectation * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive. */ - val asyncIO = sqlContext.conf.filesAsyncIO + val isAsyncIOEnabled = sqlContext.conf.filesAsyncIO case class NextFile(file: PartitionedFile, iter: Iterator[Object]) @@ -105,11 +105,10 @@ class FileScanRDD( private[this] val files = split.asInstanceOf[FilePartition].files.toIterator private[this] var currentFile: PartitionedFile = null - - // TODO: do we need to close this? 
private[this] var currentIterator: Iterator[Object] = null - private[this] var nextFile: Future[NextFile] = if (asyncIO) prepareNextFile() else null + private[this] var nextFile: Future[NextFile] = + if (isAsyncIOEnabled) prepareNextFile() else null def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator() def next() = { @@ -145,10 +144,13 @@ class FileScanRDD( /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator2(): Boolean = { - val file = if (asyncIO) { - if (nextFile == null) return false - // Wait for the async task to complete - ThreadUtils.awaitResult(nextFile, Duration.Inf) + val file = if (isAsyncIOEnabled) { + if (nextFile == null) { + return false + } else { + // Wait for the async task to complete + ThreadUtils.awaitResult(nextFile, Duration.Inf) + } } else { if (!files.hasNext) return false val f = files.next() @@ -159,7 +161,7 @@ class FileScanRDD( InputFileNameHolder.setInputFileName(file.file.filePath) currentIterator = file.iter - if (asyncIO) { + if (isAsyncIOEnabled) { // Asynchronously start the next file. nextFile = prepareNextFile() } @@ -173,14 +175,14 @@ class FileScanRDD( InputFileNameHolder.unsetInputFileName() } - def prepareNextFile() = { + def prepareNextFile(): Future[NextFile] = { if (files.hasNext) { Future { - val file = files.next() - val it = readFunction(file) + val nextFile = files.next() + val nextFileIter = readFunction(nextFile) // Read something from the file to trigger some initial IO. - it.hasNext - NextFile(file, it) + nextFileIter.hasNext + NextFile(nextFile, nextFileIter) }(FileScanRDD.ioExecutionContext) } else { null From f3a21672e94cdb67e2cb69d60af327cff0b2cf54 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Mon, 25 Apr 2016 13:36:20 -0700 Subject: [PATCH 5/6] cleanup --- .../execution/datasources/FileScanRDD.scala | 57 ++++++++----------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index e06cb6457762..722157c943c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -129,44 +129,33 @@ class FileScanRDD( /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator(): Boolean = { updateBytesReadWithFileSize() - if (files.hasNext) { - currentFile = files.next() - logInfo(s"Reading File $currentFile") - InputFileNameHolder.setInputFileName(currentFile.filePath) - currentIterator = readFunction(currentFile) - hasNext - } else { - currentFile = null - InputFileNameHolder.unsetInputFileName() - false - } - } - - /** Advances to the next file. Returns true if a new non-empty iterator is available. */ - private def nextIterator2(): Boolean = { - val file = if (isAsyncIOEnabled) { - if (nextFile == null) { - return false - } else { + if (isAsyncIOEnabled) { + if (nextFile != null) { // Wait for the async task to complete - ThreadUtils.awaitResult(nextFile, Duration.Inf) + val file = ThreadUtils.awaitResult(nextFile, Duration.Inf) + InputFileNameHolder.setInputFileName(file.file.filePath) + currentIterator = file.iter + // Asynchronously start the next file. 
+ nextFile = prepareNextFile() + hasNext + } else { + currentFile = null + InputFileNameHolder.unsetInputFileName() + false } } else { - if (!files.hasNext) return false - val f = files.next() - NextFile(f, readFunction(f)) - } - - // This is only used to evaluate the rest of the execution so we can safely set it here. - InputFileNameHolder.setInputFileName(file.file.filePath) - currentIterator = file.iter - - if (isAsyncIOEnabled) { - // Asynchronously start the next file. - nextFile = prepareNextFile() + if (files.hasNext) { + currentFile = files.next() + logInfo(s"Reading File $currentFile") + InputFileNameHolder.setInputFileName(currentFile.filePath) + currentIterator = readFunction(currentFile) + hasNext + } else { + currentFile = null + InputFileNameHolder.unsetInputFileName() + false + } } - - hasNext } override def close() = { From 796d5ebf5d0b24a9e5f49bac4d0661ef833f1f78 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 27 Apr 2016 16:12:44 -0700 Subject: [PATCH 6/6] fix conf --- .../apache/spark/sql/execution/datasources/FileScanRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 9f7cd6a3825a..b015251b1dc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -71,7 +71,7 @@ class FileScanRDD( * such as starting up connections to open the file and any initial buffering. The expectation * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive. */ - val isAsyncIOEnabled = sqlContext.conf.filesAsyncIO + val isAsyncIOEnabled = sparkSession.sessionState.conf.filesAsyncIO case class NextFile(file: PartitionedFile, iter: Iterator[Object])
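
For reference, the prefetching pattern that these patches introduce can be illustrated outside of Spark. The sketch below is not the patched FileScanRDD code: it assumes a generic readFile function in place of readFunction, plain file paths in place of PartitionedFile, and a small fixed thread pool in place of ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16). It only shows the core idea of preparing file i+1 on an IO thread while file i is consumed on the caller's (CPU) thread; unlike the patch, it pulls the next path from `files` before launching the future, simply to keep the file-list iterator on a single thread.

    import java.util.concurrent.Executors

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object PrefetchingScan {
      // Dedicated pool for IO so the prefetch never competes with the CPU work
      // being done on the rows of the current file.
      private val ioContext =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

      /** One iterator over all files; file i+1 is opened/buffered while file i is read. */
      def scan[T](files: Iterator[String], readFile: String => Iterator[T]): Iterator[T] =
        new Iterator[T] {
          private case class NextFile(path: String, rows: Iterator[T])

          // Start preparing the first file immediately.
          private var pending: Option[Future[NextFile]] = prepareNext()
          private var current: Iterator[T] = Iterator.empty

          private def prepareNext(): Option[Future[NextFile]] =
            if (!files.hasNext) None
            else {
              val path = files.next()
              Some(Future {
                val rows = readFile(path)
                // Touch the iterator so the initial IO happens on the IO pool,
                // not when the caller first asks for a row.
                rows.hasNext
                NextFile(path, rows)
              }(ioContext))
            }

          private def advance(): Boolean = pending match {
            case None => false
            case Some(f) =>
              // Blocks only when IO is slower than the CPU work on the previous file.
              val ready = Await.result(f, Duration.Inf)
              current = ready.rows    // ready.path is where Spark records the input file name
              pending = prepareNext() // immediately queue the next file on the IO pool
              hasNext                 // recurse so empty files are skipped
          }

          def hasNext: Boolean = current.hasNext || advance()
          def next(): T = current.next()
        }
    }

The real change additionally guards the behavior behind an internal flag, spark.sql.files.asyncIO (default true), so it can be disabled per session, and the ParquetIOSuite test exercises both settings via withSQLConf(SQLConf.FILES_ASYNC_IO.key -> v.toString). Patch 3 also replaces Await.result with Spark's ThreadUtils.awaitResult for the blocking wait, which wraps exceptions thrown by the IO task so the caller's context is preserved.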