Conversation

@wangyum (Member) commented May 22, 2019

What changes were proposed in this pull request?

This PR follows the approach of ParquetFileFormat.readParquetFootersInParallel to resolve leaf statuses in parallel in InMemoryFileIndex.
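
In short, the per-file resolution that currently runs sequentially on the driver is fanned out over a small thread pool, the same pattern readParquetFootersInParallel uses. A condensed sketch, using the names from the test snippet below (resolveLeafStatuses maps a FileStatus to an Option[LocatedFileStatus]):

    // Sequential resolution: one getFileBlockLocations call at a time on the driver.
    val defResolvedLeafStatuses = filteredLeafStatuses.flatMap(resolveLeafStatuses)

    // Parallel resolution (this PR): the same work spread over a thread pool of 8.
    val parResolvedLeafStatuses = ThreadUtils.parmap(
      filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten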

How was this patch tested?

Manual tests. InMemoryFileIndex.listLeafFiles was changed to:

   private def listLeafFiles(
      path: Path,
      hadoopConf: Configuration,
      filter: PathFilter,
      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
    logTrace(s"Listing $path")
    val fs = path.getFileSystem(hadoopConf)

    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
    // Note that statuses only include FileStatus for the files and dirs directly under path,
    // and does not include anything else recursively.
    val statuses = try fs.listStatus(path) catch {
      case _: FileNotFoundException =>
        logWarning(s"The directory $path was not found. Was it deleted very recently?")
        Array.empty[FileStatus]
    }

    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))

    val allLeafStatuses = {
      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
      val nestedFiles: Seq[FileStatus] = sessionOpt match {
        case Some(session) =>
          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
        case _ =>
          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
      }
      val allFiles = topLevelFiles ++ nestedFiles
      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
    }

    val missingFiles = mutable.ArrayBuffer.empty[String]
    val filteredLeafStatuses = allLeafStatuses.filterNot(
      status => shouldFilterOut(status.getPath.getName))

    def resolveLeafStatuses(fileStatus: FileStatus): Option[LocatedFileStatus] = fileStatus match {
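      // Statuses that are already LocatedFileStatus pass through unchanged; for anything else,
      // block locations are looked up below, and files that disappeared between listing and
      // resolution are recorded in `missingFiles` and dropped.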
      case f: LocatedFileStatus =>
        Some(f)

      // NOTE:
      //
      // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
      //   operations, calling `getFileBlockLocations` does no harm here since these file system
      //   implementations don't actually issue RPC for this method.
      //
      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
      //   be a big deal since we always use `bulkListLeafFiles` when the number of
      //   paths exceeds the threshold.
      case f =>
        // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
        // which is very slow on some file systems (RawLocalFileSystem, for example, launches a
        // subprocess and parses the stdout).
        try {
          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
            // Store BlockLocation objects to consume less memory
            if (loc.getClass == classOf[BlockLocation]) {
              loc
            } else {
              new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
            }
          }
          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
          if (f.isSymlink) {
            lfs.setSymlink(f.getSymlink)
          }
          Some(lfs)
        } catch {
          case _: FileNotFoundException =>
            missingFiles += f.getPath.toString
            None
        }
    }

    val start1 = System.currentTimeMillis()
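    // Parallel resolution via ThreadUtils.parmap with a pool of 8 threads, timed against the
    // plain sequential flatMap below for comparison.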
    val parResolvedLeafStatuses = ThreadUtils.parmap(
      filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten

    val start2 = System.currentTimeMillis()
    val defResolvedLeafStatuses = filteredLeafStatuses.flatMap(resolveLeafStatuses)

    val end = System.currentTimeMillis()
    logWarning(s"Elements: ${parResolvedLeafStatuses.size}, " +
      s"parallel time token: ${start2 - start1}, default time token: ${end - start2}.")

    if (missingFiles.nonEmpty) {
      logWarning(
        s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
    }

    parResolvedLeafStatuses
  }

The first time:

19/05/22 08:09:55 WARN InMemoryFileIndex: Elements: 10, parallel time token: 66, default time token: 123.
19/05/22 08:10:08 WARN InMemoryFileIndex: Elements: 20, parallel time token: 27, default time token: 202.
19/05/22 08:10:10 WARN InMemoryFileIndex: Elements: 50, parallel time token: 40, default time token: 371.
19/05/22 08:10:13 WARN InMemoryFileIndex: Elements: 100, parallel time token: 83, default time token: 664.
19/05/22 08:10:18 WARN InMemoryFileIndex: Elements: 200, parallel time token: 162, default time token: 1174.
19/05/22 08:10:34 WARN InMemoryFileIndex: Elements: 500, parallel time token: 3797, default time token: 7533.
19/05/22 08:10:43 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 88, default time token: 357.
19/05/22 08:11:24 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 3755, default time token: 14304.
19/05/22 08:13:03 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 47553, default time token: 30628.
19/05/22 08:20:31 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 20475, default time token: 23823.
19/05/22 08:22:01 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 1561, default time token: 9339.
19/05/22 08:23:02 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2829, default time token: 20497.
19/05/22 08:24:44 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 5941, default time token: 51388.
19/05/22 08:33:55 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 51993, default time token: 406825.

The second time:

19/05/22 08:47:17 WARN InMemoryFileIndex: Elements: 10, parallel time token: 34, default time token: 10.
19/05/22 08:49:59 WARN InMemoryFileIndex: Elements: 20, parallel time token: 8, default time token: 22.
19/05/22 08:50:03 WARN InMemoryFileIndex: Elements: 50, parallel time token: 14, default time token: 28.
19/05/22 08:50:23 WARN InMemoryFileIndex: Elements: 100, parallel time token: 20, default time token: 73.
19/05/22 08:50:32 WARN InMemoryFileIndex: Elements: 200, parallel time token: 43, default time token: 103.
19/05/22 08:50:47 WARN InMemoryFileIndex: Elements: 500, parallel time token: 166, default time token: 271.
19/05/22 08:52:08 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 113, default time token: 385.
19/05/22 08:53:44 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 257, default time token: 971.
19/05/22 08:55:01 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 753, default time token: 2197.
19/05/22 08:57:04 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 1116, default time token: 4307.
19/05/22 09:00:39 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 3160, default time token: 8983.
19/05/22 09:14:24 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 77537, default time token: 397752.
19/05/22 09:19:05 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 10355, default time token: 91161.
19/05/22 09:30:05 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71769, default time token: 375620.

The third time:

19/05/22 09:13:21 WARN InMemoryFileIndex: Elements: 10, parallel time token: 62, default time token: 83.
19/05/22 09:13:35 WARN InMemoryFileIndex: Elements: 20, parallel time token: 53, default time token: 299.
19/05/22 09:13:39 WARN InMemoryFileIndex: Elements: 50, parallel time token: 71, default time token: 347.
19/05/22 09:14:31 WARN InMemoryFileIndex: Elements: 100, parallel time token: 27, default time token: 2766.
19/05/22 09:14:41 WARN InMemoryFileIndex: Elements: 200, parallel time token: 61, default time token: 235.
19/05/22 09:15:22 WARN InMemoryFileIndex: Elements: 500, parallel time token: 65, default time token: 239.
19/05/22 09:17:20 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 882, default time token: 428.
19/05/22 09:18:19 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 919, default time token: 6229.
19/05/22 09:20:11 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 557, default time token: 9557.
19/05/22 09:22:27 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 838, default time token: 10047.
19/05/22 09:24:49 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 2294, default time token: 9914.
19/05/22 09:28:18 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2955, default time token: 22586.
19/05/22 09:30:30 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 7447, default time token: 50589.
19/05/22 09:41:03 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71536, default time token: 406319.

@joshrosen-stripe (Contributor) commented:

FYI, there's a related optimization proposed by @rrusso2007 in #24672. Over on that PR, there's some discussion of a tricky edge case related to error handling.

@rrusso2007 (Contributor) commented:

In my pull request #24672 I am switching to a method that returns LocatedFileStatus, so these lookups are unnecessary in that case. For HDFS specifically, the optimization in this pull won't be needed. If we do want an optimization like this for other file systems, maybe we should filter out the LocatedFileStatus entries first instead of detecting them in parallel. If we do that and they are all already located, there is no need to build a parallel collection to resolve them.
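
A rough sketch of that alternative, using the names from the snippet in the PR description (illustrative only, not code from either PR):

    // Split out statuses that are already located; only the remainder needs resolution.
    val (alreadyLocated, needsResolution) =
      filteredLeafStatuses.partition(_.isInstanceOf[LocatedFileStatus])

    val resolved =
      if (needsResolution.isEmpty) {
        // Everything was already located (e.g. the listing API returned LocatedFileStatus),
        // so there is no need to spin up a thread pool at all.
        Nil
      } else {
        ThreadUtils.parmap(
          needsResolution.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten
      }

    // Already-located entries are reused as-is; only the resolved remainder is appended.
    alreadyLocated.map(_.asInstanceOf[LocatedFileStatus]) ++ resolved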

On another note, when resolving multiple paths in parallel, the existing code uses a Spark job for parallelization rather than a parallel collection. There is some risk in unexpectedly launching many parallel threads on the driver like this, as opposed to offloading the work to the executors, which have allocated cores.
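
For reference, a minimal standalone sketch of the job-based pattern being contrasted here (the function name, the (path, length) result shape, and the 100-partition cap are illustrative, not the actual InMemoryFileIndex code; Hadoop's FileStatus is not Java-serializable, so only plain fields are collected):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    // Distribute the listing over executors with a small Spark job and collect lightweight
    // (path, length) pairs, instead of spinning up extra threads on the driver.
    def listLeafFilesViaJob(spark: SparkSession, paths: Seq[Path]): Seq[(String, Long)] = {
      val numParallelism = math.min(math.max(paths.size, 1), 100)  // illustrative cap
      spark.sparkContext
        .parallelize(paths.map(_.toString), numParallelism)
        .mapPartitions { iter =>
          // The actual implementation ships the session's Hadoop configuration to executors;
          // a default Configuration keeps this sketch self-contained.
          val conf = new Configuration()
          iter.flatMap { p =>
            val path = new Path(p)
            val fs = path.getFileSystem(conf)
            fs.listStatus(path).filter(_.isFile).map(s => (s.getPath.toString, s.getLen))
          }
        }
        .collect()
        .toSeq
    }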

@SparkQA commented May 22, 2019

Test build #105699 has finished for PR 24679 at commit 6c9b63b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum closed this May 23, 2019
@wangyum deleted the SPARK-27807 branch May 23, 2019 03:18