FileScanRDD.scala
@@ -17,10 +17,14 @@

package org.apache.spark.sql.execution.datasources

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.ThreadUtils

/**
* A single file that should be read, along with partition column values that
@@ -46,37 +50,80 @@ case class PartitionedFile(
*/
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition

object FileScanRDD {
private val ioExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))
Contributor:

we should set this to the total number of task slots on the executors, shouldn't we?

Contributor:

Shouldn't it be the total number of cores the user is willing to dedicate to a single Job? This looks to be similar to an issue in ParquetRelation where a parallelize call can end up tying up all of the cores (defaultParallelism) on a single Job. While this PR should allow better progress to be made during that kind of blocking, I'm thinking that what we really need is to implement what was suggested a while ago in the scheduling pools: a max cores limit in addition to the current min cores. With that in place and the max cores value exposed to these large IO operations, users who care about not blocking concurrent Jobs can use pools that neither consume all the available cores nor oversubscribe the cores that the pool does have.

Contributor (Author):

It's difficult to model this as the total number of cores because this pool is intended for background IO and should use very little CPU. The async IO will still use some CPU, but that is expected to be very low, a small fraction of a core.

Member:

Why did you choose 16? Why not 8? Why not 32?
Would it be better to leave decision points in a comment?
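
A minimal sketch of the slot-based sizing discussed in this thread (illustrative only, not part of the PR): spark.executor.cores and spark.task.cpus are existing Spark settings, while the helper name and the fallback to 16 are assumptions.

import org.apache.spark.SparkConf

object IoPoolSizing {
  // Hypothetical helper: derive the IO pool size from the executor's task slots,
  // falling back to the current hard-coded value when the settings are absent.
  def ioPoolSize(conf: SparkConf): Int = {
    val slots = conf.getInt("spark.executor.cores", 0) / conf.getInt("spark.task.cpus", 1)
    if (slots > 0) slots else 16
  }
}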

}

class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {

/**
* To get better interleaving of CPU and IO, this RDD will create a future to prepare the next
* file while the current one is being processed. `currentIterator` is the current file and
* `nextFile` is the future that will initialize the next file to be read. This includes things
* such as starting up connections to open the file and any initial buffering. The expectation
* is that `currentIterator` is CPU intensive and `nextFile` is IO intensive.
*/
val asyncIO = sqlContext.conf.filesAsyncIO
Contributor:

Should we mark asyncIO and NextFile as private, since they seem like implementation details we might not want to expose?


case class NextFile(file: PartitionedFile, iter: Iterator[Object])

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
// TODO: do we need to close this?
Contributor:

what does this todo mean?

private[this] var currentIterator: Iterator[Object] = null

private[this] var nextFile: Future[NextFile] = if (asyncIO) prepareNextFile() else null

def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
def next() = currentIterator.next()

/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
- if (files.hasNext) {
- val nextFile = files.next()
- logInfo(s"Reading File $nextFile")
- SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
- currentIterator = readFunction(nextFile)
- hasNext
+ val file = if (asyncIO) {
+ if (nextFile == null) return false
+ // Wait for the async task to complete
+ Await.result(nextFile, Duration.Inf)
} else {
- SqlNewHadoopRDDState.unsetInputFileName()
- false
+ if (!files.hasNext) return false
+ val f = files.next()
+ NextFile(f, readFunction(f))
}

// This is only used to evaluate the rest of the execution so we can safely set it here.
SqlNewHadoopRDDState.setInputFileName(file.file.filePath)
currentIterator = file.iter

if (asyncIO) {
// Asynchronously start the next file.
nextFile = prepareNextFile()
}

hasNext
}

override def close() = {
SqlNewHadoopRDDState.unsetInputFileName()
}

def prepareNextFile() = {
if (files.hasNext) {
Future {
val file = files.next()
val it = readFunction(file)
// Read something from the file to trigger some initial IO.
it.hasNext
NextFile(file, it)
}(FileScanRDD.ioExecutionContext)
} else {
null
}
}
}

// Register an on-task-completion callback to close the input stream.
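The class comment above describes the intended interleaving: the current file is consumed on the task's CPU while the next one is opened on an IO thread. A minimal self-contained sketch of that same prefetch-one-ahead pattern is below; it is illustrative only and not the PR's code. PrefetchingIterator and its parameters are made-up names, and `open` stands in for `readFunction`.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Illustrative "prefetch one ahead" iterator in the spirit of FileScanRDD's asyncIO path.
// Each element of `sources` opens one file-like source; opening is assumed to be IO-bound,
// consuming the returned iterator CPU-bound.
class PrefetchingIterator[T](sources: Iterator[() => Iterator[T]])
    (implicit ec: ExecutionContext) extends Iterator[T] {
  private var current: Iterator[T] = Iterator.empty
  private var pending: Future[Iterator[T]] = prefetch()

  private def prefetch(): Future[Iterator[T]] =
    if (sources.hasNext) {
      val open = sources.next()
      Future {
        val it = open()
        it.hasNext  // touch the iterator so initial IO/buffering happens on this thread
        it
      }
    } else null

  override def hasNext: Boolean = current.hasNext || advance()
  override def next(): T = current.next()

  private def advance(): Boolean = {
    if (pending == null) return false
    current = Await.result(pending, Duration.Inf)  // wait for the background open to finish
    pending = prefetch()                           // immediately start opening the next source
    hasNext                                        // skips over empty sources
  }
}

object PrefetchingIteratorExample {
  def main(args: Array[String]): Unit = {
    // Small dedicated pool, analogous to FileScanRDD.ioExecutionContext (size 4 chosen arbitrarily).
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val sources = Iterator(() => Iterator(1, 2), () => Iterator.empty, () => Iterator(3, 4, 5))
    println(new PrefetchingIterator(sources).toList)  // List(1, 2, 3, 4, 5)
    pool.shutdown()
  }
}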
SQLConf.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -417,6 +416,12 @@ object SQLConf {
.longConf
.createWithDefault(4 * 1024 * 1024)

val FILES_ASYNC_IO = SQLConfigBuilder("spark.sql.files.asyncIO")
.internal()
.doc("If true, attempts to asynchronously do IO when reading data.")
.booleanConf
.createWithDefault(true)

val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
.internal()
.doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
@@ -479,6 +484,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)

def filesAsyncIO: Boolean = getConf(FILES_ASYNC_IO)

def useCompression: Boolean = getConf(COMPRESS_CACHED)

def useFileScan: Boolean = getConf(USE_FILE_SCAN)
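A minimal usage sketch for the new flag (not part of this PR): it can be toggled per session through the existing SQLContext setConf/getConf methods, assuming a `sqlContext` is in scope; the key string matches the definition above.

// Illustrative: disable async IO for this session and read the value back.
sqlContext.setConf("spark.sql.files.asyncIO", "false")
assert(sqlContext.getConf("spark.sql.files.asyncIO") == "false")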
ParquetIOSuite.scala
@@ -78,10 +78,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}

test("basic data types (without binary)") {
- val data = (1 to 4).map { i =>
- (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+ (true :: false :: Nil).foreach { v =>
+ withSQLConf(SQLConf.FILES_ASYNC_IO.key -> v.toString) {
+ val data = (1 to 4).map { i =>
+ (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+ }
+ checkParquetFile(data)
+ }
}
- checkParquetFile(data)
}

test("raw binary") {