From c272f39e029a736badfbe46101fac50f9be15214 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 14:18:36 -0700 Subject: [PATCH 1/6] Refactored HDFSFileCatalog --- .../datasources/fileSourceInterfaces.scala | 152 ++++++++++-------- .../streaming/StreamFileCatalog.scala | 52 +++--- .../sql/streaming/FileStreamSinkSuite.scala | 7 +- 3 files changed, 116 insertions(+), 95 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 0a3461151c627..ca1dc1e439a6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -287,36 +287,24 @@ trait FileCatalog { } /** - * A file catalog that caches metadata gathered by scanning all the files present in `paths` - * recursively. + * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. + * It provides the necessary methods to parse partition data based on a set of files. 
* - * @param parameters as set of options to control discovery - * @param paths a list of paths to scan + * @param parameters as set of options to control partition discovery * @param partitionSchema an optional partition schema that will be use to provide types for the * discovered partitions - */ -class HDFSFileCatalog( +*/ +abstract class PartitioningAwareFileCatalog( sparkSession: SparkSession, parameters: Map[String, String], - override val paths: Seq[Path], partitionSchema: Option[StructType]) extends FileCatalog with Logging { private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) - var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] - var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] - var cachedPartitionSpec: PartitionSpec = _ + protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] - def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - cachedPartitionSpec = inferPartitioning(partitionSchema) - } - - cachedPartitionSpec - } - - refresh() + protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] override def listFiles(filters: Seq[Expression]): Seq[Partition] = { if (partitionSpec().partitionColumns.isEmpty) { @@ -331,45 +319,11 @@ class HDFSFileCatalog( } } - protected def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate.create(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, 
partitionColumns(index).dataType, nullable = true) - }) - - val selected = partitions.filter { - case PartitionDirectory(values, _) => boundPredicate(values) - } - logInfo { - val total = partitions.length - val selectedSize = selected.length - val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." - } - - selected - } else { - partitions - } - } - - def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq + override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq - def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) + override def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) - private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) } else { @@ -415,10 +369,10 @@ class HDFSFileCatalog( } } - def inferPartitioning(schema: Option[StructType]): PartitionSpec = { + protected def inferPartitioning(): PartitionSpec = { // We use leaf dirs containing data files to discover the schema. 
val leafDirs = leafDirToChildrenFiles.keys.toSeq - schema match { + partitionSchema match { case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => val spec = PartitioningUtils.parsePartitions( leafDirs, @@ -448,6 +402,41 @@ class HDFSFileCatalog( } } + private def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate.create( + predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + val selected = partitions.filter { + case PartitionDirectory(values, _) => boundPredicate(values) + } + logInfo { + val total = partitions.length + val selectedSize = selected.length + val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 + s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." + } + + selected + } else { + partitions + } + } + /** * Contains a set of paths that are considered as the base dirs of the input datasets. * The partitioning discovery logic will make sure it will stop when it reaches any @@ -470,16 +459,51 @@ class HDFSFileCatalog( hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) } } +} - def refresh(): Unit = { - val files = listLeafFiles(paths) - leafFiles.clear() - leafDirToChildrenFiles.clear() +/** + * A file catalog that caches metadata gathered by scanning all the files present in `paths` + * recursively. 
+ * + * @param parameters as set of options to control discovery + * @param paths a list of paths to scan + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions + */ +class HDFSFileCatalog( + sparkSession: SparkSession, + parameters: Map[String, String], + override val paths: Seq[Path], + partitionSchema: Option[StructType]) + extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { - leafFiles ++= files.map(f => f.getPath -> f) - leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent) + @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ + @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ + @volatile private var cachedPartitionSpec: PartitionSpec = _ + refresh() + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec + } + + override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + cachedLeafFiles + } + + override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + cachedLeafDirToChildrenFiles + } + + override def refresh(): Unit = { + val files = listLeafFiles(paths) + cachedLeafFiles = + new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) + cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) cachedPartitionSpec = null } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala index 4f699719c2768..20a2beeb6be39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -17,42 +17,40 @@ package 
org.apache.spark.sql.execution.streaming +import scala.collection.mutable + import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.{FileCatalog, Partition, PartitionSpec} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.execution.datasources._ -class StreamFileCatalog(sparkSession: SparkSession, path: Path) extends FileCatalog with Logging { - val metadataDirectory = new Path(path, FileStreamSink.metadataDir) - logInfo(s"Reading streaming file log from $metadataDirectory") - val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) - val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - override def paths: Seq[Path] = path :: Nil +class StreamFileCatalog(sparkSession: SparkSession, path: Path) + extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) { + + private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) + logInfo(s"Reading streaming file log from $metadataDirectory") + private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) + private val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) + private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) + private var cachedPartitionSpec: PartitionSpec = _ - override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil) + override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f) + } - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with not partition values. 
- * - * @param filters the filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - override def listFiles(filters: Seq[Expression]): Seq[Partition] = - Partition(InternalRow.empty, allFiles()) :: Nil + override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + allFilesFromLog.toArray.groupBy(_.getPath.getParent) + } - override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path) + override def paths: Seq[Path] = path :: Nil - override def refresh(): Unit = {} + override def refresh(): Unit = { } - override def allFiles(): Seq[FileStatus] = { - metadataLog.allFiles().map(_.toFileStatus) + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 609ca976a0160..c8a12559f89d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -147,7 +147,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { } } - test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]") { + test("FileStreamSink - partitioned writing and batch reading") { val inputData = MemoryStream[Int] val ds = inputData.toDS() @@ -171,11 +171,10 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { query.processAllAvailable() } - // TODO (tdas): Test partition column can be read or not val outputDf = sqlContext.read.parquet(outputDir) checkDataset( - 
outputDf.as[Int], - 1000, 2000, 3000) + outputDf.as[(Int, Int)], + (1000, 1), (2000, 2), (3000, 3)) } finally { if (query != null) { From e80483e95566282868f1b0dfe2b61a5b6557e6e5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 14:29:59 -0700 Subject: [PATCH 2/6] Some more refactoring --- .../datasources/fileSourceInterfaces.scala | 96 ++++++++++--------- .../streaming/StreamFileCatalog.scala | 1 - 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index ca1dc1e439a6e..edb0eb6254030 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -300,7 +300,7 @@ abstract class PartitioningAwareFileCatalog( partitionSchema: Option[StructType]) extends FileCatalog with Logging { - private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] @@ -323,52 +323,6 @@ abstract class PartitioningAwareFileCatalog( override def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) - protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) - } else { - val statuses: Seq[FileStatus] = paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - logInfo(s"Listing $path on driver") - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val 
pathFilter = FileInputFormat.getInputPathFilter(jobConf) - - val statuses = { - val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) - if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } - - statuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a - // a big deal since we always use to `listLeafFilesInParallel` when the number of paths - // exceeds threshold. - case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) - } - }.filterNot { status => - val name = status.getPath.getName - HadoopFsRelation.shouldFilterOut(name) - } - - val (dirs, files) = statuses.partition(_.isDirectory) - - // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) - if (dirs.isEmpty) { - mutable.LinkedHashSet(files: _*) - } else { - mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) - } - } - } - protected def inferPartitioning(): PartitionSpec = { // We use leaf dirs containing data files to discover the schema. 
val leafDirs = leafDirToChildrenFiles.keys.toSeq @@ -507,6 +461,54 @@ class HDFSFileCatalog( cachedPartitionSpec = null } + + protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) + } else { + val statuses: Seq[FileStatus] = paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + logInfo(s"Listing $path on driver") + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + + val statuses = { + val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) + if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats + } + + statuses.map { + case f: LocatedFileStatus => f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a + // a big deal since we always use to `listLeafFilesInParallel` when the number of paths + // exceeds threshold. + case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + } + }.filterNot { status => + val name = status.getPath.getName + HadoopFsRelation.shouldFilterOut(name) + } + + val (dirs, files) = statuses.partition(_.isDirectory) + + // It uses [[LinkedHashSet]] since the order of files can affect the results. 
(SPARK-11500) + if (dirs.isEmpty) { + mutable.LinkedHashSet(files: _*) + } else { + mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) + } + } + } + + override def equals(other: Any): Boolean = other match { case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala index 20a2beeb6be39..30181b275186d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -31,7 +31,6 @@ class StreamFileCatalog(sparkSession: SparkSession, path: Path) private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") private val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) - private val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) private val allFilesFromLog = metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory) private var cachedPartitionSpec: PartitionSpec = _ From e8639eac1ba53df53623f3a452e916b79aef73d1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 14:37:13 -0700 Subject: [PATCH 3/6] Remove space change --- .../execution/datasources/fileSourceInterfaces.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index edb0eb6254030..8b0ba14ffbc46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -368,12 +368,11 @@ abstract class PartitioningAwareFileCatalog( if (partitionPruningPredicates.nonEmpty) { val predicate = partitionPruningPredicates.reduce(expressions.And) - val boundPredicate = InterpretedPredicate.create( - predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) val selected = partitions.filter { case PartitionDirectory(values, _) => boundPredicate(values) From 973847c85e1dd05f611974cbd27a33c81afa8d40 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 16:41:20 -0700 Subject: [PATCH 4/6] More refactoring --- .../execution/datasources/DataSource.scala | 8 +- .../datasources/ListingFileCatalog.scala | 127 +++++++++ .../PartitioningAwareFileCatalog.scala | 155 +++++++++++ .../datasources/fileSourceInterfaces.scala | 240 +----------------- ...log.scala => MetadataLogFileCatalog.scala} | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 9 +- 6 files changed, 300 insertions(+), 245 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{StreamFileCatalog.scala => MetadataLogFileCatalog.scala} (89%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 
6114142cefea2..618ea3d669bd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -136,7 +136,7 @@ case class DataSource( val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray - val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None) + val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) format.inferSchema( sparkSession, caseInsensitiveOptions, @@ -258,7 +258,7 @@ case class DataSource( case (format: FileFormat, _) if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new StreamFileCatalog(sparkSession, basePath) + val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -310,8 +310,8 @@ case class DataSource( }) } - val fileCatalog: FileCatalog = - new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema) + val fileCatalog = + new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema) val dataSchema = userSpecifiedSchema.map { schema => val equality = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala new file mode 100644 index 0000000000000..bdf43e02f4a07 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable +import scala.util.Try + +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + + +/** + * A [[FileCatalog]] that generates the list of files to process by recursively listing all the + * files present in `paths`. 
+ * + * @param parameters as set of options to control discovery + * @param paths a list of paths to scan + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions + */ +class ListingFileCatalog( + sparkSession: SparkSession, + override val paths: Seq[Path], + parameters: Map[String, String], + partitionSchema: Option[StructType]) + extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { + + @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ + @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ + @volatile private var cachedPartitionSpec: PartitionSpec = _ + + refresh() + + override def partitionSpec(): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning() + } + cachedPartitionSpec + } + + override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { + cachedLeafFiles + } + + override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { + cachedLeafDirToChildrenFiles + } + + override def refresh(): Unit = { + val files = listLeafFiles(paths) + cachedLeafFiles = + new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) + cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) + cachedPartitionSpec = null + } + + protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) + } else { + val statuses: Seq[FileStatus] = paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + logInfo(s"Listing $path on driver") + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val pathFilter = 
FileInputFormat.getInputPathFilter(jobConf) + + val statuses = { + val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) + if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats + } + + statuses.map { + case f: LocatedFileStatus => f + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a + // a big deal since we always use to `listLeafFilesInParallel` when the number of paths + // exceeds threshold. + case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) + } + }.filterNot { status => + val name = status.getPath.getName + HadoopFsRelation.shouldFilterOut(name) + } + + val (dirs, files) = statuses.partition(_.isDirectory) + + // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) + if (dirs.isEmpty) { + mutable.LinkedHashSet(files: _*) + } else { + mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) + } + } + } + + override def equals(other: Any): Boolean = other match { + case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet + case _ => false + } + + override def hashCode(): Int = paths.toSet.hashCode() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala new file mode 100644 index 0000000000000..9d997d628579c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.{StringType, StructType} + + +/** + * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. + * It provides the necessary methods to parse partition data based on a set of files. 
+ * + * @param parameters as set of options to control partition discovery + * @param partitionSchema an optional partition schema that will be use to provide types for the + * discovered partitions +*/ +abstract class PartitioningAwareFileCatalog( + sparkSession: SparkSession, + parameters: Map[String, String], + partitionSchema: Option[StructType]) + extends FileCatalog with Logging { + + protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + + protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] + + protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] + + override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + if (partitionSpec().partitionColumns.isEmpty) { + Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil + } else { + prunePartitions(filters, partitionSpec()).map { + case PartitionDirectory(values, path) => + Partition( + values, + leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_")) + } + } + } + + override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq + + protected def inferPartitioning(): PartitionSpec = { + // We use leaf dirs containing data files to discover the schema. + val leafDirs = leafDirToChildrenFiles.keys.toSeq + partitionSchema match { + case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => + val spec = PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = false, + basePaths = basePaths) + + // Without auto inference, all of value in the `row` should be null or in StringType, + // we need to cast into the data type that user specified. 
+ def castPartitionValuesToUserSchema(row: InternalRow) = { + InternalRow((0 until row.numFields).map { i => + Cast( + Literal.create(row.getUTF8String(i), StringType), + userProvidedSchema.fields(i).dataType).eval() + }: _*) + } + + PartitionSpec(userProvidedSchema, spec.partitions.map { part => + part.copy(values = castPartitionValuesToUserSchema(part.values)) + }) + case _ => + PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(), + basePaths = basePaths) + } + } + + private def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + val selected = partitions.filter { + case PartitionDirectory(values, _) => boundPredicate(values) + } + logInfo { + val total = partitions.length + val selectedSize = selected.length + val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 + s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." + } + + selected + } else { + partitions + } + } + + /** + * Contains a set of paths that are considered as the base dirs of the input datasets. + * The partitioning discovery logic will make sure it will stop when it reaches any + * base path. 
By default, the paths of the dataset provided by users will be base paths. + * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path + * will be `/path/something=true/`, and the returned DataFrame will not contain a column of + * `something`. If users want to override the basePath, they can set `basePath` in the options + * to pass the new base path to the data source. + * For the above example, if the user-provided base path is `/path/`, the returned + * DataFrame will have the column of `something`. + */ + private def basePaths: Set[Path] = { + val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) + userDefinedBasePath.getOrElse { + // If the user does not provide basePath, we will just use paths. + paths.toSet + }.map { hdfsPath => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = hdfsPath.getFileSystem(hadoopConf) + hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 8b0ba14ffbc46..3d375f23191c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -264,257 +264,31 @@ case class Partition(values: InternalRow, files: Seq[FileStatus]) * as the partitioning characteristics of those files. */ trait FileCatalog { + + /** Returns the list of input paths from which the catalog will get files. */ def paths: Seq[Path] + /** Returns the specification of the partitions inferred from the data. */ def partitionSpec(): PartitionSpec /** * Returns all valid files grouped into partitions when the data is partitioned.
If the data is - * unpartitioned, this will return a single partition with not partition values. + * unpartitioned, this will return a single partition with no partition values. * - * @param filters the filters used to prune which partitions are returned. These filters must + * @param filters The filters used to prune which partitions are returned. These filters must * only refer to partition columns and this method will only return files * where these predicates are guaranteed to evaluate to `true`. Thus, these * filters will not need to be evaluated again on the returned data. */ def listFiles(filters: Seq[Expression]): Seq[Partition] + /** Returns all the valid files. */ def allFiles(): Seq[FileStatus] - def getStatus(path: Path): Array[FileStatus] - + /** Refresh the file listing */ def refresh(): Unit } -/** - * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. - * It provides the necessary methods to parse partition data based on a set of files. 
- * - * @param parameters as set of options to control partition discovery - * @param partitionSchema an optional partition schema that will be use to provide types for the - * discovered partitions -*/ -abstract class PartitioningAwareFileCatalog( - sparkSession: SparkSession, - parameters: Map[String, String], - partitionSchema: Option[StructType]) - extends FileCatalog with Logging { - - protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) - - protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] - - protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] - - override def listFiles(filters: Seq[Expression]): Seq[Partition] = { - if (partitionSpec().partitionColumns.isEmpty) { - Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil - } else { - prunePartitions(filters, partitionSpec()).map { - case PartitionDirectory(values, path) => - Partition( - values, - getStatus(path).filterNot(_.getPath.getName startsWith "_")) - } - } - } - - override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq - - override def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) - - protected def inferPartitioning(): PartitionSpec = { - // We use leaf dirs containing data files to discover the schema. - val leafDirs = leafDirToChildrenFiles.keys.toSeq - partitionSchema match { - case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => - val spec = PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = false, - basePaths = basePaths) - - // Without auto inference, all of value in the `row` should be null or in StringType, - // we need to cast into the data type that user specified. 
- def castPartitionValuesToUserSchema(row: InternalRow) = { - InternalRow((0 until row.numFields).map { i => - Cast( - Literal.create(row.getUTF8String(i), StringType), - userProvidedSchema.fields(i).dataType).eval() - }: _*) - } - - PartitionSpec(userProvidedSchema, spec.partitions.map { part => - part.copy(values = castPartitionValuesToUserSchema(part.values)) - }) - case _ => - PartitioningUtils.parsePartitions( - leafDirs, - PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(), - basePaths = basePaths) - } - } - - private def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = partitionPruningPredicates.reduce(expressions.And) - - val boundPredicate = InterpretedPredicate.create(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) - - val selected = partitions.filter { - case PartitionDirectory(values, _) => boundPredicate(values) - } - logInfo { - val total = partitions.length - val selectedSize = selected.length - val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100 - s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions." - } - - selected - } else { - partitions - } - } - - /** - * Contains a set of paths that are considered as the base dirs of the input datasets. - * The partitioning discovery logic will make sure it will stop when it reaches any - * base path. 
By default, the paths of the dataset provided by users will be base paths. - * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path - * will be `/path/something=true/`, and the returned DataFrame will not contain a column of - * `something`. If users want to override the basePath. They can set `basePath` in the options - * to pass the new base path to the data source. - * For the above example, if the user-provided base path is `/path/`, the returned - * DataFrame will have the column of `something`. - */ - private def basePaths: Set[Path] = { - val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) - userDefinedBasePath.getOrElse { - // If the user does not provide basePath, we will just use paths. - paths.toSet - }.map { hdfsPath => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val fs = hdfsPath.getFileSystem(hadoopConf) - hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - } -} - - -/** - * A file catalog that caches metadata gathered by scanning all the files present in `paths` - * recursively. 
- * - * @param parameters as set of options to control discovery - * @param paths a list of paths to scan - * @param partitionSchema an optional partition schema that will be use to provide types for the - * discovered partitions - */ -class HDFSFileCatalog( - sparkSession: SparkSession, - parameters: Map[String, String], - override val paths: Seq[Path], - partitionSchema: Option[StructType]) - extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { - - @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ - @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ - @volatile private var cachedPartitionSpec: PartitionSpec = _ - - refresh() - - override def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - cachedPartitionSpec = inferPartitioning() - } - cachedPartitionSpec - } - - override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { - cachedLeafFiles - } - - override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { - cachedLeafDirToChildrenFiles - } - - override def refresh(): Unit = { - val files = listLeafFiles(paths) - cachedLeafFiles = - new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) - cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) - cachedPartitionSpec = null - } - - - protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext) - } else { - val statuses: Seq[FileStatus] = paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - logInfo(s"Listing $path on driver") - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = 
FileInputFormat.getInputPathFilter(jobConf) - - val statuses = { - val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus]) - if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats - } - - statuses.map { - case f: LocatedFileStatus => f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a - // a big deal since we always use to `listLeafFilesInParallel` when the number of paths - // exceeds threshold. - case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)) - } - }.filterNot { status => - val name = status.getPath.getName - HadoopFsRelation.shouldFilterOut(name) - } - - val (dirs, files) = statuses.partition(_.isDirectory) - - // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) - if (dirs.isEmpty) { - mutable.LinkedHashSet(files: _*) - } else { - mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) - } - } - } - - - override def equals(other: Any): Boolean = other match { - case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet - case _ => false - } - - override def hashCode(): Int = paths.toSet.hashCode() -} /** * Helper methods for gathering metadata from HDFS. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala similarity index 89% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala index 30181b275186d..20ade12e3796a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -25,7 +25,11 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources._ -class StreamFileCatalog(sparkSession: SparkSession, path: Path) +/** + * A [[FileCatalog]] that generates the list of files to process by reading them from the + * metadata log files generated by the [[FileStreamSink]]. + */ +class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) { private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0b24d358548b0..7a799b6c87e9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -536,17 +536,12 @@ private[hive] class MetaStoreFileCatalog( sparkSession: SparkSession, paths: Seq[Path], partitionSpecFromHive: PartitionSpec) - extends HDFSFileCatalog( + extends ListingFileCatalog( sparkSession, - Map.empty, paths, + Map.empty, Some(partitionSpecFromHive.partitionColumns)) { - override def getStatus(path: Path): Array[FileStatus] = { - val fs =
path.getFileSystem(sparkSession.sessionState.newHadoopConf()) - fs.listStatus(path) - } - override def partitionSpec(): PartitionSpec = partitionSpecFromHive } From 84864e8995cac9636d7447dac3880bb18a3e7b49 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 19:24:27 -0700 Subject: [PATCH 5/6] Added unit test to verify pruning --- .../sql/streaming/FileStreamSinkSuite.scala | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index c8a12559f89d0..2b6860322dd93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -22,12 +22,14 @@ import java.io.File import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter} -import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.execution.datasources.parquet +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.datasources.{parquet, FilePartition, FileScanRDD} import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest with SharedSQLContext { @@ -157,7 +159,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { var query: ContinuousQuery = null try { - query = + query = ds.map(i => (i, i * 1000)) .toDF("id", "value") .write @@ -172,10 +174,46 @@ class FileStreamSinkSuite 
extends StreamTest with SharedSQLContext { } val outputDf = sqlContext.read.parquet(outputDir) + val expectedSchema = new StructType() + .add(StructField("value", IntegerType)) + .add(StructField("id", IntegerType)) + assert(outputDf.schema === expectedSchema) + checkDataset( outputDf.as[(Int, Int)], (1000, 1), (2000, 2), (3000, 3)) + + /** Check some condition on the partitions of the FileScanRDD generated by a DF */ + def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { + val getFileScanRDD = df.queryExecution.executedPlan.collect { + case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => + scan.rdd.asInstanceOf[FileScanRDD] + }.headOption.getOrElse { + fail(s"No FileScan in query\n${df.queryExecution}") + } + func(getFileScanRDD.filePartitions) + } + + // Read without pruning + checkFileScanPartitions(outputDf) { partitions => + // There should be as many distinct partition values as there are distinct ids + assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3) + } + + // Read with pruning, should read only files in partition dir id=1 + checkFileScanPartitions(outputDf.filter("id = 1")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 1) + } + + // Read with pruning, should read only files in partition dir id=1 and id=2 + checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions => + val filesToBeRead = partitions.flatMap(_.files) + assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/"))) + assert(filesToBeRead.map(_.partitionValues).distinct.size === 2) + } } finally { if (query != null) { query.stop() From 9e62ccfb48aa2797014d278050c3bbe4fe4e7a04 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 19:47:56 -0700 Subject: [PATCH 6/6] More tests --- .../sql/streaming/FileStreamSinkSuite.scala | 19 
+++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 2b6860322dd93..e937fc3e876e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -25,11 +25,11 @@ import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{parquet, FilePartition, FileScanRDD} -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest with SharedSQLContext { @@ -179,11 +179,22 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { .add(StructField("id", IntegerType)) assert(outputDf.schema === expectedSchema) + // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has + // been inferred + val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { + case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] => + baseRelation.asInstanceOf[HadoopFsRelation] + } + assert(hadoopdFsRelations.size === 1) + 
assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog]) + assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) + assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) + + // Verify the data is correctly read checkDataset( outputDf.as[(Int, Int)], (1000, 1), (2000, 2), (3000, 3)) - /** Check some condition on the partitions of the FileScanRDD generated by a DF */ def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { val getFileScanRDD = df.queryExecution.executedPlan.collect {