From 68155cc5d0a2c861fdac501e0375c2d339752ea5 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Sun, 11 Jun 2017 13:37:29 +0200 Subject: [PATCH 1/5] SPARK-21056: use at most one spark job to list files in InMemoryFileIndex --- .../datasources/InMemoryFileIndex.scala | 124 +++++++++++------- .../datasources/FileIndexSuite.scala | 70 +++++----- 2 files changed, 115 insertions(+), 79 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 91e31650617ec..822ac9ca95d48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -168,9 +168,7 @@ object InMemoryFileIndex extends Logging { // Short-circuits parallel listing when serial listing is likely to be faster. if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - return paths.map { path => - (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) - } + return listLeafFiles(paths, hadoopConf, filter, Some(sparkSession)) } logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") @@ -190,9 +188,8 @@ object InMemoryFileIndex extends Logging { .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value - pathStrings.map(new Path(_)).toSeq.map { path => - (path, listLeafFiles(path, hadoopConf, filter, None)) - }.iterator + val paths = pathStrings.map(new Path(_)).toSeq + listLeafFiles(paths, hadoopConf, filter, None).iterator }.map { case (path, statuses) => val serializableStatuses = statuses.map { status => // Turn FileStatus into SerializableFileStatus so we can send it back to the driver @@ -248,60 +245,93 @@ object InMemoryFileIndex extends Logging { * @return all children of path that match the specified filter. */ private def listLeafFiles( - path: Path, + paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, - sessionOpt: Option[SparkSession]): Seq[FileStatus] = { - logTrace(s"Listing $path") - val fs = path.getFileSystem(hadoopConf) + sessionOpt: Option[SparkSession]): Seq[(Path, Seq[FileStatus])] = { + logTrace(s"Listing ${paths.mkString(", ")}") // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] + val statuses = paths.flatMap { path => + try { + val fs = path.getFileSystem(hadoopConf) + Some(path -> fs.listStatus(path)) + } catch { + case _: FileNotFoundException => + logWarning(s"The directory $paths was not found. 
Was it deleted very recently?") + None + } } - val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + val filteredStatuses = statuses.flatMap { case (path, fStatuses) => + val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + if (filtered.isEmpty) { + None + } else { + Some(path -> filtered) + } + } val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) - val nestedFiles: Seq[FileStatus] = sessionOpt match { - case Some(session) => - bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2) - case _ => - dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) + val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) => + fStatuses.map {f => path -> f } + }.partition { case (_, fStatus) => fStatus.isDirectory } + val pathsToList = dirs.map { case (_, fStatus) => fStatus.getPath } + val nestedFiles = if (pathsToList.nonEmpty) { + sessionOpt match { + case Some(session) => + bulkListLeafFiles(pathsToList, hadoopConf, filter, session) + case _ => + listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt) + } + } else Seq.empty[(Path, Seq[FileStatus])] + val allFiles = topLevelFiles + .map { case (path, fStatus) => path -> Seq(fStatus) } ++ nestedFiles + allFiles.flatMap { case (path, fStatuses) => + val accepted = if (filter != null) { + fStatuses.filter(f => filter.accept(f.getPath)) + } else { + fStatuses + }.filterNot(status => shouldFilterOut(status.getPath.getName)) + if (accepted.nonEmpty) { + Some(path -> accepted) + } else { + None + } } - val allFiles = topLevelFiles ++ nestedFiles - if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs + allLeafStatuses.map { case (path, fStatuses) => + val fs = path.getFileSystem(hadoopConf) + path -> fStatuses.map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. 
+ case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus( + f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations + ) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b4616826e40b3..e112928fb9daa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -86,53 +86,59 @@ class FileIndexSuite extends SharedSQLContext { } test("PartitioningAwareFileIndex listing parallelized with many top level dirs") { - for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) { - withTempDir { dir => - val topLevelDirs = (1 to scale).map { i => - val tmp = new File(dir, s"foo=$i.txt") - tmp.mkdir() - new Path(tmp.getCanonicalPath) + withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32") { + for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) { + withTempDir { dir => + val topLevelDirs = (1 to scale).map { i => + val tmp = new File(dir, s"foo=$i.txt") + tmp.mkdir() + new Path(tmp.getCanonicalPath) + } + HiveCatalogMetrics.reset() + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) + new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } - HiveCatalogMetrics.reset() - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) - new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None) - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } } } test("PartitioningAwareFileIndex listing parallelized with large child dirs") { - for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) { - withTempDir { dir => - for (i <- 1 to scale) { - new File(dir, s"foo=$i.txt").mkdir() + withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32") { + for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) { + withTempDir { dir => + for (i <- 1 to scale) { + new File(dir, s"foo=$i.txt").mkdir() + } + HiveCatalogMetrics.reset() + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) + new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } - HiveCatalogMetrics.reset() - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) - new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } } } test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") { - for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) { - withTempDir { dir => - for (i <- 1 to 2) { - val subdirA = new File(dir, s"a=$i") - subdirA.mkdir() - for (j <- 1 to 2) { - val subdirB = new File(subdirA, s"b=$j") - subdirB.mkdir() - for (k <- 1 to scale) { - new File(subdirB, 
s"foo=$k.txt").mkdir() + withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32") { + for ((scale, expectedNumPar) <- Seq((2, 0), (50, 1))) { + withTempDir { dir => + for (i <- 1 to 2) { + val subdirA = new File(dir, s"a=$i") + subdirA.mkdir() + for (j <- 1 to 2) { + val subdirB = new File(subdirA, s"b=$j") + subdirB.mkdir() + for (k <- 1 to scale) { + new File(subdirB, s"foo=$k.txt").mkdir() + } } } + HiveCatalogMetrics.reset() + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) + new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } - HiveCatalogMetrics.reset() - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) - new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } } } From 2150f1c137c5302abc7b66a0983e312e7d2b0ae0 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Mon, 12 Jun 2017 08:15:22 +0200 Subject: [PATCH 2/5] SPARK-21056: Use the same instance of FileSystem --- .../sql/execution/datasources/InMemoryFileIndex.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 822ac9ca95d48..955d78f400198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -250,14 +250,14 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sessionOpt: Option[SparkSession]): Seq[(Path, Seq[FileStatus])] = { logTrace(s"Listing ${paths.mkString(", ")}") + val fs = paths.headOption.map(_.getFileSystem(hadoopConf)) // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = paths.flatMap { path => try { - val fs = path.getFileSystem(hadoopConf) - Some(path -> fs.listStatus(path)) + Some(path -> fs.get.listStatus(path)) } catch { case _: FileNotFoundException => logWarning(s"The directory $paths was not found. Was it deleted very recently?") @@ -304,7 +304,6 @@ object InMemoryFileIndex extends Logging { } allLeafStatuses.map { case (path, fStatuses) => - val fs = path.getFileSystem(hadoopConf) path -> fStatuses.map { case f: LocatedFileStatus => f @@ -322,7 +321,7 @@ object InMemoryFileIndex extends Logging { // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), // which is very slow on some file system (RawLocalFileSystem, which is launch a // subprocess and parse the stdout). 
- val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val locations = fs.get.getFileBlockLocations(f, 0, f.getLen) val lfs = new LocatedFileStatus( f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime, 0, null, null, null, null, f.getPath, locations From 579c5671ba78720325ebf60bc097e4ae7cb7e62f Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Sun, 18 Jun 2017 13:53:45 +0200 Subject: [PATCH 3/5] SPARK-21056: Reorg code --- .../datasources/InMemoryFileIndex.scala | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 955d78f400198..fe9a87f4f6a28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -255,9 +255,15 @@ object InMemoryFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = paths.flatMap { path => + val filteredStatuses = paths.flatMap { path => try { - Some(path -> fs.get.listStatus(path)) + val fStatuses = fs.get.listStatus(path) + val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + if (filtered.nonEmpty) { + Some(path -> filtered) + } else { + None + } } catch { case _: FileNotFoundException => logWarning(s"The directory $paths was not found. Was it deleted very recently?") @@ -265,15 +271,6 @@ object InMemoryFileIndex extends Logging { } } - val filteredStatuses = statuses.flatMap { case (path, fStatuses) => - val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - if (filtered.isEmpty) { - None - } else { - Some(path -> filtered) - } - } - val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) => fStatuses.map {f => path -> f } @@ -287,20 +284,21 @@ object InMemoryFileIndex extends Logging { listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt) } } else Seq.empty[(Path, Seq[FileStatus])] - val allFiles = topLevelFiles - .map { case (path, fStatus) => path -> Seq(fStatus) } ++ nestedFiles - allFiles.flatMap { case (path, fStatuses) => - val accepted = if (filter != null) { - fStatuses.filter(f => filter.accept(f.getPath)) - } else { - fStatuses - }.filterNot(status => shouldFilterOut(status.getPath.getName)) - if (accepted.nonEmpty) { - Some(path -> accepted) - } else { - None - } - } + val allFiles = topLevelFiles.groupBy { case (path, _) => path } + .flatMap { case (path, pAndStatuses) => + val fStatuses = pAndStatuses.map { case (_, f) => f } + val accepted = if (filter != null) { + fStatuses.filter(f => filter.accept(f.getPath)) + } else { + fStatuses + } + if (accepted.nonEmpty) { + Some(path -> accepted) + } else { + None + } + }.toSeq + nestedFiles ++ allFiles } allLeafStatuses.map { case (path, fStatuses) => From cb21cf7533f0088caa334af545d4dfe764076c01 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Sun, 18 Jun 2017 14:03:52 +0200 Subject: [PATCH 4/5] SPARK-21056: Bail out early if paths is empty --- .../datasources/InMemoryFileIndex.scala | 150 +++++++++--------- 1 file changed, 77 insertions(+), 73 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index fe9a87f4f6a28..c5696ecda2b60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -250,84 +250,88 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sessionOpt: Option[SparkSession]): Seq[(Path, Seq[FileStatus])] = { logTrace(s"Listing ${paths.mkString(", ")}") - val fs = paths.headOption.map(_.getFileSystem(hadoopConf)) - - // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val filteredStatuses = paths.flatMap { path => - try { - val fStatuses = fs.get.listStatus(path) - val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - if (filtered.nonEmpty) { - Some(path -> filtered) - } else { - None - } - } catch { - case _: FileNotFoundException => - logWarning(s"The directory $paths was not found. Was it deleted very recently?") - None - } - } - - val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) => - fStatuses.map {f => path -> f } - }.partition { case (_, fStatus) => fStatus.isDirectory } - val pathsToList = dirs.map { case (_, fStatus) => fStatus.getPath } - val nestedFiles = if (pathsToList.nonEmpty) { - sessionOpt match { - case Some(session) => - bulkListLeafFiles(pathsToList, hadoopConf, filter, session) - case _ => - listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt) - } - } else Seq.empty[(Path, Seq[FileStatus])] - val allFiles = topLevelFiles.groupBy { case (path, _) => path } - .flatMap { case (path, pAndStatuses) => - val fStatuses = pAndStatuses.map { case (_, f) => f } - val accepted = if (filter != null) { - fStatuses.filter(f => filter.accept(f.getPath)) - } else { - fStatuses - } - if (accepted.nonEmpty) { - Some(path -> accepted) + if (paths.isEmpty) { + Nil + } else { + val fs = paths.head.getFileSystem(hadoopConf) + + // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val filteredStatuses = paths.flatMap { path => + try { + val fStatuses = fs.listStatus(path) + val filtered = fStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + if (filtered.nonEmpty) { + Some(path -> filtered) } else { None } - }.toSeq - nestedFiles ++ allFiles - } + } catch { + case _: FileNotFoundException => + logWarning(s"The directory $paths was not found. Was it deleted very recently?") + None + } + } - allLeafStatuses.map { case (path, fStatuses) => - path -> fStatuses.map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. 
- case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.get.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus( - f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations - ) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) + val allLeafStatuses = { + val (dirs, topLevelFiles) = filteredStatuses.flatMap { case (path, fStatuses) => + fStatuses.map { f => path -> f } + }.partition { case (_, fStatus) => fStatus.isDirectory } + val pathsToList = dirs.map { case (_, fStatus) => fStatus.getPath } + val nestedFiles = if (pathsToList.nonEmpty) { + sessionOpt match { + case Some(session) => + bulkListLeafFiles(pathsToList, hadoopConf, filter, session) + case _ => + listLeafFiles(pathsToList, hadoopConf, filter, sessionOpt) } - lfs + } else Seq.empty[(Path, Seq[FileStatus])] + val allFiles = topLevelFiles.groupBy { case (path, _) => path } + .flatMap { case (path, pAndStatuses) => + val fStatuses = pAndStatuses.map { case (_, f) => f } + val accepted = if (filter != null) { + fStatuses.filter(f => filter.accept(f.getPath)) + } else { + fStatuses + } + if (accepted.nonEmpty) { + Some(path -> accepted) + } else { + None + } + }.toSeq + nestedFiles ++ allFiles + } + + allLeafStatuses.map { case (path, fStatuses) => + path -> fStatuses.map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `listLeafFilesInParallel` when the number of + // paths exceeds threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). 
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus( + f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations + ) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } } } } From c4fba40c183762b8fcacfce7fc8177a8de1f9563 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Sun, 18 Jun 2017 16:04:17 +0200 Subject: [PATCH 5/5] SPARK-21056: Fix serialization issue in bulkListLeafFiles --- .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index c5696ecda2b60..030f99d84456c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -188,7 +188,7 @@ object InMemoryFileIndex extends Logging { .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value - val paths = pathStrings.map(new Path(_)).toSeq + val paths = pathStrings.map(new Path(_)).toIndexedSeq listLeafFiles(paths, hadoopConf, filter, None).iterator }.map { case (path, statuses) => val serializableStatuses = statuses.map { status =>
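Note on [PATCH 5/5]: the one-line change swaps `toSeq` for `toIndexedSeq` on the `Iterator[Path]` built inside `mapPartitions`, presumably because in Scala 2.11/2.12 (the Scala versions Spark used at the time) `Iterator.toSeq` delegates to `toStream`, so the resulting `Seq` is a lazy `Stream` whose unevaluated tail still references the underlying iterator when the partition's result is shipped back to the driver, while `toIndexedSeq` materializes a strict `Vector` up front. The standalone sketch below only illustrates that collection behavior; it is not part of the patch, and the object name is purely illustrative.

object ToSeqVsToIndexedSeq {
  def main(args: Array[String]): Unit = {
    // Iterator.toSeq is lazy in Scala 2.11/2.12: it delegates to toStream and
    // returns a Stream, whose tail is only evaluated on demand.
    val lazySeq = Iterator("a", "b", "c").toSeq
    println(lazySeq.getClass.getName)   // scala.collection.immutable.Stream$Cons

    // Iterator.toIndexedSeq eagerly materializes a Vector, so nothing lazy (and no
    // reference to the source iterator) survives into the value that gets serialized.
    val strictSeq = Iterator("a", "b", "c").toIndexedSeq
    println(strictSeq.getClass.getName) // scala.collection.immutable.Vector
  }
}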