diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6114142cefea..618ea3d669bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -136,7 +136,7 @@ case class DataSource( val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray - val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None) + val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) format.inferSchema( sparkSession, caseInsensitiveOptions, @@ -258,7 +258,7 @@ case class DataSource( case (format: FileFormat, _) if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new StreamFileCatalog(sparkSession, basePath) + val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -310,8 +310,8 @@ case class DataSource( }) } - val fileCatalog: FileCatalog = - new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema) + val fileCatalog = + new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema) val dataSchema = userSpecifiedSchema.map { schema => val equality = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala new file mode 100644 index 000000000000..bdf43e02f4a0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable +import scala.util.Try + +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + + +/** + * A [[FileCatalog]] that generates the list of files to process by recursively listing all the + * files present in `paths`. + * + * @param parameters as set of options to control discovery + * @param paths a list of paths to scan + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions + */ +class ListingFileCatalog( + sparkSession: SparkSession, + override val paths: Seq[Path], + parameters: Map[String, String], + partitionSchema: Option[StructType]) + extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { + + @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ + @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ + @volatile private var cachedPartitionSpec: PartitionSpec = _ + + refresh() + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec + } + + override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + cachedLeafFiles + } + + override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + cachedLeafDirToChildrenFiles + } + + override def refresh(): Unit = { + val files = listLeafFiles(paths) + cachedLeafFiles = + new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) + cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) + cachedPartitionSpec = null + } + + protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) + } else { + val statuses: Seq[FileStatus] = paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + logInfo(s"Listing $path on driver") + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + + val statuses = { + val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) + if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats + } + + statuses.map { + case f: LocatedFileStatus => f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a + // a big deal since we always use to `listLeafFilesInParallel` when the number of paths + // exceeds threshold. + case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + } + }.filterNot { status => + val name = status.getPath.getName + HadoopFsRelation.shouldFilterOut(name) + } + + val (dirs, files) = statuses.partition(_.isDirectory) + + // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) + if (dirs.isEmpty) { + mutable.LinkedHashSet(files: _*) + } else { + mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) + } + } + } + + override def equals(other: Any): Boolean = other match { + case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet + case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala new file mode 100644 index 000000000000..9d997d628579 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StringType, StructType} + + +/** + * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. + * It provides the necessary methods to parse partition data based on a set of files. + * + * @param parameters as set of options to control partition discovery + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions +*/ +abstract class PartitioningAwareFileCatalog( + sparkSession: SparkSession, + parameters: Map[String, String], + partitionSchema: Option[StructType]) + extends FileCatalog with Logging { + + protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + + protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] + + protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + if (partitionSpec().partitionColumns.isEmpty) { + Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil + } else { + prunePartitions(filters, partitionSpec()).map { + case PartitionDirectory(values, path) => + Partition( + values, + leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_")) + } + } + } + + override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq + + protected def inferPartitioning(): PartitionSpec = { + // We use leaf dirs containing data files to discover the schema. + val leafDirs = leafDirToChildrenFiles.keys.toSeq + partitionSchema match { + case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => + val spec = PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = false, + basePaths = basePaths) + + // Without auto inference, all of value in the `row` should be null or in StringType, + // we need to cast into the data type that user specified. + def castPartitionValuesToUserSchema(row: InternalRow) = { + InternalRow((0 until row.numFields).map { i => + Cast( + Literal.create(row.getUTF8String(i), StringType), + userProvidedSchema.fields(i).dataType).eval() + }: _*) + } + + PartitionSpec(userProvidedSchema, spec.partitions.map { part => + part.copy(values = castPartitionValuesToUserSchema(part.values)) + }) + case _ => + PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(), + basePaths = basePaths) + } + } + + private def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + val selected = partitions.filter { + case PartitionDirectory(values, _) => boundPredicate(values) + } + logInfo { + val total = partitions.length + val selectedSize = selected.length + val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 + s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." + } + + selected + } else { + partitions + } + } + + /** + * Contains a set of paths that are considered as the base dirs of the input datasets. + * The partitioning discovery logic will make sure it will stop when it reaches any + * base path. By default, the paths of the dataset provided by users will be base paths. + * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path + * will be `/path/something=true/`, and the returned DataFrame will not contain a column of + * `something`. If users want to override the basePath. They can set `basePath` in the options + * to pass the new base path to the data source. + * For the above example, if the user-provided base path is `/path/`, the returned + * DataFrame will have the column of `something`. + */ + private def basePaths: Set[Path] = { + val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) + userDefinedBasePath.getOrElse { + // If the user does not provide basePath, we will just use paths. + paths.toSet + }.map { hdfsPath => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = hdfsPath.getFileSystem(hadoopConf) + hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 0a3461151c62..3d375f23191c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -264,232 +264,31 @@ case class Partition(values: InternalRow, files: Seq[FileStatus]) * as the partitioning characteristics of those files. */ trait FileCatalog { + + /** Returns the list of input paths from which the catalog will get files. */ def paths: Seq[Path] + /** Returns the specification of the partitions inferred from the data. */ def partitionSpec(): PartitionSpec /** * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with not partition values. + * unpartitioned, this will return a single partition with no partition values. * - * @param filters the filters used to prune which partitions are returned. These filters must + * @param filters The filters used to prune which partitions are returned. These filters must * only refer to partition columns and this method will only return files * where these predicates are guaranteed to evaluate to `true`. Thus, these * filters will not need to be evaluated again on the returned data. */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Returns all the valid files. */ def allFiles(): Seq[FileStatus] - def getStatus(path: Path): Array[FileStatus] - + /** Refresh the file listing */ def refresh(): Unit } -/** - * A file catalog that caches metadata gathered by scanning all the files present in `paths` - * recursively. - * - * @param parameters as set of options to control discovery - * @param paths a list of paths to scan - * @param partitionSchema an optional partition schema that will be use to provide types for the - * discovered partitions - */ -class HDFSFileCatalog( - sparkSession: SparkSession, - parameters: Map[String, String], - override val paths: Seq[Path], - partitionSchema: Option[StructType]) - extends FileCatalog with Logging { - - private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) - - var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] - var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] - var cachedPartitionSpec: PartitionSpec = _ - - def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - cachedPartitionSpec = inferPartitioning(partitionSchema) - } - - cachedPartitionSpec - } - - refresh() - - override def listFiles(filters: Seq[Expression]): Seq[Partition] = { - if (partitionSpec().partitionColumns.isEmpty) { - Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil - } else { - prunePartitions(filters, partitionSpec()).map { - case PartitionDirectory(values, path) => - Partition( - values, - getStatus(path).filterNot(_.getPath.getName startsWith "_")) - } - } - } - - protected def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate.create(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) - - val selected = partitions.filter { - case PartitionDirectory(values, _) => boundPredicate(values) - } - logInfo { - val total = partitions.length - val selectedSize = selected.length - val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." - } - - selected - } else { - partitions - } - } - - def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq - - def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) - - private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) - } else { - val statuses: Seq[FileStatus] = paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - logInfo(s"Listing $path on driver") - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses = { - val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) - if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } - - statuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a - // a big deal since we always use to `listLeafFilesInParallel` when the number of paths - // exceeds threshold. - case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) - } - }.filterNot { status => - val name = status.getPath.getName - HadoopFsRelation.shouldFilterOut(name) - } - - val (dirs, files) = statuses.partition(_.isDirectory) - - // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) - if (dirs.isEmpty) { - mutable.LinkedHashSet(files: _*) - } else { - mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) - } - } - } - - def inferPartitioning(schema: Option[StructType]): PartitionSpec = { - // We use leaf dirs containing data files to discover the schema. - val leafDirs = leafDirToChildrenFiles.keys.toSeq - schema match { - case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => - val spec = PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = false, - basePaths = basePaths) - - // Without auto inference, all of value in the `row` should be null or in StringType, - // we need to cast into the data type that user specified. - def castPartitionValuesToUserSchema(row: InternalRow) = { - InternalRow((0 until row.numFields).map { i => - Cast( - Literal.create(row.getUTF8String(i), StringType), - userProvidedSchema.fields(i).dataType).eval() - }: _*) - } - - PartitionSpec(userProvidedSchema, spec.partitions.map { part => - part.copy(values = castPartitionValuesToUserSchema(part.values)) - }) - case _ => - PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(), - basePaths = basePaths) - } - } - - /** - * Contains a set of paths that are considered as the base dirs of the input datasets. - * The partitioning discovery logic will make sure it will stop when it reaches any - * base path. By default, the paths of the dataset provided by users will be base paths. - * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path - * will be `/path/something=true/`, and the returned DataFrame will not contain a column of - * `something`. If users want to override the basePath. They can set `basePath` in the options - * to pass the new base path to the data source. - * For the above example, if the user-provided base path is `/path/`, the returned - * DataFrame will have the column of `something`. - */ - private def basePaths: Set[Path] = { - val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) - userDefinedBasePath.getOrElse { - // If the user does not provide basePath, we will just use paths. - paths.toSet - }.map { hdfsPath => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val fs = hdfsPath.getFileSystem(hadoopConf) - hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - } - - def refresh(): Unit = { - val files = listLeafFiles(paths) - - leafFiles.clear() - leafDirToChildrenFiles.clear() - - leafFiles ++= files.map(f => f.getPath -> f) - leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent) - - cachedPartitionSpec = null - } - - override def equals(other: Any): Boolean = other match { - case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet - case _ => false - } - - override def hashCode(): Int = paths.toSet.hashCode() -} /** * Helper methods for gathering metadata from HDFS. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala new file mode 100644 index 000000000000..20ade12e3796 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources._ + + +/** + * A [[FileCatalog]] that generates the list of files to processing by reading them from the + * metadata log files generated by the [[FileStreamSink]]. + */ +class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) + extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) { + + private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) + private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) + private var cachedPartitionSpec: PartitionSpec = _ + + override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f) + } + + override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + allFilesFromLog.toArray.groupBy(_.getPath.getParent) + } + + override def paths: Seq[Path] = path :: Nil + + override def refresh(): Unit = { } + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala deleted file mode 100644 index 4f699719c276..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.hadoop.fs.{FileStatus, Path} - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.{FileCatalog, Partition, PartitionSpec} -import org.apache.spark.sql.types.StructType - -class StreamFileCatalog(sparkSession: SparkSession, path: Path) extends FileCatalog with Logging { - val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading streaming file log from $metadataDirectory") - val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) - val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - - override def paths: Seq[Path] = path :: Nil - - override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil) - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with not partition values. - * - * @param filters the filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - override def listFiles(filters: Seq[Expression]): Seq[Partition] = - Partition(InternalRow.empty, allFiles()) :: Nil - - override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path) - - override def refresh(): Unit = {} - - override def allFiles(): Seq[FileStatus] = { - metadataLog.allFiles().map(_.toFileStatus) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 609ca976a016..e937fc3e876e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -22,12 +22,14 @@ import java.io.File import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter} -import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.datasources.parquet -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream} +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest with SharedSQLContext { @@ -147,7 +149,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { } } - test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]") { + test("FileStreamSink - partitioned writing and batch reading") { val inputData = MemoryStream[Int] val ds = inputData.toDS() @@ -157,7 +159,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { var query: ContinuousQuery = null try { - query = + query = ds.map(i => (i, i * 1000)) .toDF("id", "value") .write @@ -171,12 +173,58 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { query.processAllAvailable() } - // TODO (tdas): Test partition column can be read or not val outputDf = sqlContext.read.parquet(outputDir) + val expectedSchema = new StructType() + .add(StructField("value", IntegerType)) + .add(StructField("id", IntegerType)) + assert(outputDf.schema === expectedSchema) + + // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has + // been inferred + val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { + case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] => + baseRelation.asInstanceOf[HadoopFsRelation] + } + assert(hadoopdFsRelations.size === 1) + assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog]) + assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) + assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) + + // Verify the data is correctly read checkDataset( - outputDf.as[Int], - 1000, 2000, 3000) + outputDf.as[(Int, Int)], + (1000, 1), (2000, 2), (3000, 3)) + + /** Check some condition on the partitions of the FileScanRDD generated by a DF */ + def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { + val getFileScanRDD = df.queryExecution.executedPlan.collect { + case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => + scan.rdd.asInstanceOf[FileScanRDD] + }.headOption.getOrElse { + fail(s"No FileScan in query\n${df.queryExecution}") + } + func(getFileScanRDD.filePartitions) + } + // Read without pruning + checkFileScanPartitions(outputDf) { partitions => + // There should be as many distinct partition values as there are distinct ids + assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3) + } + + // Read with pruning, should read only files in partition dir id=1 + checkFileScanPartitions(outputDf.filter("id = 1")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 1) + } + + // Read with pruning, should read only files in partition dir id=1 and id=2 + checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 2) + } } finally { if (query != null) { query.stop() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0b24d358548b..7a799b6c87e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -536,17 +536,12 @@ private[hive] class MetaStoreFileCatalog( sparkSession: SparkSession, paths: Seq[Path], partitionSpecFromHive: PartitionSpec) - extends HDFSFileCatalog( + extends ListingFileCatalog( sparkSession, - Map.empty, paths, + Map.empty, Some(partitionSpecFromHive.partitionColumns)) { - override def getStatus(path: Path): Array[FileStatus] = { - val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - fs.listStatus(path) - } - override def partitionSpec(): PartitionSpec = partitionSpecFromHive }