From ebada5837d0c0a3970f809988b7a3f71b68e70ab Mon Sep 17 00:00:00 2001 From: Ian Li Date: Thu, 19 Apr 2018 13:11:58 -0700 Subject: [PATCH 1/2] SPY-1737: avoid excessive/redundant calls to DistributedFileSystem.isDirectory --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 19 ++++++++++++------- .../sql/hive/HiveMetastoreCatalogSuite.scala | 8 -------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index fa65f88992f84..d9112b6d4de61 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -227,23 +227,28 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log result.copy(output = newOutput) } + /** + * Selecting directories on the driver side by interacting with HDFS won't scale. 
+ * TODO: migrate to parquet partitioning */ private[hive] def selectParquetLocationDirectories( tableName: String, locationOpt: Option[Path]): Seq[Path] = { + val start = System.currentTimeMillis val hadoopConf = sparkSession.sparkContext.hadoopConfiguration val paths: Option[Seq[Path]] = for { selector <- sparkSession.sharedState.externalCatalog.findHadoopFileSelector location <- locationOpt fs = location.getFileSystem(hadoopConf) - selectedPaths <- selector.selectFiles(tableName, fs, location) - selectedDir = for { - selectedPath <- selectedPaths - if selectedPath - .getFileSystem(hadoopConf) - .isDirectory(selectedPath) - } yield selectedPath + // Csd's HadoopFileSelector should guarantee to return directories only. + selectedDir <- selector.selectFiles(tableName, fs, location) if selectedDir.nonEmpty } yield selectedDir + logInfo( + s"process duration of HiveMetastoreCatalog.selectParquetLocationDirectories(" + + s"$tableName, $locationOpt): ${System.currentTimeMillis - start}, selected directories: " + + s"${paths.map(_.size).getOrElse(0)}") + paths.getOrElse(Seq(locationOpt.orNull)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index f6c41b6ee0fbd..4e19d79b738d3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -223,14 +223,6 @@ class ParquetLocationSelectionSuite extends QueryTest with SQLTestUtils with Tes hmc.selectParquetLocationDirectories("sometable", Option(new Path("somewhere"))) } - // ensure file existence for somewhere/sometable - somewhereSometable.delete() - somewhereSometable.createNewFile() - // somewhere/sometable is a file => will not be selected - assertResult(Seq(new Path("somewhere"))) { - hmc.selectParquetLocationDirectories("otherplace", Option(new Path("somewhere"))) 
- } - // no location specified, none selected assertResult(Seq(null)) { hmc.selectParquetLocationDirectories("sometable", Option(null)) From ce3a5178d9d86af659b4211d2fa7fb9413a03a2b Mon Sep 17 00:00:00 2001 From: Ian Li Date: Thu, 19 Apr 2018 15:58:53 -0700 Subject: [PATCH 2/2] debug level logging --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d9112b6d4de61..e47f64a74bd1a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -244,7 +244,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log selectedDir <- selector.selectFiles(tableName, fs, location) if selectedDir.nonEmpty } yield selectedDir - logInfo( + logDebug( s"process duration of HiveMetastoreCatalog.selectParquetLocationDirectories(" + s"$tableName, $locationOpt): ${System.currentTimeMillis - start}, selected directories: " + s"${paths.map(_.size).getOrElse(0)}")