
Conversation

@cozos (Contributor) commented Sep 23, 2019

### What changes were proposed in this pull request?

See JIRA: https://issues.apache.org/jira/browse/SPARK-29089
Mailing List: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrameReader-bottleneck-in-DataSource-checkAndGlobPathIfNecessary-when-reading-S3-files-td27828.html

When using DataFrameReader#csv to read many files on S3, the globbing and fs.exists calls in DataSource#checkAndGlobPathIfNecessary become a bottleneck.

From the mailing list discussion, the proposed improvement is to parallelize the blocking FS calls (a sketch of the approach follows the list):

  • have SparkHadoopUtil differentiate between files returned by globStatus(), which therefore exist, and those it didn't glob for; only the latter need an existence check.
  • add parallel execution to the glob and existence checks
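
A minimal sketch of that approach, assuming Spark's `SparkHadoopUtil.get.isGlobPath` helper and a plain fixed-size pool (`checkAndGlobPaths` is an illustrative name; the PR itself uses Spark's internal `ThreadUtils.parmap` rather than raw Futures):

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch only: expand globs and existence-check plain paths in parallel.
// (Pool shutdown and finer-grained error handling elided for brevity.)
def checkAndGlobPaths(paths: Seq[Path], hadoopConf: Configuration): Seq[Path] = {
  val (globPaths, plainPaths) = paths.partition(SparkHadoopUtil.get.isGlobPath)
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  // globStatus() both expands a pattern and proves its results exist,
  // so globbed paths need no separate exists() probe afterwards.
  val globbed: Future[Seq[Path]] = Future.traverse(globPaths) { p =>
    Future {
      val fs = p.getFileSystem(hadoopConf)
      Option(fs.globStatus(p)).toSeq.flatten.map(_.getPath)
    }
  }.map(_.flatten)

  // Paths that were never globbed still need the existence check.
  val checked: Future[Seq[Path]] = Future.traverse(plainPaths) { p =>
    Future {
      val fs = p.getFileSystem(hadoopConf)
      if (!fs.exists(p)) sys.error(s"Path does not exist: $p")
      p
    }
  }

  Await.result(globbed, Duration.Inf) ++ Await.result(checked, Duration.Inf)
}
```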

### Why are the changes needed?

Verifying/globbing files happens on the driver, and if these operations take a long time (for example against S3), the entire cluster has to wait, potentially sitting idle. This change aims to make that step faster.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

I added a test suite `DataSourceSuite`; open to suggestions for better naming.

See here and here for some measurements.

@wangyum (Member) commented Sep 23, 2019

@cozos Do you have some benchmark numbers?

@wangyum (Member) commented Sep 23, 2019

ok to test

@cozos (Contributor, author) commented Sep 23, 2019

@wangyum Not yet, I will look into running a comparison on the S3 Landsat dataset. Not sure what I'm gonna do about HDFS or any other filesystems.

@cozos (Contributor, author) commented Sep 23, 2019

Here are some measurements on S3 Landsat files, from my laptop in San Francisco to a us-west-2 S3 bucket:

| Scenario | Before | After |
| --- | --- | --- |
| Recursive list of `s3a://landsat-pds/L8/001/003/` (1206 non-globbed paths) | 57.6 s | 9.75 s (roughly 6x faster) |
| `s3a://landsat-pds/L8/001/003/*/*`, one glob path expanding to 1206 paths | 10.75 s | 10.44 s (about the same, since a single glob stays single-threaded) |
| 30 glob paths (subdirectories of `s3a://landsat-pds/L8/001/003`), 1206 paths total | 13.72 s | 2.13 s (roughly 6.5x faster) |

The 30 glob paths:

"s3a://landsat-pds/L8/001/003/LC80010032014144LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014176LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014192LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014208LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014224LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014240LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032014272LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015115LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015131LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015147LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015163LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015179LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015195LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015211LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015227LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015243LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015259LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032015275LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016118LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016134LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016150LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016166LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016182LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016198LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016214LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016230LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016246LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016262LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032016278LGN00/*",
"s3a://landsat-pds/L8/001/003/LC80010032017120LGN00/*"


```scala
object TestPaths {
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)
```
Contributor:

It's not needed here, but it is worth knowing that there is a package-scoped static method FileSystem.addFileSystemForTesting that lets you add a specific instance to the file system cache. I use it sometimes, as it lets me register mock file systems for existing schemes (s3, etc.).
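
Since the method is package-private, a caller has to be declared inside `org.apache.hadoop.fs` to reach it; a minimal sketch (the helper object is hypothetical, not part of Hadoop):

```scala
// Must live in org.apache.hadoop.fs to see the package-private method.
package org.apache.hadoop.fs

import java.net.URI

import org.apache.hadoop.conf.Configuration

object FsCacheTestHelper { // hypothetical helper name
  // Injects `fs` into the shared FileSystem cache so that later
  // FileSystem.get calls for that scheme/authority return the mock.
  def inject(uri: URI, conf: Configuration, fs: FileSystem): Unit =
    FileSystem.addFileSystemForTesting(uri, conf, fs)
}
```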

```scala
val hadoopConf = new Configuration()
hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

val path1: Path = new Path("mockFs:///somepath1")
```
Contributor:

I think it is safest to actually have an authority here, e.g. mockFs://mock/somepath1.

FileSystem uses the scheme and authority to cache things, and some of the FS operations work on the path part of the URI, ignoring the authority.
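
A small illustration of that caching behaviour, assuming the MockFileSystem registration above and the default FS cache enabled:

```scala
val conf = new Configuration()
conf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

// The FileSystem cache is keyed on (scheme, authority, user), so these
// two paths resolve to the same cached instance...
val fs1 = new Path("mockFs://mock/somepath1").getFileSystem(conf)
val fs2 = new Path("mockFs://mock/somepath2").getFileSystem(conf)
assert(fs1 eq fs2)

// ...while the authority-less form keys a separate cache entry, which is
// where surprises creep in.
val fs3 = new Path("mockFs:///somepath1").getFileSystem(conf)
assert(!(fs3 eq fs1))
```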

```scala
if (checkEmptyGlobPath && globPath.isEmpty) {
  throw new AnalysisException(s"Path does not exist: $qualified")

val qualifiedPaths = pathStrings
  .map{pathString =>
```
Member:

Nit: `.map { pathString =>`

```scala
throw new AnalysisException(s"Path does not exist: ${globPath.head}")

// Split the paths into glob and non glob paths, because we don't need to do an existence check
// for globbed paths.
val globPaths = qualifiedPaths
```
Member:

I think you can get both at once with `val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath)`.

```scala
)

assert(
  resultPaths.toSet == Set(
```
Member:

Small nit: use `===` in asserts.

```scala
val nonGlobPaths = qualifiedPaths
  .filter(path => !SparkHadoopUtil.get.isGlobPath(path))

val globbedPaths = globPaths.par.flatMap { globPath =>
```
Member:

`globPaths.par.flatMap` -> `ThreadUtils.parmap(globPaths, "globPath", 8)`? For reference, this existing test shows why `par` is a problem:

test("parmap should be interruptible") {
val t = new Thread() {
setDaemon(true)
override def run() {
try {
// "par" is uninterruptible. The following will keep running even if the thread is
// interrupted. We should prefer to use "ThreadUtils.parmap".
//
// (1 to 10).par.flatMap { i =>
// Thread.sleep(100000)
// 1 to i
// }
//
ThreadUtils.parmap(1 to 10, "test", 2) { i =>
Thread.sleep(100000)
1 to i
}.flatten
} catch {
case _: InterruptedException => // excepted
}
}
}
t.start()
eventually(timeout(10.seconds)) {
assert(t.isAlive)
}
t.interrupt()
eventually(timeout(10.seconds)) {
assert(!t.isAlive)
}
}

Member:

Yeah, good point; we want to use ThreadUtils to avoid reusing the global thread pool.
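
What the suggestion looks like in context, sketched from the snippets quoted in this review (`globPaths` and `hadoopConf` are assumed in scope):

```scala
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.ThreadUtils

// parmap runs the glob expansion on its own named, interruptible pool
// instead of the shared global pool that .par would use.
val globbedPaths =
  ThreadUtils.parmap(globPaths, "globPath", 8) { globPath =>
    val fs = globPath.getFileSystem(hadoopConf)
    SparkHadoopUtil.get.globPath(fs, globPath) // glob results are known to exist
  }.flatten
```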

Contributor (author):

@wangyum Any thoughts on the number of threads? #25899 (comment)

Member:

8 looks fine to me.

@SparkQA commented Sep 23, 2019

Test build #111213 has finished for PR 25899 at commit 0cd31ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun dongjoon-hyun changed the title SPARK-29089 DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading large amount of S3 files [SPARK-29089][SQL] DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading large amount of S3 files Sep 23, 2019
@dongjoon-hyun (Member):

@cozos, thank you for making a PR. In the Apache Spark community, we use the [SPARK-29089][SQL] prefix in the PR title. Also, could you update your PR title to describe this PR's approach instead of the problem? We have several possible ways to solve the same problem.

@cozos cozos changed the title [SPARK-29089][SQL] DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading large amount of S3 files [SPARK-29089][SQL] Parallelize blocking FileSystem calls in DataSource#checkAndGlobPathIfNecessary Sep 24, 2019
@cozos (Contributor, author) commented Sep 24, 2019

@dongjoon-hyun Ok done, how does that sound? Should I update the JIRA as well?

@steveloughran (Contributor):

Nice experiment!

I guess in-EC2 you're limited by the number of cores, but latency is also nice and low. Remotely, latency is worse, so if there is anything we can do in parallel threads, there are some tangible benefits.

In both local and remote S3 interaction, rename() is faked with a COPY, which runs at 6-10 MB/s; that can be done via the thread pool too if you can configure the AWS SDK to split a large copy into parallel parts. That shares the same pools, so it's useful to have some capacity there in any process renaming things.
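
The pool referred to here is S3A's own, sized by its configuration; these are illustrative values, not recommendations:

```scala
// S3A's internal thread pool, shared by multipart uploads and copies.
hadoopConf.setInt("fs.s3a.threads.max", 64)      // illustrative value
hadoopConf.setInt("fs.s3a.max.total.tasks", 128) // illustrative value
```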

@cozos (Contributor, author) commented Nov 6, 2019

@steveloughran How should we proceed? Does 40 threads sound OK? What else does the PR need?

@steveloughran (Contributor):

I'd say 40 sounds good; people can tune it.

```scala
    globResult
  }.flatten
} catch {
  case e: SparkException => throw e.getCause
```
Contributor:

Is there ever a case where the cause is null?

@cozos (Contributor, author) commented Nov 11, 2019

SparkException comes from ThreadUtils#parmap and ThreadUtils#awaitResult, which always seem to wrap another exception, i.e. the cause is never null.
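
If a null cause ever did slip through, a defensive rethrow would be a one-line change (purely illustrative; per this thread it should never trigger):

```scala
case e: SparkException => throw Option(e.getCause).getOrElse(e)
```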

Member:

This silently drops the caller's stack trace; I opened SPARK-47833 (#46028) to fix it.

@steveloughran (Contributor):

This code LGTM: it skips needless probes on the globbed paths and runs parallel checks on the others.

@SparkQA commented Nov 12, 2019

Test build #113603 has finished for PR 25899 at commit dc48da8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cozos (Contributor, author) commented Nov 19, 2019

Any next steps for me? Or just need 👀 from committers?

@steveloughran (Contributor):

Quick follow-up to the "how many connections" discussion.

It turns out that a sufficiently large number of S3 DNS lookups can trigger 503 throttling on DNS, and there isn't anything in the AWS library to react to this. The bigger the connection pool, the more connections you are likely to see on worker start-up, probably workers * S3A URLs * "fs.s3a.connection.maximum". Don't go overboard. And if you do see the problem, file a HADOOP JIRA with stack traces, configs, and anything else which could help implement resilience here.

That doesn't make any difference to this PR, just something to know.
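
For reference, the pool bound mentioned above is an ordinary Hadoop conf property; the value here is only an example:

```scala
// Cap S3A's HTTP connection pool; the default varies by Hadoop release.
hadoopConf.setInt("fs.s3a.connection.maximum", 96)
```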

@srowen (Member) left a comment:

Looks fairly reasonable to me, modulo a few comments. Normally I don't like the extra complexity but this seems like a common bottleneck.

```scala
}

allGlobPath
allPaths.seq
```
Member:

Nit: can you call `.toSeq`? I'm working on Scala 2.13 support and I think `.seq` is deprecated there.

```scala
val hadoopConf = new Configuration()
hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

val path1: Path = new Path("mockFs://mockFs/somepath1")
```
Member:

Small nit: no need for `: Path` everywhere here.

@SparkQA commented Dec 11, 2019

Test build #115136 has finished for PR 25899 at commit a3cdc59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 11, 2019

Test build #115166 has finished for PR 25899 at commit 105235c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) left a comment:

This looks pretty good to me. I don't see a behavior change; it just adds parallelism. While that has implications of its own, I think it'll be a win for a real pain point.

@cozos (Contributor, author) commented Feb 13, 2020

Hi all,

How can I get this PR accepted? Anything I can do to help with the process?

@srowen (Member) commented Feb 13, 2020

@steveloughran @wangyum you OK with it too? It seems OK for 3.1. Let me retrigger tests to see if there is still a failure though.

@srowen (Member) commented Feb 13, 2020

Jenkins retest this please.

@steveloughran (Contributor):

LGTM.

There's one thing to be aware of: Path.getFileSystem(conf) maps to FileSystem.get(path.toUri(), conf), which looks in a shared cache for existing instances (good), then instantiates one if there isn't one for that user + URI. If that FS instance takes a while to start up (i.e. FileSystem.initialize() is slow), then multiple threads will all end up trying to create instances, then discard all but one afterwards.

Hence apache/hadoop#1838, removing some network IO in S3AFileSystem.initialize() by giving you the option of not bothering to check whether the bucket exists.

Does that mean there's anything wrong with this PR? No, only that performance is best if the relevant FS instances have already been preloaded into the FS cache. And those of us implementing filesystem connectors should do a better job at low-latency instantiation, even if it means async network startup threads and moving the blocking to the first FS API call instead.
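
One way to sidestep the initialize() race, as a sketch: touch each distinct filesystem once on the calling thread before fanning out, so the parallel workers find a warm cache (`prewarmFileSystems` is an illustrative name):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: serially pre-warm the shared FileSystem cache, which is keyed
// by (scheme, authority, user), before any parallel FS work begins.
def prewarmFileSystems(paths: Seq[Path], conf: Configuration): Unit = {
  val seen = scala.collection.mutable.Set.empty[(String, String)]
  paths.foreach { p =>
    val u = p.toUri
    if (seen.add((u.getScheme, u.getAuthority))) {
      p.getFileSystem(conf) // instantiates and caches the FS on first use
    }
  }
}
```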

@SparkQA commented Feb 13, 2020

Test build #118372 has finished for PR 25899 at commit 105235c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Feb 17, 2020

Merged to master

@srowen srowen closed this in 25e9156 Feb 17, 2020
@cozos (Contributor, author) commented Feb 18, 2020

Thank you everybody!

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
holdenk pushed a commit to holdenk/spark that referenced this pull request Jun 25, 2020
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020