From 0082b7633e8f84fe5cafa0362cd45cce4cfee459 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 27 Feb 2017 16:04:30 +0800
Subject: [PATCH 01/13] [SPARK-18726][SQL] resolveRelation for FileFormat DataSource doesn't need to listFiles twice

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 6 +++---
 .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d510581f90e6..e2434aaa7dcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -86,7 +86,7 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
-
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -122,7 +122,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -384,7 +384,7 @@ case class DataSource(
           catalogTable.get,
           catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
       } else {
-        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
       }
 
       HadoopFsRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 7531f0ae02e7..ee4d0863d977 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -66,8 +66,8 @@ class InMemoryFileIndex(
   }
 
   override def refresh(): Unit = {
-    refresh0()
     fileStatusCache.invalidateAll()
+    refresh0()
   }
 
   private def refresh0(): Unit = {
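
A note on the mechanics of patch 01: resolveRelation previously built one InMemoryFileIndex while inferring the partition schema and a second one for the returned relation, and each construction listed the filesystem. Threading a single FileStatusCache through both constructions lets the second listing be served from memory. A minimal sketch of the pattern, using only the constructor shapes visible in the hunks above (sparkSession, globbedPaths, options and partitionSchema are the locals from the diff, so this is a fragment, not a standalone program):

    // Sketch of the sharing pattern, not an excerpt from DataSource.scala.
    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

    // First listing: populates the shared cache while partitions are inferred.
    val tempFileIndex =
      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)

    // Second construction: file statuses come from the cache, no second listing.
    val fileIndex = new InMemoryFileIndex(
      sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)

The swap inside InMemoryFileIndex.refresh() is a consequence of the same sharing: invalidateAll() now has to run before refresh0(), otherwise the re-listing would repopulate the cache first and then immediately have those fresh entries wiped.
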
From 6b5454ad0104459565febb520fa22ef30bdb8368 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 27 Feb 2017 16:39:45 +0800
Subject: [PATCH 02/13] add test case

---
 .../sql/hive/PartitionedTablePerfStatsSuite.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b792a168a4f9..50506197b313 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -411,4 +411,15 @@ class PartitionedTablePerfStatsSuite
       }
     }
   }
+
+  test("resolveRelation for a FileFormat DataSource without userSchema scan filesystem only once") {
+    withTempDir { dir =>
+      import spark.implicits._
+      Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
+      HiveCatalogMetrics.reset()
+      spark.read.parquet(dir.getAbsolutePath)
+      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+    }
+  }
 }
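
The test above pins the behavior down with HiveCatalogMetrics counters rather than timing: METRIC_FILES_DISCOVERED == 1 means the directory was physically listed exactly once (during schema inference), and METRIC_FILE_CACHE_HITS == 1 means the second index construction was answered from the shared cache. Roughly the same check can be run interactively; a hedged spark-shell sketch, assuming HiveCatalogMetrics lives at its usual org.apache.spark.metrics.source location and using /tmp/spark18726 as an arbitrary scratch path:

    import org.apache.spark.metrics.source.HiveCatalogMetrics

    spark.range(1).write.mode("overwrite").parquet("/tmp/spark18726")
    HiveCatalogMetrics.reset()
    spark.read.parquet("/tmp/spark18726")  // resolveRelation runs eagerly here
    // Expected after this patch: 1 discovery and 1 cache hit. On the old code
    // path the same directory would be expected to be discovered twice, with
    // no cache hits.
    println(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount())
    println(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount())
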
From f1da0a4cf457f4efb6128beca3c08ccf95ef37a0 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Tue, 28 Feb 2017 07:59:34 +0800
Subject: [PATCH 03/13] fix a style

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e2434aaa7dcf..882a0bd4c943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -384,7 +384,8 @@ case class DataSource(
           catalogTable.get,
           catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
       } else {
-        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
+        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema),
+          fileStatusCache)
       }
 
       HadoopFsRelation(

From a8c1deab0fc8e59863bf4a3d3b551f77fbebbc6d Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 09:50:30 +0800
Subject: [PATCH 04/13] fix failed test

---
 .../apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 9b5e364e512a..b5ba066c1386 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -21,11 +21,12 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.test.SharedSQLContext
 
-class ResolvedDataSourceSuite extends SparkFunSuite {
+class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
   private def getProvidingClass(name: String): Class[_] =
     DataSource(
-      sparkSession = null,
+      sparkSession = spark,
       className = name,
       options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
 
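
Patch 04 is a symptom worth noting: because patch 01 made fileStatusCache an eagerly initialized field, merely constructing a DataSource now dereferences sparkSession. That broke this suite's long-standing pattern of passing sparkSession = null when all it needs is providingClass, hence the switch to SharedSQLContext and a real session. A sketch of the construction that the eager field breaks, reproduced from the suite's helper in the diff above (the concrete failure is inferred from the patch sequence and is not shown in this series):

    private def getProvidingClass(name: String): Class[_] =
      DataSource(
        sparkSession = null,   // fine while construction is side-effect free
        className = name,
        options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)
      ).providingClass         // only this lazy lookup is actually exercised

The commits that follow take the other way out: instead of giving the test a session, they defer the cache so that construction stops needing one.
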
From 60fa03757d223f833e2fa161326a48a9015d4c6c Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 12:49:08 +0800
Subject: [PATCH 05/13] add a lazy

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a58684e2a147..6dcf85417229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -86,7 +86,7 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+  private lazy val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when

From 9a73947efea334ba0cfc5b5508003807a93ff806 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 14:49:44 +0800
Subject: [PATCH 06/13] fix code style

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6dcf85417229..1cb0ba515084 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -364,7 +364,11 @@ case class DataSource(
           catalogTable.get,
           catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
       } else {
-        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema),
+        new InMemoryFileIndex(
+          sparkSession,
+          globbedPaths,
+          options,
+          Some(partitionSchema),
           fileStatusCache)
       }
 

From c39eb26da38f9d92e3871814be446c8d911be890 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 19:03:18 +0800
Subject: [PATCH 07/13] make filestatuscache local var

---
 .../sql/execution/datasources/DataSource.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 1cb0ba515084..eedc505e8748 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -86,7 +86,6 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
-  private lazy val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -109,7 +108,9 @@ case class DataSource(
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
-  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -354,7 +355,8 @@ case class DataSource(
         globPath
       }.toArray
 
-    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
 
     val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -365,11 +367,7 @@ case class DataSource(
           catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
       } else {
         new InMemoryFileIndex(
-          sparkSession,
-          globbedPaths,
-          options,
-          Some(partitionSchema),
-          fileStatusCache)
+          sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
       }
 
       HadoopFsRelation(

From f3332cb870ae2be9383969de07a07c8761230e8b Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 19:04:55 +0800
Subject: [PATCH 08/13] modify a test case

---
 .../apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index b5ba066c1386..1f08f0e5f017 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.test.SharedSQLContext
 
-class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
+class ResolvedDataSourceSuite extends SparkFunSuite {
   private def getProvidingClass(name: String): Class[_] =
     DataSource(
-      sparkSession = spark,
+      sparkSession = null,
       className = name,
       options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
 

From 9cadd4168041fd859cc1e4b8396e5ed514129bff Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 19:05:24 +0800
Subject: [PATCH 09/13] modify a test case

---
 .../org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 1f08f0e5f017..9b5e364e512a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -21,7 +21,6 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.test.SharedSQLContext
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
   private def getProvidingClass(name: String): Class[_] =
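
Patches 05 through 09 converge on the shape that sidesteps both problems: no cache member on the case class at all. resolveRelation creates the cache as a local, threads it through getOrInferFileFormatSchema (whose NoopCache default keeps every other caller on the old cache-free behavior), and hands the same instance to the final InMemoryFileIndex. Condensed from the hunks above, as a sketch rather than a compilable excerpt of DataSource.scala (the ??? bodies stand for elided code):

    private def getOrInferFileFormatSchema(
        format: FileFormat,
        fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
      // First traversal: any listing performed during inference lands in
      // fileStatusCache (body elided; see the hunks above).
      ???
    }

    def resolveRelation(): BaseRelation = {
      // One cache per resolution, shared by both traversals.
      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
      // The second traversal is served from the cache: one physical listing total.
      val fileCatalog = new InMemoryFileIndex(
        sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
      ???  // wrap fileCatalog, dataSchema and partitionSchema in a HadoopFsRelation
    }

With construction side-effect free again, patches 08 and 09 revert the SharedSQLContext workaround and restore sparkSession = null in ResolvedDataSourceSuite.
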
From 28c8158a7c9d7acdbf2a07ef66ace46c1215979f Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 19:06:40 +0800
Subject: [PATCH 10/13] modify a test case

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index eedc505e8748..e5edd5f5a8fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -86,6 +86,7 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when

From 92618b3ad67c899e681a9923ad9abc5a7f2c7897 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Thu, 2 Mar 2017 19:07:10 +0800
Subject: [PATCH 11/13] remove an empty line

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e5edd5f5a8fd..ddc6dfa207af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -86,7 +86,7 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
-  
+
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when

From f6ec4fe020c153ca0e84afd3cbb5def22d9b6651 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 3 Mar 2017 08:33:31 +0800
Subject: [PATCH 12/13] add param comment

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ddc6dfa207af..0a2c9e334a0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,6 +106,7 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
+   * @param fileStatusCache fileStatusCache for InMemoryFileIndex
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */

From 3e495a73fb01fe118c93422e87d1be1f66e78678 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Fri, 3 Mar 2017 10:02:30 +0800
Subject: [PATCH 13/13] fix a comment

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0a2c9e334a0e..4947dfda6fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,7 +106,7 @@ case class DataSource(
    * be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileStatusCache fileStatusCache for InMemoryFileIndex
+   * @param fileStatusCache the shared cache for file statuses to speed up listing
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */