From 32e5f0d78e2e780fb2ac2205170e019cc08e5eda Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 1 Jul 2015 16:32:44 -0700 Subject: [PATCH 1/5] Moves schema merging to executor side Removes some dead code Parallelizes input paths listing --- .../apache/spark/deploy/SparkHadoopUtil.scala | 8 + .../apache/spark/sql/DataFrameReader.scala | 12 +- .../scala/org/apache/spark/sql/SQLConf.scala | 10 +- .../sql/parquet/ParquetTableOperations.scala | 14 +- .../apache/spark/sql/parquet/newParquet.scala | 158 +++++++++++++----- .../org/apache/spark/sql/sources/ddl.scala | 8 +- .../apache/spark/sql/sources/interfaces.scala | 122 +++++++++++--- 7 files changed, 243 insertions(+), 89 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 9f94118829ff..85aa8fa6eede 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -238,6 +238,14 @@ class SparkHadoopUtil extends Logging { }.getOrElse(Seq.empty[Path]) } + def globPathIfNecessary(pattern: Path): Seq[Path] = { + if (pattern.toString.exists("{}[]*?\\".toSet.contains)) { + globPath(pattern) + } else { + Seq(pattern) + } + } + /** * Lists all the files in a directory with the specified prefix, and does not end with the * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 9ad6e21da7bf..9d0aa1052b66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import java.util.Properties import org.apache.hadoop.fs.Path -import org.apache.spark.Partition +import org.apache.spark.{Logging, Partition} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType * @since 1.4.0 */ @Experimental -class DataFrameReader private[sql](sqlContext: SQLContext) { +class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { /** * Specifies the input data source format. 
@@ -260,7 +260,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { if (paths.isEmpty) { sqlContext.emptyDataFrame } else { - val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray + val globbedPaths = paths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray + sqlContext.baseRelationToDataFrame( new ParquetRelation2( globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 6005d35f015a..1af278ac6f26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -228,7 +228,7 @@ private[spark] object SQLConf { doc = "") val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema", - defaultValue = Some(true), + defaultValue = Some(false), doc = "When true, the Parquet data source merges schemas collected from all data files, " + "otherwise the schema is picked from the summary file or a random data file " + "if no summary file is available.") @@ -361,6 +361,11 @@ private[spark] object SQLConf { val OUTPUT_COMMITTER_CLASS = stringConf("spark.sql.sources.outputCommitterClass", isPublic = false) + val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf( + key = "spark.sql.sources.parallelPartitionDiscovery.threshold", + defaultValue = Some(32), + doc = "") + // Whether to perform eager analysis when constructing a dataframe. // Set to false when debugging requires the ability to look at invalid query plans. val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis", @@ -538,6 +543,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def partitionColumnTypeInferenceEnabled(): Boolean = getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE) + private[spark] def parallelPartitionDiscoveryThreshold: Int = + getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) + // Do not use a value larger than 4000 as the default value of this property. // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information. private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 9058b0937529..28cba5e54d69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -426,6 +426,7 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) } } +// TODO Removes this class after removing old Parquet support code /** * We extend ParquetInputFormat in order to have more control over which * RecordFilter we want to use. 
@@ -433,8 +434,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) private[parquet] class FilteringParquetRowInputFormat extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging { - private var fileStatuses = Map.empty[Path, FileStatus] - override def createRecordReader( inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = { @@ -455,17 +454,6 @@ private[parquet] class FilteringParquetRowInputFormat } -private[parquet] object FilteringParquetRowInputFormat { - private val footerCache = CacheBuilder.newBuilder() - .maximumSize(20000) - .build[FileStatus, Footer]() - - private val blockLocationCache = CacheBuilder.newBuilder() - .maximumSize(20000) - .expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move - .build[FileStatus, Array[BlockLocation]]() -} - private[parquet] object FileSystemHelper { def listFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 01dd6f471bd7..bbdeb12c9598 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,7 +22,7 @@ import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.collection.mutable -import scala.util.Try +import scala.util.{Failure, Try} import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} @@ -31,12 +31,11 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ -import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName} +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD._ import org.apache.spark.sql._ @@ -278,19 +277,13 @@ private[sql] class ParquetRelation2( // Create the function to set input paths at the driver side. val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _ - val footers = inputFiles.map(f => metadataCache.footers(f.getPath)) - Utils.withDummyCallSite(sqlContext.sparkContext) { - // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. - // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects - // and footers. Especially when a global arbitrative schema (either from metastore or data - // source DDL) is available. 
new SqlNewHadoopRDD( sc = sqlContext.sparkContext, broadcastedConf = broadcastedConf, initDriverSideJobFuncOpt = Some(setInputPaths), initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[FilteringParquetRowInputFormat], + inputFormatClass = classOf[ParquetInputFormat[InternalRow]], keyClass = classOf[Void], valueClass = classOf[InternalRow]) { @@ -306,12 +299,6 @@ private[sql] class ParquetRelation2( f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority) }.toSeq - @transient val cachedFooters = footers.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata) - }.toSeq - private def escapePathUserInfo(path: Path): Path = { val uri = path.toUri new Path(new URI( @@ -321,13 +308,10 @@ private[sql] class ParquetRelation2( // Overridden so we can inject our own cached files statuses. override def getPartitions: Array[SparkPartition] = { - val inputFormat = if (cacheMetadata) { - new FilteringParquetRowInputFormat { - override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses - override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + val inputFormat = new ParquetInputFormat[Row] { + override def listStatus(jobContext: JobContext): JList[FileStatus] = { + if (cacheMetadata) cachedStatuses else super.listStatus(jobContext) } - } else { - new FilteringParquetRowInputFormat } val jobContext = newJobContext(getConf(isDriverSide = true), jobId) @@ -348,9 +332,6 @@ private[sql] class ParquetRelation2( // `FileStatus` objects of all "_common_metadata" files. private var commonMetadataStatuses: Array[FileStatus] = _ - // Parquet footer cache. - var footers: Map[Path, Footer] = _ - // `FileStatus` objects of all data files (Parquet part-files). var dataStatuses: Array[FileStatus] = _ @@ -376,20 +357,6 @@ private[sql] class ParquetRelation2( commonMetadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - footers = { - val conf = SparkHadoopUtil.get.conf - val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true) - val rawFooters = if (shouldMergeSchemas) { - ParquetFileReader.readAllFootersInParallel( - conf, seqAsJavaList(leaves), taskSideMetaData) - } else { - ParquetFileReader.readAllFootersInParallelUsingSummaryFiles( - conf, seqAsJavaList(leaves), taskSideMetaData) - } - - rawFooters.map(footer => footer.getFile -> footer).toMap - } - // If we already get the schema, don't need to re-compute it since the schema merging is // time-consuming. if (dataSchema == null) { @@ -422,7 +389,7 @@ private[sql] class ParquetRelation2( // Always tries the summary files first if users don't require a merged schema. In this case, // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row // groups information, and could be much smaller for large Parquet files with lots of row - // groups. + // groups. If no summary file is available, falls back to some random part-file. // // NOTE: Metadata stored in the summary files are merged from all part-files. 
However, for
 // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
@@ -457,10 +424,10 @@ private[sql] class ParquetRelation2(
 
       assert(
         filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+        "No predefined schema found, " +
+          s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
+      ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
     }
   }
 }
@@ -519,6 +486,7 @@ private[sql] object ParquetRelation2 extends Logging {
   private[parquet] def initializeDriverSideJobFunc(
       inputFiles: Array[FileStatus])(job: Job): Unit = {
     // We side the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
     if (inputFiles.nonEmpty) {
       FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
@@ -543,7 +511,7 @@ private[sql] object ParquetRelation2 extends Logging {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-      if (serializedSchema == None) {
+      if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
       } else if (!seen.contains(serializedSchema.get)) {
@@ -646,4 +614,106 @@ private[sql] object ParquetRelation2 extends Logging {
       .filter(_.nullable)
     StructType(parquetSchema ++ missingFields)
   }
+
+  /**
+   * Figures out a merged Parquet schema with a distributed Spark job.
+   *
+   * Note that locality is not taken into consideration here because:
+   *
+   * 1. For a single Parquet part-file, in most cases the footer only resides in the last block of
+   *    that file. Thus we only need to retrieve the location of the last block. However, Hadoop
+   *    `FileSystem` only provides an API to retrieve locations of all blocks, which can be
+   *    potentially expensive.
+   *
+   * 2. This optimization is mainly useful for S3, where file metadata operations can be pretty
+   *    slow. Also, locality is basically not available when using S3 (you can't run computation
+   *    on S3 nodes).
+   */
+  def mergeSchemasInParallel(
+      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+
+    // HACK ALERT:
+    //
+    // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
+    // to the executor side to avoid fetching them again. However, `FileStatus` is not
+    // `Serializable` but only `Writable`. What makes it worse, `FileStatus` somehow doesn't play
+    // well with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.
+    // These facts virtually prevent us from serializing `FileStatus`es.
+    //
+    // Since Parquet only relies on path and length information of those `FileStatus`es to read
+    // footers, here we just extract them (which can be easily serialized), send them to the
+    // executor side, and reassemble fake `FileStatus`es there.
+    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+    // Issues a Spark job to read Parquet schema in parallel.
+    val partiallyMergedSchemas =
+      sqlContext
+        .sparkContext
+        .parallelize(partialFileStatusInfo)
+        .mapPartitions { iterator =>
+          // Reassembles fake `FileStatus`es from the serialized path and length information.
+          val fakeFileStatuses = iterator.map { case (path, length) =>
+            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+          }.toSeq
+
+          // Skips row group information since we only need the schema
+          val skipRowGroups = true
+
+          // Reads footers in a multi-threaded manner within each task
+          val footers =
+            ParquetFileReader.readAllFootersInParallel(
+              serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+          val converter =
+            new CatalystSchemaConverter(
+              assumeBinaryIsString = assumeBinaryIsString,
+              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+              followParquetFormatSpec = followParquetFormatSpec)
+
+          footers.map { footer =>
+            ParquetRelation2.readSchemaFromFooter(footer, converter)
+          }.reduceOption(_ merge _).iterator
+        }.collect()
+
+    partiallyMergedSchemas.reduceOption(_ merge _)
+  }
+
+  /**
+   * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
+   * can be found in the file metadata, returns the deserialized [[StructType]]; otherwise,
+   * returns a [[StructType]] converted from the [[MessageType]] stored in this footer.
+   */
+  def readSchemaFromFooter(
+      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+    val fileMetaData = footer.getParquetMetadata.getFileMetaData
+    fileMetaData
+      .getKeyValueMetaData
+      .toMap
+      .get(RowReadSupport.SPARK_METADATA_KEY)
+      .flatMap(deserializeSchemaString)
+      .getOrElse(converter.convert(fileMetaData.getSchema))
+  }
+
+  private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+    // Tries to deserialize the schema string as JSON first, then falls back to the case class
+    // string parser (data generated by older versions of Spark SQL uses this format).
+    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+      case _: Throwable =>
+        logInfo(
+          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "falling back to the deprecated DataType.fromCaseClassString parser.")
+        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+    }.recoverWith {
+      case cause: Throwable =>
+        logWarning(
+          "Failed to parse serialized Spark schema in Parquet key-value " +
+            s"metadata, ignoring it:\n\t$schemaString", cause)
+        Failure(cause)
+    }.toOption
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index d7440c55bd4a..5a8c97c773ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -247,7 +247,9 @@ private[sql] object ResolvedDataSource {
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
 
         val dataSchema =
@@ -272,7 +274,9 @@ private[sql] object ResolvedDataSource {
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
         dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
       case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b13c5313b25c..1c0019db7137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,21 +18,23 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.RDDConversions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.execution.RDDConversions
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -367,7 +369,9 @@ abstract class OutputWriter {
  */
 @Experimental
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
-  extends BaseRelation {
+  extends BaseRelation with Logging {
+
+  logInfo("Constructing HadoopFsRelation")
 
   def this() = this(None)
 
@@ -382,36 +386,41 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    def refresh(): Unit = {
-      // We don't filter files/directories whose name start with "_" except "_temporary" here, as
-      // specific data sources may take advantages over them (e.g. Parquet _metadata and
-      // _common_metadata files). "_temporary" directories are explicitly ignored since failed
-      // tasks/jobs may leave partial/corrupted data files there.
-      def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
-        if (status.getPath.getName.toLowerCase == "_temporary") {
-          Set.empty
+    private def listLeafFilesAndDirs(paths: Array[String]): Set[FileStatus] = {
+      if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
+        HadoopFsRelation.listLeafFilesAndDirsInParallel(paths, hadoopConf, sqlContext.sparkContext)
+      } else {
+        val statuses = paths.flatMap { path =>
+          val hdfsPath = new Path(path)
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+          logInfo(s"Listing $qualified on driver")
+          Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+        }.filterNot { status =>
+          val name = status.getPath.getName
+          name.toLowerCase == "_temporary" || name.startsWith(".")
+        }
+
+        val (dirs, files) = statuses.partition(_.isDir)
+
+        if (dirs.isEmpty) {
+          files.toSet
         } else {
-          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-          val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
-          files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+          files.toSet ++ listLeafFilesAndDirs(dirs.map(_.getPath.toString))
         }
       }
+    }
 
-      leafFiles.clear()
+    def refresh(): Unit = {
+      assert(paths.nonEmpty, "Input paths can't be empty")
+      val files = listLeafFilesAndDirs(paths)
 
-      val statuses = paths.flatMap { path =>
-        val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
-      }.filterNot { status =>
-        // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
-        status.getPath.getName.startsWith(".")
-      }
+      leafFiles.clear()
+      leafDirToChildrenFiles.clear()
 
-      val files = statuses.filterNot(_.isDir)
       leafFiles ++= files.map(f => f.getPath -> f).toMap
-      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
+      leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
 
@@ -666,3 +675,64 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
    */
   def prepareJobForWrite(job: Job): OutputWriterFactory
 }
+
+private[sql] object HadoopFsRelation extends Logging {
+  // We don't filter files/directories whose names start with "_" except "_temporary" here, as
+  // specific data sources may take advantage of them (e.g. Parquet _metadata and
+  // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+  // tasks/jobs may leave partial/corrupted data files there. Files and directories whose names
+  // start with "." are also ignored.
+  def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+    logInfo(s"Listing ${status.getPath}")
+    val name = status.getPath.getName.toLowerCase
+    if (name == "_temporary" || name.startsWith(".")) {
+      Array.empty
+    } else {
+      val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+      val leafDirs = if (dirs.isEmpty) Array(status) else Array.empty[FileStatus]
+      files ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+    }
+  }
+
+  // `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
+  // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
+  // Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it on
+  // the executor side and reconstruct it on the driver side.
+  case class FakeFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long)
+
+  def listLeafFilesAndDirsInParallel(
+      paths: Array[String],
+      hadoopConf: Configuration,
+      sparkContext: SparkContext): Set[FileStatus] = {
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val fakeStatuses = sparkContext.parallelize(paths).flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(serializableConfiguration.value)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      Try(listLeafFilesAndDirs(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty)
+    }.map { status =>
+      FakeFileStatus(
+        status.getPath.toString,
+        status.getLen,
+        status.isDir,
+        status.getReplication,
+        status.getBlockSize,
+        status.getModificationTime,
+        status.getAccessTime)
+    }.collect()
+
+    fakeStatuses.map { f =>
+      new FileStatus(
+        f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
+    }.toSet
+  }
+}

From b1646aa87ec41c381c87023ec50ff22c1ec8998c Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 14 Jul 2015 23:29:50 +0800
Subject: [PATCH 2/5] Should allow empty input paths

---
 .../src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 1c0019db7137..a4dcdeef3c9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -413,7 +413,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   def refresh(): Unit = {
-    assert(paths.nonEmpty, "Input paths can't be empty")
     val files = listLeafFilesAndDirs(paths)
 
     leafFiles.clear()

From 3c580f16f7356a05588df3f98867ee2101428b18 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 15 Jul 2015 07:02:01 +0800
Subject: [PATCH 3/5] Fixes test failure caused by making "mergeSchema" default to "false"

---
 .../spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index d0ebb11b063f..9206a6b95275 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -447,7 +447,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
       makePartitionDir(base, defaultPartitionName, "pi" -> 2))
 
-    sqlContext.read.format("parquet").load(base.getCanonicalPath).registerTempTable("t")
+    sqlContext
+      .read
+      .option("mergeSchema", "true")
+      .format("parquet")
+      .load(base.getCanonicalPath)
+      .registerTempTable("t")
 
     withTempTable("t") {
       checkAnswer(

From ff32cd0ebd58e471b6f5111aee4f1847d06f36ba Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 15 Jul 2015 12:17:25 +0800
Subject: [PATCH 4/5] Excludes directories while listing leaf files

---
 .../apache/spark/sql/sources/interfaces.scala | 17 ++++++++---------
 .../ParquetPartitionDiscoverySuite.scala      | 11 +++++++++++
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a4dcdeef3c9d..31980fa0f3cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -386,9 +386,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    private def listLeafFilesAndDirs(paths: Array[String]): Set[FileStatus] = {
+    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
       if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
-        HadoopFsRelation.listLeafFilesAndDirsInParallel(paths, hadoopConf, sqlContext.sparkContext)
+        HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
       } else {
         val statuses = paths.flatMap { path =>
           val hdfsPath = new Path(path)
@@ -407,13 +407,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         if (dirs.isEmpty) {
           files.toSet
         } else {
-          files.toSet ++ listLeafFilesAndDirs(dirs.map(_.getPath.toString))
+          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
     }
 
     def refresh(): Unit = {
-      val files = listLeafFilesAndDirs(paths)
+      val files = listLeafFiles(paths)
 
       leafFiles.clear()
       leafDirToChildrenFiles.clear()
@@ -681,15 +681,14 @@ private[sql] object HadoopFsRelation extends Logging {
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
   // tasks/jobs may leave partial/corrupted data files there. Files and directories whose names
   // start with "." are also ignored.
- def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Array[FileStatus] = { + def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = { logInfo(s"Listing ${status.getPath}") val name = status.getPath.getName.toLowerCase if (name == "_temporary" || name.startsWith(".")) { Array.empty } else { val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) - val leafDirs = if (dirs.isEmpty) Array(status) else Array.empty[FileStatus] - files ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir)) + files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) } } @@ -706,7 +705,7 @@ private[sql] object HadoopFsRelation extends Logging { modificationTime: Long, accessTime: Long) - def listLeafFilesAndDirsInParallel( + def listLeafFilesInParallel( paths: Array[String], hadoopConf: Configuration, sparkContext: SparkContext): Set[FileStatus] = { @@ -717,7 +716,7 @@ private[sql] object HadoopFsRelation extends Logging { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(serializableConfiguration.value) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - Try(listLeafFilesAndDirs(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty) + Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty) }.map { status => FakeFileStatus( status.getPath.toString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 9206a6b95275..37b0a9fbf7a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -588,4 +588,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { Seq("a", "a, b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo"))) } + + test("Parallel partition discovery") { + withTempPath { dir => + withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") { + val path = dir.getCanonicalPath + val df = sqlContext.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1) + df.write.partitionBy("b", "c").parquet(path) + checkAnswer(sqlContext.read.parquet(path), df) + } + } + } } From 5598efc9707657fb31da71b1999d07164e26e218 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 16 Jul 2015 18:39:27 +0800 Subject: [PATCH 5/5] Uses ParquetInputFormat[InternalRow] instead of ParquetInputFormat[Row] --- .../main/scala/org/apache/spark/sql/parquet/newParquet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index bbdeb12c9598..e683eb012600 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -308,7 +308,7 @@ private[sql] class ParquetRelation2( // Overridden so we can inject our own cached files statuses. override def getPartitions: Array[SparkPartition] = { - val inputFormat = new ParquetInputFormat[Row] { + val inputFormat = new ParquetInputFormat[InternalRow] { override def listStatus(jobContext: JobContext): JList[FileStatus] = { if (cacheMetadata) cachedStatuses else super.listStatus(jobContext) }
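
Note: as a minimal, self-contained sketch of the pattern these patches implement (ship only
serializable (path, length) pairs to executors, rebuild fake `FileStatus`es there, do the
per-file metadata work in a distributed job, then fold the partial results on the driver), the
following hypothetical driver program may help. `readFooterSchema` and `mergeSchemas` are
illustrative placeholders, not Spark or Parquet APIs; only `SparkContext`, `FileStatus`, and
`Path` are real.

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelFooterMergeSketch {
      // Placeholder standing in for the real footer read + Parquet-to-Spark schema conversion.
      def readFooterSchema(status: FileStatus): String = s"schema(${status.getPath.getName})"

      // Placeholder standing in for StructType.merge.
      def mergeSchemas(a: String, b: String): String = s"merge($a, $b)"

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[4]"))

        // Pretend these (path, length) pairs came from the driver-side FileStatus cache.
        val partialStatusInfo = Seq(
          ("hdfs://ns/data/part-00000.parquet", 1024L),
          ("hdfs://ns/data/part-00001.parquet", 2048L))

        val merged = sc
          .parallelize(partialStatusInfo, 2)
          .mapPartitions { iter =>
            // Rebuild fake FileStatuses on the executor: footer reading only needs
            // path and length, and FileStatus itself is not serializable.
            val fakeStatuses = iter.map { case (path, len) =>
              new FileStatus(len, false, 0, 0, 0, new Path(path))
            }.toSeq
            // Merge within the partition first to keep the collected result small.
            fakeStatuses.map(readFooterSchema).reduceOption(mergeSchemas).iterator
          }
          .collect()
          .reduceOption(mergeSchemas)

        println(merged)
        sc.stop()
      }
    }

The two-level reduce (a per-partition reduceOption followed by a driver-side reduceOption over
the collected partials) mirrors mergeSchemasInParallel above: each task pays the metadata-read
cost for its own files, and the driver only merges one partial schema per partition.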