
Conversation

@nongli (Contributor) commented Apr 7, 2016

What changes were proposed in this pull request?

This patch updates FileScanRDD to start reading from the next file while the current file
is being processed. The goal is to get better interleaving of CPU and IO. It does this by
launching a future that asynchronously starts preparing the next file to be read. The
expectation is that the async task is IO intensive while processing the current file
(which includes all the computation for the query plan) is CPU intensive. For some file
formats, this just means opening the file and doing the initial setup. For file formats
like Parquet, it means doing all the IO for all the columns.

How was this patch tested?

Good coverage from existing tests. Added a new test for the flag. Also ran cluster testing on TPC-DS queries.
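
To make the idea concrete, here is a minimal, self-contained sketch of the prefetch pattern described above. It is not the FileScanRDD code from this PR; the names (PrefetchSketch, prepareNext), the pool size of 4, and the simulated IO are illustrative only.

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object PrefetchSketch {
      // Small dedicated pool for background IO; the size is illustrative.
      private val ioPool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

      // Stand-in for the IO-heavy part: opening the next file and buffering its bytes.
      private def prepareNext(file: String): Iterator[String] = {
        Thread.sleep(100) // simulate IO latency
        Iterator(s"row from $file")
      }

      // Take the next file name on the calling thread and start its IO in the background.
      private def startNext(files: Iterator[String]): Option[Future[Iterator[String]]] =
        if (files.hasNext) {
          val file = files.next()
          Some(Future(prepareNext(file))(ioPool))
        } else None

      def main(args: Array[String]): Unit = {
        val files = Iterator("f1", "f2", "f3")
        var next = startNext(files) // kick off IO for the first file

        while (next.isDefined) {
          // Wait for the background IO, immediately start IO for the following file,
          // then do the CPU-heavy processing of the current one.
          val current = Await.result(next.get, Duration.Inf)
          next = startNext(files)
          current.foreach(println) // stand-in for CPU-intensive query processing
        }
        ioPool.shutdown()
      }
    }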

Diff excerpt under review:

    } else {
      SqlNewHadoopRDDState.unsetInputFileName()
      false
    nextFile = null

A Contributor commented:

So we are going to keep setting nextFile to null on every nextIterator call if asyncIO is false. Could we change this to:

    if (asyncIO) {
      if (files.hasNext) {
        nextFile = prepareNextFile()
      } else {
        nextFile = null
      }
    }

@holdenk (Contributor) commented Apr 7, 2016

This is just a question, but would it be simpler if, when we're doing non-async IO, we just set the future to an already-completed value? That way the code would be a bit simpler (or would this be more complicated?).
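
For what it's worth, Scala's Future.successful produces an already-completed future, so the non-async path could return the same type without touching the thread pool. A small sketch of that idea; CompletedFutureSketch, NextFile, and prepareNextFile here are simplified stand-ins, not the PR's actual helpers.

    import scala.concurrent.{ExecutionContext, Future}

    object CompletedFutureSketch {
      // Simplified stand-ins for the PR's NextFile / prepareNextFile.
      case class NextFile(name: String, rows: Iterator[String])
      def prepareNextFile(name: String): NextFile = NextFile(name, Iterator("row1", "row2"))

      def nextFileFuture(name: String, asyncIO: Boolean)
                        (implicit ec: ExecutionContext): Future[NextFile] =
        if (asyncIO) {
          Future(prepareNextFile(name))            // IO runs in the background
        } else {
          Future.successful(prepareNextFile(name)) // IO runs here; the future is pre-completed
        }
    }

Either way the caller only ever deals with a Future[NextFile], which is the simplification being suggested.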

@SparkQA commented Apr 7, 2016

Test build #55246 has finished for PR 12243 at commit cc6d98a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class NextFile(file: PartitionedFile, iter: Iterator[Object])

@nongli (Contributor, PR author) commented Apr 7, 2016

@holdenk I tried to simplify the logic. Let me know your thoughts.

@SparkQA commented Apr 8, 2016

Test build #55268 has finished for PR 12243 at commit bc11dd5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Diff excerpt under review:

     * such as starting up connections to open the file and any initial buffering. The expectation
     * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive.
     */
    val asyncIO = sqlContext.conf.filesAsyncIO

A Contributor commented:

Should we mark asyncIO and NextFile as private, since they seem like implementation details we might not want to expose?

@holdenk
Copy link
Contributor

holdenk commented Apr 8, 2016

Since this is for a performance improvement, do we have any benchmarks that show this helps?


Diff excerpt under review:

    object FileScanRDD {
      private val ioExecutionContext = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))

A Contributor commented:

We should set this to the total number of task slots on the executors, shouldn't we?
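
For reference, the number of task slots on an executor is its core count divided by the CPUs reserved per task, both available from standard Spark configs (spark.executor.cores and spark.task.cpus). A sketch of sizing the pool that way, assuming the code lives inside Spark's own codebase (ThreadUtils is Spark-internal) and runs where SparkEnv is initialized; FileScanRDDPoolSketch is a hypothetical name, and this is not what the PR does, which hard-codes 16.

    import scala.concurrent.ExecutionContext
    import org.apache.spark.SparkEnv
    import org.apache.spark.util.ThreadUtils

    object FileScanRDDPoolSketch {
      // Derive the pool size from the executor's task slots instead of a constant.
      private def taskSlots: Int = {
        val conf = SparkEnv.get.conf
        conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
      }

      private val ioExecutionContext = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", math.max(1, taskSlots)))
    }

Whether task slots is even the right bound is the open question in this thread, since the pool is meant for background IO rather than CPU work.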

Another Contributor commented:

Shouldn't it be the total number of cores the user is willing to dedicate to a single Job? This looks similar to an issue in ParquetRelation, where a parallelize call can end up tying up all of the cores (defaultParallelism) on a single Job. While this PR should allow better progress to be made during that kind of blocking, I think what we really need is what was suggested a while ago for the scheduling pools: a max-cores limit in addition to the current min cores. With that in place, and the max-cores value exposed to these large IO operations, users who care about not blocking concurrent Jobs can use pools that neither consume all the available cores nor oversubscribe the cores that the pool does have.

The PR author (Contributor) replied:

It's difficult to model this as the total number of cores, because what this is intended to do is background IO that uses very little CPU. The async IO will still use some CPU, but that is expected to be very low, a small fraction of a core.

A Member commented:

Why did you choose 16? Why not 8? Why not 32?
Would it be better to document the reasoning behind this choice in a code comment?

@HyukjinKwon (Member) commented:

Hi @nongli, I just happened to look at this PR. It seems it has been inactive for a few months without responses to the review comments. Would it be better to close this for now?
