From 51b385bef56c81b40b0631350640e6bd4579030f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 9 Apr 2018 02:19:48 +0800 Subject: [PATCH 01/13] improve PartitioningAwareFileIndex --- .../datasources/CatalogFileIndex.scala | 2 +- .../execution/datasources/DataSource.scala | 81 ++++++++----------- .../datasources/InMemoryFileIndex.scala | 8 +- .../PartitioningAwareFileIndex.scala | 57 ++++++++----- .../streaming/MetadataLogFileIndex.scala | 10 +-- .../datasources/FileSourceStrategySuite.scala | 2 +- 6 files changed, 81 insertions(+), 79 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 4046396d0e614..a66a07673e25f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -85,7 +85,7 @@ class CatalogFileIndex( sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) } else { new InMemoryFileIndex( - sparkSession, rootPaths, table.storage.properties, partitionSchema = None) + sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b84ea769808f9..46aae80430aea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -103,24 +103,6 @@ case class DataSource( bucket.sortColumnNames, "in the sort definition", equality) } - /** - * In the read path, only managed tables by Hive provide the partition columns properly when - * initializing this class. All other file based data sources will try to infer the partitioning, - * and then cast the inferred types to user specified dataTypes if the partition columns exist - * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or - * inconsistent data types as reported in SPARK-21463. - * @param fileIndex A FileIndex that will perform partition inference - * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` - */ - private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = { - val resolved = fileIndex.partitionSchema.map { partitionField => - // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred - userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( - partitionField) - } - StructType(resolved) - } - /** * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer * it. In the read path, only managed tables by Hive provide the partition columns properly when @@ -140,31 +122,22 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource - * @param fileStatusCache the shared cache for file statuses to speed up listing + * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. 
*/ private def getOrInferFileFormatSchema( format: FileFormat, - fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = { + fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { // the operations below are expensive therefore try not to do them if we don't need to, e.g., // in streaming mode, we have already inferred and registered partition columns, we will // never have to materialize the lazy val below - lazy val tempFileIndex = { - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val globbedPaths = allPaths.toSeq.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - }.toArray - new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache) - } + lazy val tempFileIndex = fileIndex.getOrElse( + createInMemoryFileIndex(withFileStatusCache = false)) val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex) + tempFileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning @@ -225,6 +198,25 @@ case class DataSource( (dataSchema, partitionSchema) } + /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ + private def createInMemoryFileIndex( + withFileStatusCache: Boolean, + checkEmptyGlobPath: Boolean = false, + checkFilesExist: Boolean = false): InMemoryFileIndex = { + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val globbedPaths = allPaths.flatMap( + DataSource.checkAndGlobPathIfNecessary( + hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray + val fileStatusCache = if (withFileStatusCache) { + FileStatusCache.getOrCreate(sparkSession) + } else { + NoopCache + } + new InMemoryFileIndex( + sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) + } + /** Returns the name and schema of the source that can be used to continually read data. */ private def sourceSchema(): SourceInfo = { providingClass.newInstance() match { @@ -356,13 +348,7 @@ case class DataSource( caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None) - val fileCatalog = if (userSpecifiedSchema.nonEmpty) { - val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog) - new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema)) - } else { - tempFileCatalog - } + val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -384,13 +370,10 @@ case class DataSource( // This is a non-streaming file based datasource. 
case (format: FileFormat, _) => - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val globbedPaths = allPaths.flatMap( - DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray - - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache) + val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true, + checkEmptyGlobPath = true, checkFilesExist = true) + val (dataSchema, partitionSchema) = + getOrInferFileFormatSchema(format, Some(inMemoryFileIndex)) val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { @@ -400,8 +383,7 @@ case class DataSource( catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) } else { - new InMemoryFileIndex( - sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache) + inMemoryFileIndex } HadoopFsRelation( @@ -706,13 +688,14 @@ object DataSource extends Logging { private def checkAndGlobPathIfNecessary( hadoopConf: Configuration, path: String, + checkEmptyGlobPath: Boolean, checkFilesExist: Boolean): Seq[Path] = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - if (globPath.isEmpty) { + if (checkEmptyGlobPath && globPath.isEmpty) { throw new AnalysisException(s"Path does not exist: $qualified") } // Sufficient to check head of the globPath seq for non-glob scenario diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 318ada0ceefc5..739d1f456e3ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration * @param rootPathsSpecified the list of root table paths to scan (some of which might be * filtered out later) * @param parameters as set of options to control discovery - * @param partitionSchema an optional partition schema that will be use to provide types for the - * discovered partitions + * @param userSpecifiedSchema an optional user specified schema that will be use to provide + * types for the discovered partitions */ class InMemoryFileIndex( sparkSession: SparkSession, rootPathsSpecified: Seq[Path], parameters: Map[String, String], - partitionSchema: Option[StructType], + userSpecifiedSchema: Option[StructType], fileStatusCache: FileStatusCache = NoopCache) extends PartitioningAwareFileIndex( - sparkSession, parameters, partitionSchema, fileStatusCache) { + sparkSession, parameters, userSpecifiedSchema, fileStatusCache) { // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir) // or "/.../_spark_metadata/0" (a file in the metadata dir). 
`rootPathsSpecified` might contain diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 6b6f6388d54e8..19f76704db691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType} * It provides the necessary methods to parse partition data based on a set of files. * * @param parameters as set of options to control partition discovery - * @param userPartitionSchema an optional partition schema that will be use to provide types for - * the discovered partitions + * @param userSpecifiedSchema an optional user specified schema that will be use to provide + * types for the discovered partitions */ abstract class PartitioningAwareFileIndex( sparkSession: SparkSession, parameters: Map[String, String], - userPartitionSchema: Option[StructType], + userSpecifiedSchema: Option[StructType], fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging { import PartitioningAwareFileIndex.BASE_PATH_PARAM @@ -126,35 +126,35 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - - userPartitionSchema match { + val inferredPartitionSpec = PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, + basePaths = basePaths, + timeZoneId = timeZoneId) + userSpecifiedSchema match { case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => - val spec = PartitioningUtils.parsePartitions( - leafDirs, - typeInference = false, - basePaths = basePaths, - timeZoneId = timeZoneId) + val userPartitionSchema = + combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec) - // Without auto inference, all of value in the `row` should be null or in StringType, // we need to cast into the data type that user specified. def castPartitionValuesToUserSchema(row: InternalRow) = { InternalRow((0 until row.numFields).map { i => + val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match { + case StringType => Literal.create(row.getUTF8String(i), StringType) + case otherType => Literal.create(row.get(i, otherType)) + } Cast( - Literal.create(row.getUTF8String(i), StringType), - userProvidedSchema.fields(i).dataType, + expr, + userPartitionSchema.fields(i).dataType, Option(timeZoneId)).eval() }: _*) } - PartitionSpec(userProvidedSchema, spec.partitions.map { part => + PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part => part.copy(values = castPartitionValuesToUserSchema(part.values)) }) case _ => - PartitioningUtils.parsePartitions( - leafDirs, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, - basePaths = basePaths, - timeZoneId = timeZoneId) + inferredPartitionSpec } } @@ -236,6 +236,25 @@ abstract class PartitioningAwareFileIndex( val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } + + /** + * In the read path, only managed tables by Hive provide the partition columns properly when + * initializing this class. 
All other file based data sources will try to infer the partitioning, + * and then cast the inferred types to user specified dataTypes if the partition columns exist + * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or + * inconsistent data types as reported in SPARK-21463. + * @param spec A partition inference result + * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` + */ + private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = { + val equality = sparkSession.sessionState.conf.resolver + val resolved = spec.partitionColumns.map { partitionField => + // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred + userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( + partitionField) + } + StructType(resolved) + } } object PartitioningAwareFileIndex { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala index 1da703cefd8ea..5cacdd070b735 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType * A [[FileIndex]] that generates the list of files to processing by reading them from the * metadata log files generated by the [[FileStreamSink]]. * - * @param userPartitionSchema an optional partition schema that will be use to provide types for - * the discovered partitions + * @param userSpecifiedSchema an optional user specified schema that will be use to provide + * types for the discovered partitions */ class MetadataLogFileIndex( sparkSession: SparkSession, path: Path, - userPartitionSchema: Option[StructType]) - extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) { + userSpecifiedSchema: Option[StructType]) + extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) { private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") @@ -51,7 +51,7 @@ class MetadataLogFileIndex( } override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { - allFilesFromLog.toArray.groupBy(_.getPath.getParent) + allFilesFromLog.groupBy(_.getPath.getParent) } override def rootPaths: Seq[Path] = path :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index c1d61b843d899..8764f0c42cf9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi sparkSession = spark, rootPathsSpecified = Seq(new Path(tempDir)), parameters = Map.empty[String, String], - partitionSchema = None) + userSpecifiedSchema = None) // This should not fail. 
fileCatalog.listLeafFiles(Seq(new Path(tempDir))) From 5028fe27da7fa1ebdc7e8403759eb91ec9cacc64 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 9 Apr 2018 07:51:29 +0800 Subject: [PATCH 02/13] fix checkFilesExist --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 46aae80430aea..00d597f75d065 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -371,7 +371,7 @@ case class DataSource( // This is a non-streaming file based datasource. case (format: FileFormat, _) => val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true, - checkEmptyGlobPath = true, checkFilesExist = true) + checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, Some(inMemoryFileIndex)) From 603a836ee91409a1bf0af84245a3b38a92ebd896 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 9 Apr 2018 14:51:05 +0800 Subject: [PATCH 03/13] fix test failure --- .../PartitionProviderCompatibilitySuite.scala | 2 +- .../hive/PartitionedTablePerfStatsSuite.scala | 26 +++++++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index 80afc9d8f44bc..ab3f5b985e2ae 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite HiveCatalogMetrics.reset() assert(spark.sql("select * from test where partCol < 2").count() == 2) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index 1a86c604d5da3..e281ab7ea1a50 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -99,6 +99,16 @@ class PartitionedTablePerfStatsSuite } } + /** For data source tables, all the files should be parsed once for creating file index */ + private def checkFilesDiscovered(isDatasourceTable: Boolean, count: Int): Unit = { + val expectCount = if (isDatasourceTable) { + count + 5 + } else { + count + } + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == expectCount) + } + genericTest("partitioned pruned table reports only selected files") { spec => assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") withTable("test") { @@ -137,7 +147,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() spark.sql("select * from test where partCol1 = 999").count() assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - 
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + checkFilesDiscovered(spec.isDatasourceTable, 0) HiveCatalogMetrics.reset() spark.sql("select * from test where partCol1 < 2").count() @@ -178,7 +188,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() assert(spark.sql("select * from test where partCol1 = 999").count() == 0) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + checkFilesDiscovered(spec.isDatasourceTable, 0) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) HiveCatalogMetrics.reset() @@ -218,13 +228,13 @@ class PartitionedTablePerfStatsSuite spec.setupTable("test", dir) HiveCatalogMetrics.reset() assert(spark.sql("select * from test").count() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + checkFilesDiscovered(spec.isDatasourceTable, 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) HiveCatalogMetrics.reset() spark.sql("refresh table test") assert(spark.sql("select * from test").count() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + checkFilesDiscovered(spec.isDatasourceTable, 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) spark.catalog.cacheTable("test") @@ -247,10 +257,10 @@ class PartitionedTablePerfStatsSuite spec.setupTable("test", dir) HiveCatalogMetrics.reset() assert(spark.sql("select * from test").count() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + checkFilesDiscovered(spec.isDatasourceTable, 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) assert(spark.sql("select * from test").count() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) + checkFilesDiscovered(spec.isDatasourceTable, 10) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } @@ -263,7 +273,7 @@ class PartitionedTablePerfStatsSuite withTempDir { dir => HiveCatalogMetrics.reset() setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } @@ -419,7 +429,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() spark.read.load(dir.getAbsolutePath) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1) - assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } } From 378d0ccc37c9a10637a265581d55b6219fda6cc8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 03:03:31 +0800 Subject: [PATCH 04/13] make it simple --- .../execution/datasources/DataSource.scala | 92 ++++++++----------- .../PartitionProviderCompatibilitySuite.scala | 2 +- .../hive/PartitionedTablePerfStatsSuite.scala | 14 +-- 3 files changed, 48 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 00d597f75d065..ad7c9f8f906d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -126,28 +126,25 @@ case class DataSource( * @return A pair of the data schema (excluding partition columns) 
and the schema of the partition * columns. */ - private def getOrInferFileFormatSchema( - format: FileFormat, - fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { + private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = { // the operations below are expensive therefore try not to do them if we don't need to, e.g., // in streaming mode, we have already inferred and registered partition columns, we will // never have to materialize the lazy val below - lazy val tempFileIndex = fileIndex.getOrElse( - createInMemoryFileIndex(withFileStatusCache = false)) + val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - tempFileIndex.partitionSchema + inMemoryFileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning if (userSpecifiedSchema.isEmpty) { - val inferredPartitions = tempFileIndex.partitionSchema + val inferredPartitions = inMemoryFileIndex.partitionSchema inferredPartitions } else { val partitionFields = partitionColumns.map { partitionColumn => userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { - val inferredPartitions = tempFileIndex.partitionSchema + val inferredPartitions = inMemoryFileIndex.partitionSchema val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) if (inferredOpt.isDefined) { logDebug( @@ -176,7 +173,7 @@ case class DataSource( format.inferSchema( sparkSession, caseInsensitiveOptions, - tempFileIndex.allFiles()) + inMemoryFileIndex.allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format. It must be specified manually.") @@ -198,21 +195,11 @@ case class DataSource( (dataSchema, partitionSchema) } - /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ - private def createInMemoryFileIndex( - withFileStatusCache: Boolean, - checkEmptyGlobPath: Boolean = false, - checkFilesExist: Boolean = false): InMemoryFileIndex = { - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val globbedPaths = allPaths.flatMap( - DataSource.checkAndGlobPathIfNecessary( - hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray - val fileStatusCache = if (withFileStatusCache) { - FileStatusCache.getOrCreate(sparkSession) - } else { - NoopCache - } + /** An [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ + private lazy val inMemoryFileIndex: InMemoryFileIndex = { + val globbedPaths = + checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) new InMemoryFileIndex( sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) } @@ -370,10 +357,9 @@ case class DataSource( // This is a non-streaming file based datasource. 
case (format: FileFormat, _) => - val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true, - checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) + checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) val (dataSchema, partitionSchema) = - getOrInferFileFormatSchema(format, Some(inMemoryFileIndex)) + getOrInferFileFormatSchema(format) val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { @@ -534,6 +520,33 @@ case class DataSource( sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } } + + /** + * Checks and returns files in all the paths. + */ + private def checkAndGlobPathIfNecessary( + checkEmptyGlobPath: Boolean, + checkFilesExist: Boolean): Seq[Path] = { + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val hadoopConf = sparkSession.sessionState.newHadoopConf() + allPaths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + + if (checkEmptyGlobPath && globPath.isEmpty) { + throw new AnalysisException(s"Path does not exist: $qualified") + } + + // Sufficient to check head of the globPath seq for non-glob scenario + // Don't need to check once again if files exist in streaming mode + if (checkFilesExist && !fs.exists(globPath.head)) { + throw new AnalysisException(s"Path does not exist: ${globPath.head}") + } + globPath + }.toSeq + } } object DataSource extends Logging { @@ -681,31 +694,6 @@ object DataSource extends Logging { locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath) } - /** - * If `path` is a file pattern, return all the files that match it. Otherwise, return itself. - * If `checkFilesExist` is `true`, also check the file existence. - */ - private def checkAndGlobPathIfNecessary( - hadoopConf: Configuration, - path: String, - checkEmptyGlobPath: Boolean, - checkFilesExist: Boolean): Seq[Path] = { - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - - if (checkEmptyGlobPath && globPath.isEmpty) { - throw new AnalysisException(s"Path does not exist: $qualified") - } - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { - throw new AnalysisException(s"Path does not exist: ${globPath.head}") - } - globPath - } - /** * Called before writing into a FileFormat based data source to make sure the * supplied schema is not empty. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index ab3f5b985e2ae..80afc9d8f44bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite HiveCatalogMetrics.reset() assert(spark.sql("select * from test where partCol < 2").count() == 2) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index e281ab7ea1a50..a7e694a9b862c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -147,7 +147,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() spark.sql("select * from test where partCol1 = 999").count() assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - checkFilesDiscovered(spec.isDatasourceTable, 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) HiveCatalogMetrics.reset() spark.sql("select * from test where partCol1 < 2").count() @@ -188,7 +188,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() assert(spark.sql("select * from test where partCol1 = 999").count() == 0) assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - checkFilesDiscovered(spec.isDatasourceTable, 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) HiveCatalogMetrics.reset() @@ -228,13 +228,13 @@ class PartitionedTablePerfStatsSuite spec.setupTable("test", dir) HiveCatalogMetrics.reset() assert(spark.sql("select * from test").count() == 5) - checkFilesDiscovered(spec.isDatasourceTable, 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) HiveCatalogMetrics.reset() spark.sql("refresh table test") assert(spark.sql("select * from test").count() == 5) - checkFilesDiscovered(spec.isDatasourceTable, 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) spark.catalog.cacheTable("test") @@ -257,10 +257,10 @@ class PartitionedTablePerfStatsSuite spec.setupTable("test", dir) HiveCatalogMetrics.reset() assert(spark.sql("select * from test").count() == 5) - checkFilesDiscovered(spec.isDatasourceTable, 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) assert(spark.sql("select * from test").count() == 5) - checkFilesDiscovered(spec.isDatasourceTable, 10) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } @@ -273,7 +273,7 @@ class PartitionedTablePerfStatsSuite withTempDir { dir => HiveCatalogMetrics.reset() setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false) - 
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } From 4b5e2dbee652a9e0490ed6485b09ba6583a192fb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 03:08:05 +0800 Subject: [PATCH 05/13] revise --- .../execution/datasources/PartitioningAwareFileIndex.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 19f76704db691..cc8af7b92c454 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -139,12 +139,9 @@ abstract class PartitioningAwareFileIndex( // we need to cast into the data type that user specified. def castPartitionValuesToUserSchema(row: InternalRow) = { InternalRow((0 until row.numFields).map { i => - val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match { - case StringType => Literal.create(row.getUTF8String(i), StringType) - case otherType => Literal.create(row.get(i, otherType)) - } + val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType Cast( - expr, + Literal.create(row.get(i, dt), dt), userPartitionSchema.fields(i).dataType, Option(timeZoneId)).eval() }: _*) From 71d98ed9d6af9304fcaee6c9be966e4a2fdcb5b0 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 12:25:23 +0800 Subject: [PATCH 06/13] revise --- .../spark/sql/execution/datasources/DataSource.scala | 11 ++++++----- .../sql/hive/PartitionedTablePerfStatsSuite.scala | 10 ---------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ad7c9f8f906d5..0cca09409b8ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -127,10 +127,6 @@ case class DataSource( * columns. */ private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = { - // the operations below are expensive therefore try not to do them if we don't need to, e.g., - // in streaming mode, we have already inferred and registered partition columns, we will - // never have to materialize the lazy val below - val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource @@ -195,7 +191,12 @@ case class DataSource( (dataSchema, partitionSchema) } - /** An [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ + /** + * An [[InMemoryFileIndex]] that can be used to get partition schema and file list. 
+ * The operations below are expensive therefore try not to do them if we don't need to, e.g., + * in streaming mode, we have already inferred and registered partition columns, we will + * never have to materialize the lazy val below + */ private lazy val inMemoryFileIndex: InMemoryFileIndex = { val globbedPaths = checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index a7e694a9b862c..3af163af0968c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -99,16 +99,6 @@ class PartitionedTablePerfStatsSuite } } - /** For data source tables, all the files should be parsed once for creating file index */ - private def checkFilesDiscovered(isDatasourceTable: Boolean, count: Int): Unit = { - val expectCount = if (isDatasourceTable) { - count + 5 - } else { - count - } - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == expectCount) - } - genericTest("partitioned pruned table reports only selected files") { spec => assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") withTable("test") { From 553a412f089dcdffee7ab2e182f7947554eabbec Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 15:56:18 +0800 Subject: [PATCH 07/13] revise --- .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0cca09409b8ba..55d30fd79829b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -122,7 +121,6 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource - * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. */ From 2b99b12e5c179417a437ca9dea3b5733a8481781 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 19:13:15 +0800 Subject: [PATCH 08/13] revise --- .../apache/spark/sql/execution/datasources/DataSource.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 55d30fd79829b..cf86a196691be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -357,8 +357,7 @@ case class DataSource( // This is a non-streaming file based datasource. 
case (format: FileFormat, _) => checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) - val (dataSchema, partitionSchema) = - getOrInferFileFormatSchema(format) + val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { From 00438cdb756e47ea9575a29d8e44e09550df5eaa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 20:14:55 +0800 Subject: [PATCH 09/13] use the old way --- .../execution/datasources/DataSource.scala | 102 ++++++++++-------- .../PartitioningAwareFileIndex.scala | 7 +- .../hive/PartitionedTablePerfStatsSuite.scala | 2 +- 3 files changed, 64 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index cf86a196691be..00d597f75d065 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -121,24 +122,32 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource + * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. */ - private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = { + private def getOrInferFileFormatSchema( + format: FileFormat, + fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { + // the operations below are expensive therefore try not to do them if we don't need to, e.g., + // in streaming mode, we have already inferred and registered partition columns, we will + // never have to materialize the lazy val below + lazy val tempFileIndex = fileIndex.getOrElse( + createInMemoryFileIndex(withFileStatusCache = false)) val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - inMemoryFileIndex.partitionSchema + tempFileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. 
If userSpecifiedSchema is empty used inferred // partitioning if (userSpecifiedSchema.isEmpty) { - val inferredPartitions = inMemoryFileIndex.partitionSchema + val inferredPartitions = tempFileIndex.partitionSchema inferredPartitions } else { val partitionFields = partitionColumns.map { partitionColumn => userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { - val inferredPartitions = inMemoryFileIndex.partitionSchema + val inferredPartitions = tempFileIndex.partitionSchema val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) if (inferredOpt.isDefined) { logDebug( @@ -167,7 +176,7 @@ case class DataSource( format.inferSchema( sparkSession, caseInsensitiveOptions, - inMemoryFileIndex.allFiles()) + tempFileIndex.allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format. It must be specified manually.") @@ -189,16 +198,21 @@ case class DataSource( (dataSchema, partitionSchema) } - /** - * An [[InMemoryFileIndex]] that can be used to get partition schema and file list. - * The operations below are expensive therefore try not to do them if we don't need to, e.g., - * in streaming mode, we have already inferred and registered partition columns, we will - * never have to materialize the lazy val below - */ - private lazy val inMemoryFileIndex: InMemoryFileIndex = { - val globbedPaths = - checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) - val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ + private def createInMemoryFileIndex( + withFileStatusCache: Boolean, + checkEmptyGlobPath: Boolean = false, + checkFilesExist: Boolean = false): InMemoryFileIndex = { + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val globbedPaths = allPaths.flatMap( + DataSource.checkAndGlobPathIfNecessary( + hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray + val fileStatusCache = if (withFileStatusCache) { + FileStatusCache.getOrCreate(sparkSession) + } else { + NoopCache + } new InMemoryFileIndex( sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) } @@ -356,8 +370,10 @@ case class DataSource( // This is a non-streaming file based datasource. case (format: FileFormat, _) => - checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) - val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) + val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true, + checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) + val (dataSchema, partitionSchema) = + getOrInferFileFormatSchema(format, Some(inMemoryFileIndex)) val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { @@ -518,33 +534,6 @@ case class DataSource( sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } } - - /** - * Checks and returns files in all the paths. 
- */ - private def checkAndGlobPathIfNecessary( - checkEmptyGlobPath: Boolean, - checkFilesExist: Boolean): Seq[Path] = { - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() - allPaths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - - if (checkEmptyGlobPath && globPath.isEmpty) { - throw new AnalysisException(s"Path does not exist: $qualified") - } - - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { - throw new AnalysisException(s"Path does not exist: ${globPath.head}") - } - globPath - }.toSeq - } } object DataSource extends Logging { @@ -692,6 +681,31 @@ object DataSource extends Logging { locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath) } + /** + * If `path` is a file pattern, return all the files that match it. Otherwise, return itself. + * If `checkFilesExist` is `true`, also check the file existence. + */ + private def checkAndGlobPathIfNecessary( + hadoopConf: Configuration, + path: String, + checkEmptyGlobPath: Boolean, + checkFilesExist: Boolean): Seq[Path] = { + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + + if (checkEmptyGlobPath && globPath.isEmpty) { + throw new AnalysisException(s"Path does not exist: $qualified") + } + // Sufficient to check head of the globPath seq for non-glob scenario + // Don't need to check once again if files exist in streaming mode + if (checkFilesExist && !fs.exists(globPath.head)) { + throw new AnalysisException(s"Path does not exist: ${globPath.head}") + } + globPath + } + /** * Called before writing into a FileFormat based data source to make sure the * supplied schema is not empty. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index cc8af7b92c454..19f76704db691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -139,9 +139,12 @@ abstract class PartitioningAwareFileIndex( // we need to cast into the data type that user specified. 
def castPartitionValuesToUserSchema(row: InternalRow) = { InternalRow((0 until row.numFields).map { i => - val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType + val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match { + case StringType => Literal.create(row.getUTF8String(i), StringType) + case otherType => Literal.create(row.get(i, otherType)) + } Cast( - Literal.create(row.get(i, dt), dt), + expr, userPartitionSchema.fields(i).dataType, Option(timeZoneId)).eval() }: _*) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index 3af163af0968c..1a86c604d5da3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() spark.read.load(dir.getAbsolutePath) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1) - assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1) } } } From 9a2af2dbbebb5f5f37ee5ee856c7dcf6a71b45aa Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 22:16:24 +0800 Subject: [PATCH 10/13] best refactor --- .../execution/datasources/DataSource.scala | 120 +++++++++--------- .../hive/PartitionedTablePerfStatsSuite.scala | 2 +- 2 files changed, 60 insertions(+), 62 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 00d597f75d065..3b7362a048cab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -95,6 +95,14 @@ case class DataSource( lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver + // The operations below are expensive therefore try not to do them if we don't need to, e.g., + // in streaming mode, we have already inferred and registered partition columns, we will + // never have to materialize the lazy val below + private lazy val tempFileIndex = { + val globbedPaths = + checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) + createInMemoryFileIndex(globbedPaths) + } bucketSpec.map { bucket => SchemaUtils.checkColumnNameDuplication( @@ -122,32 +130,29 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource - * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list + * @param optionalFileIndex optional [[FileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. 
*/ private def getOrInferFileFormatSchema( format: FileFormat, - fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { - // the operations below are expensive therefore try not to do them if we don't need to, e.g., - // in streaming mode, we have already inferred and registered partition columns, we will - // never have to materialize the lazy val below - lazy val tempFileIndex = fileIndex.getOrElse( - createInMemoryFileIndex(withFileStatusCache = false)) + optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = { + def fileIndex = optionalFileIndex.getOrElse(tempFileIndex) + val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - tempFileIndex.partitionSchema + fileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning if (userSpecifiedSchema.isEmpty) { - val inferredPartitions = tempFileIndex.partitionSchema + val inferredPartitions = fileIndex.partitionSchema inferredPartitions } else { val partitionFields = partitionColumns.map { partitionColumn => userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { - val inferredPartitions = tempFileIndex.partitionSchema + val inferredPartitions = fileIndex.partitionSchema val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) if (inferredOpt.isDefined) { logDebug( @@ -173,10 +178,14 @@ case class DataSource( val dataSchema = userSpecifiedSchema.map { schema => StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) }.orElse { + val index = fileIndex match { + case i: InMemoryFileIndex => i + case _ => tempFileIndex + } format.inferSchema( sparkSession, caseInsensitiveOptions, - tempFileIndex.allFiles()) + index.allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format. It must be specified manually.") @@ -198,25 +207,6 @@ case class DataSource( (dataSchema, partitionSchema) } - /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ - private def createInMemoryFileIndex( - withFileStatusCache: Boolean, - checkEmptyGlobPath: Boolean = false, - checkFilesExist: Boolean = false): InMemoryFileIndex = { - val allPaths = caseInsensitiveOptions.get("path") ++ paths - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val globbedPaths = allPaths.flatMap( - DataSource.checkAndGlobPathIfNecessary( - hadoopConf, _, checkEmptyGlobPath, checkFilesExist)).toArray - val fileStatusCache = if (withFileStatusCache) { - FileStatusCache.getOrCreate(sparkSession) - } else { - NoopCache - } - new InMemoryFileIndex( - sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) - } - /** Returns the name and schema of the source that can be used to continually read data. */ private def sourceSchema(): SourceInfo = { providingClass.newInstance() match { @@ -370,11 +360,8 @@ case class DataSource( // This is a non-streaming file based datasource. 
case (format: FileFormat, _) => - val inMemoryFileIndex = createInMemoryFileIndex(withFileStatusCache = true, - checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) - val (dataSchema, partitionSchema) = - getOrInferFileFormatSchema(format, Some(inMemoryFileIndex)) - + val globbedPaths = + checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes @@ -383,8 +370,10 @@ case class DataSource( catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) } else { - inMemoryFileIndex + createInMemoryFileIndex(globbedPaths) } + val (dataSchema, partitionSchema) = + getOrInferFileFormatSchema(format, Some(fileCatalog)) HadoopFsRelation( fileCatalog, @@ -534,6 +523,40 @@ case class DataSource( sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.") } } + + /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */ + private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = { + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new InMemoryFileIndex( + sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache) + } + + /** + * Checks and returns files in all the paths. + */ + private def checkAndGlobPathIfNecessary( + checkEmptyGlobPath: Boolean, + checkFilesExist: Boolean): Seq[Path] = { + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val hadoopConf = sparkSession.sessionState.newHadoopConf() + allPaths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + + if (checkEmptyGlobPath && globPath.isEmpty) { + throw new AnalysisException(s"Path does not exist: $qualified") + } + + // Sufficient to check head of the globPath seq for non-glob scenario + // Don't need to check once again if files exist in streaming mode + if (checkFilesExist && !fs.exists(globPath.head)) { + throw new AnalysisException(s"Path does not exist: ${globPath.head}") + } + globPath + }.toSeq + } } object DataSource extends Logging { @@ -681,31 +704,6 @@ object DataSource extends Logging { locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath) } - /** - * If `path` is a file pattern, return all the files that match it. Otherwise, return itself. - * If `checkFilesExist` is `true`, also check the file existence. 
- */ - private def checkAndGlobPathIfNecessary( - hadoopConf: Configuration, - path: String, - checkEmptyGlobPath: Boolean, - checkFilesExist: Boolean): Seq[Path] = { - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) - - if (checkEmptyGlobPath && globPath.isEmpty) { - throw new AnalysisException(s"Path does not exist: $qualified") - } - // Sufficient to check head of the globPath seq for non-glob scenario - // Don't need to check once again if files exist in streaming mode - if (checkFilesExist && !fs.exists(globPath.head)) { - throw new AnalysisException(s"Path does not exist: ${globPath.head}") - } - globPath - } - /** * Called before writing into a FileFormat based data source to make sure the * supplied schema is not empty. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index 1a86c604d5da3..3af163af0968c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite HiveCatalogMetrics.reset() spark.read.load(dir.getAbsolutePath) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1) - assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) } } } From 114737f31f35b559b93a08685abffea64b7486c7 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 22:24:13 +0800 Subject: [PATCH 11/13] revise --- .../execution/datasources/PartitioningAwareFileIndex.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 19f76704db691..cc8af7b92c454 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -139,12 +139,9 @@ abstract class PartitioningAwareFileIndex( // we need to cast into the data type that user specified. 
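The hunk below drops the StringType special case: the inferred partition value is always wrapped in a Literal of its inferred type and then Cast to the type the user declared, with the session time zone passed through. A spark-shell illustration of that single cast step — the value "2018", the types, and the "UTC" time zone are made-up inputs; the real code does this for each field of the inferred partition row:

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // A partition value inferred as a string from the directory name...
    val literal = Literal.create("2018", StringType)
    // ...cast to the type the user declared for that partition column. The time zone
    // argument only matters for date/timestamp casts; a string-to-int cast ignores it.
    val casted = Cast(literal, IntegerType, Option("UTC")).eval()
    // casted == 2018 (a boxed java.lang.Integer), ready to go into the partition InternalRow.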
def castPartitionValuesToUserSchema(row: InternalRow) = { InternalRow((0 until row.numFields).map { i => - val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match { - case StringType => Literal.create(row.getUTF8String(i), StringType) - case otherType => Literal.create(row.get(i, otherType)) - } + val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType Cast( - expr, + Literal.create(row.get(i, dt), dt), userPartitionSchema.fields(i).dataType, Option(timeZoneId)).eval() }: _*) From 8c8bf69b4d0e20f1aa524d5839e0543d558024d4 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 12 Apr 2018 23:29:57 +0800 Subject: [PATCH 12/13] Use CatalogFileIndex to get schemas --- .../execution/datasources/DataSource.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 3b7362a048cab..d1a60fa40ae80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -130,29 +130,29 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource - * @param optionalFileIndex optional [[FileIndex]] for getting partition schema and file list + * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. */ private def getOrInferFileFormatSchema( format: FileFormat, - optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = { - def fileIndex = optionalFileIndex.getOrElse(tempFileIndex) + fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { + def inMemoryFileIndex = fileIndex.getOrElse(tempFileIndex) val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - fileIndex.partitionSchema + inMemoryFileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning if (userSpecifiedSchema.isEmpty) { - val inferredPartitions = fileIndex.partitionSchema + val inferredPartitions = inMemoryFileIndex.partitionSchema inferredPartitions } else { val partitionFields = partitionColumns.map { partitionColumn => userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { - val inferredPartitions = fileIndex.partitionSchema + val inferredPartitions = inMemoryFileIndex.partitionSchema val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) if (inferredOpt.isDefined) { logDebug( @@ -178,14 +178,10 @@ case class DataSource( val dataSchema = userSpecifiedSchema.map { schema => StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name)))) }.orElse { - val index = fileIndex match { - case i: InMemoryFileIndex => i - case _ => tempFileIndex - } format.inferSchema( sparkSession, caseInsensitiveOptions, - index.allFiles()) + inMemoryFileIndex.allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format. 
It must be specified manually.") @@ -362,18 +358,22 @@ case class DataSource( case (format: FileFormat, _) => val globbedPaths = checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) - val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && - catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { + val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions && + catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog && + catalogTable.get.partitionSchema.nonEmpty + val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) { val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes - new CatalogFileIndex( + val index = new CatalogFileIndex( sparkSession, catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) + (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema) } else { - createInMemoryFileIndex(globbedPaths) + val index = createInMemoryFileIndex(globbedPaths) + val (resultDataSchema, resultPartitionSchema) = + getOrInferFileFormatSchema(format, Some(index)) + (index, resultDataSchema, resultPartitionSchema) } - val (dataSchema, partitionSchema) = - getOrInferFileFormatSchema(format, Some(fileCatalog)) HadoopFsRelation( fileCatalog, From 12ac191cb29f4ba1f817abffc8c7422efe837b38 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 13 Apr 2018 13:26:16 +0800 Subject: [PATCH 13/13] address comments --- .../execution/datasources/DataSource.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d1a60fa40ae80..f16d824201e77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -95,14 +94,6 @@ case class DataSource( lazy val sourceInfo: SourceInfo = sourceSchema() private val caseInsensitiveOptions = CaseInsensitiveMap(options) private val equality = sparkSession.sessionState.conf.resolver - // The operations below are expensive therefore try not to do them if we don't need to, e.g., - // in streaming mode, we have already inferred and registered partition columns, we will - // never have to materialize the lazy val below - private lazy val tempFileIndex = { - val globbedPaths = - checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) - createInMemoryFileIndex(globbedPaths) - } bucketSpec.map { bucket => SchemaUtils.checkColumnNameDuplication( @@ -137,22 +128,29 @@ case class DataSource( private def getOrInferFileFormatSchema( format: FileFormat, fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { - def inMemoryFileIndex = fileIndex.getOrElse(tempFileIndex) + // The operations below are expensive therefore try not to do them if we don't need to, e.g., + // in streaming mode, we have already inferred and registered partition columns, we will + // never have to materialize the lazy val below + lazy val tempFileIndex = 
fileIndex.getOrElse { + val globbedPaths = + checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) + createInMemoryFileIndex(globbedPaths) + } val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning // columns properly unless it is a Hive DataSource - inMemoryFileIndex.partitionSchema + tempFileIndex.partitionSchema } else { // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred // partitioning if (userSpecifiedSchema.isEmpty) { - val inferredPartitions = inMemoryFileIndex.partitionSchema + val inferredPartitions = tempFileIndex.partitionSchema inferredPartitions } else { val partitionFields = partitionColumns.map { partitionColumn => userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse { - val inferredPartitions = inMemoryFileIndex.partitionSchema + val inferredPartitions = tempFileIndex.partitionSchema val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn)) if (inferredOpt.isDefined) { logDebug( @@ -181,7 +179,7 @@ case class DataSource( format.inferSchema( sparkSession, caseInsensitiveOptions, - inMemoryFileIndex.allFiles()) + tempFileIndex.allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format. It must be specified manually.") @@ -360,7 +358,7 @@ case class DataSource( checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog && - catalogTable.get.partitionSchema.nonEmpty + catalogTable.get.partitionColumnNames.nonEmpty val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) { val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes val index = new CatalogFileIndex(