
Commit a1064df

liancheng authored and marmbrus committed
[SPARK-8125] [SQL] Accelerates Parquet schema merging and partition discovery
This PR tries to accelerate Parquet schema discovery and `HadoopFsRelation` partition discovery. The acceleration is done by the following means:

- Turning off schema merging by default

  Schema merging is not the most common case, but requires reading footers of all Parquet part-files and can be very slow.

- Avoiding `FileSystem.globStatus()` calls when possible

  `FileSystem.globStatus()` may issue multiple synchronous RPC calls, and can be very slow (esp. on S3). This PR adds `SparkHadoopUtil.globPathIfNecessary()`, which only issues RPC calls when the path contains glob-pattern specific character(s) (`{}[]*?\`). This is especially useful when converting a metastore Parquet table with lots of partitions, since Spark SQL adds all partition directories as input paths, and currently we do a `globStatus` call on each input path sequentially.

- Listing leaf files in parallel when the number of input paths exceeds a threshold

  Listing leaf files is required by partition discovery. Currently it is done on the driver side, and can be slow when there are lots of (nested) directories, since each `FileSystem.listStatus()` call issues an RPC. In this PR, we list leaf files in a BFS style, and resort to a Spark job once we find that the number of directories to be listed exceeds the threshold. The threshold is controlled by the `SQLConf` option `spark.sql.sources.parallelPartitionDiscovery.threshold`, which defaults to 32.

- Discovering Parquet schema in parallel

  Currently, schema merging is also done on the driver side, and needs to read footers of all part-files. This PR uses a Spark job to do schema merging. Together with task-side metadata reading in Parquet 1.7.0, we never read any footers on the driver side now.

Author: Cheng Lian <[email protected]>

Closes #7396 from liancheng/accel-parquet and squashes the following commits:

5598efc [Cheng Lian] Uses ParquetInputFormat[InternalRow] instead of ParquetInputFormat[Row]
ff32cd0 [Cheng Lian] Excludes directories while listing leaf files
3c580f1 [Cheng Lian] Fixes test failure caused by making "mergeSchema" default to "false"
b1646aa [Cheng Lian] Should allow empty input paths
32e5f0d [Cheng Lian] Moves schema merging to executor side
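A usage note, as a minimal sketch: since the default flips to false, users who rely on evolving schemas must opt back in. The per-read "mergeSchema" option is assumed to be honored by ParquetRelation2, which receives extraOptions from DataFrameReader (see the diff below); the path is illustrative.

    // Opt back into schema merging globally for this SQLContext...
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

    // ...or (assumed per-read option, passed through extraOptions) for a single read:
    val merged = sqlContext.read.option("mergeSchema", "true").parquet("/data/events")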
1 parent dac7dbf commit a1064df

File tree: 8 files changed, +258 -90 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 8 additions & 0 deletions
@@ -239,6 +239,14 @@ class SparkHadoopUtil extends Logging {
     }.getOrElse(Seq.empty[Path])
   }
 
+  def globPathIfNecessary(pattern: Path): Seq[Path] = {
+    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
+      globPath(pattern)
+    } else {
+      Seq(pattern)
+    }
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
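A usage sketch of the new helper (the paths and namenode address are illustrative, and SparkHadoopUtil is a Spark-internal/developer utility):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.deploy.SparkHadoopUtil

    val plain = new Path("hdfs://nn:8020/warehouse/events/date=2015-07-18")
    val glob  = new Path("hdfs://nn:8020/warehouse/events/date=2015-07-*")

    // No glob characters: the path is returned as-is, no globStatus() RPC is issued.
    SparkHadoopUtil.get.globPathIfNecessary(plain)  // Seq(plain)

    // Contains '*': falls back to globPath(), which calls FileSystem.globStatus().
    SparkHadoopUtil.get.globPathIfNecessary(glob)   // all matching paths (possibly empty)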

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 9 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.Partition
+import org.apache.spark.{Logging, Partition}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
  * @since 1.4.0
  */
 @Experimental
-class DataFrameReader private[sql](sqlContext: SQLContext) {
+class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 
   /**
    * Specifies the input data source format.
@@ -251,7 +251,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
     if (paths.isEmpty) {
       sqlContext.emptyDataFrame
     } else {
-      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+      val globbedPaths = paths.flatMap { path =>
+        val hdfsPath = new Path(path)
+        val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      }.toArray
+
       sqlContext.baseRelationToDataFrame(
         new ParquetRelation2(
           globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
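For context, the qualification step matters because the glob check and any listing run against fully qualified paths; a rough standalone sketch (the namenode address and paths are made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://nn:8020")          // illustrative default FileSystem

    val raw = new Path("data/events")                   // relative, scheme-less user input
    val fs = raw.getFileSystem(conf)
    val qualified = raw.makeQualified(fs.getUri, fs.getWorkingDirectory)
    // => something like hdfs://nn:8020/user/<user>/data/events,
    //    which globPathIfNecessary can then inspect for glob characters.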

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 9 additions & 1 deletion
@@ -242,7 +242,7 @@ private[spark] object SQLConf {
     doc = "Whether the query analyzer should be case sensitive or not.")
 
   val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
-    defaultValue = Some(true),
+    defaultValue = Some(false),
     doc = "When true, the Parquet data source merges schemas collected from all data files, " +
           "otherwise the schema is picked from the summary file or a random data file " +
           "if no summary file is available.")
@@ -376,6 +376,11 @@ private[spark] object SQLConf {
   val OUTPUT_COMMITTER_CLASS =
     stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
+  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
+    key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
+    defaultValue = Some(32),
+    doc = "<TODO>")
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = booleanConf(
@@ -495,6 +500,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
+  private[spark] def parallelPartitionDiscoveryThreshold: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
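A usage sketch for the new threshold (the value below is arbitrary): per the commit message, leaf-file listing falls back to a distributed Spark job once the number of directories to list exceeds this setting.

    // Kick in the Spark-job based leaf-file listing earlier than the default of 32.
    sqlContext.setConf("spark.sql.sources.parallelPartitionDiscovery.threshold", "8")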

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 1 addition & 13 deletions
@@ -426,15 +426,14 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
   }
 }
 
+// TODO Removes this class after removing old Parquet support code
 /**
  * We extend ParquetInputFormat in order to have more control over which
  * RecordFilter we want to use.
  */
 private[parquet] class FilteringParquetRowInputFormat
   extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {
 
-  private var fileStatuses = Map.empty[Path, FileStatus]
-
   override def createRecordReader(
       inputSplit: InputSplit,
       taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
@@ -455,17 +454,6 @@ private[parquet] class FilteringParquetRowInputFormat
 
 }
 
-private[parquet] object FilteringParquetRowInputFormat {
-  private val footerCache = CacheBuilder.newBuilder()
-    .maximumSize(20000)
-    .build[FileStatus, Footer]()
-
-  private val blockLocationCache = CacheBuilder.newBuilder()
-    .maximumSize(20000)
-    .expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
-    .build[FileStatus, Array[BlockLocation]]()
-}
-
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 114 additions & 44 deletions
@@ -22,7 +22,7 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.util.Try
+import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -31,12 +31,11 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
@@ -278,19 +277,13 @@ private[sql] class ParquetRelation2(
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
-
     Utils.withDummyCallSite(sqlContext.sparkContext) {
-      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
-      // and footers. Especially when a global arbitrative schema (either from metastore or data
-      // source DDL) is available.
       new SqlNewHadoopRDD(
         sc = sqlContext.sparkContext,
         broadcastedConf = broadcastedConf,
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
         keyClass = classOf[Void],
         valueClass = classOf[InternalRow]) {
 
@@ -306,12 +299,6 @@ private[sql] class ParquetRelation2(
             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
-        @transient val cachedFooters = footers.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
-        }.toSeq
-
         private def escapePathUserInfo(path: Path): Path = {
           val uri = path.toUri
           new Path(new URI(
@@ -321,13 +308,10 @@ private[sql] class ParquetRelation2(
 
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
-          val inputFormat = if (cacheMetadata) {
-            new FilteringParquetRowInputFormat {
-              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+          val inputFormat = new ParquetInputFormat[InternalRow] {
+            override def listStatus(jobContext: JobContext): JList[FileStatus] = {
+              if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
             }
-          } else {
-            new FilteringParquetRowInputFormat
           }
 
           val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
@@ -348,9 +332,6 @@ private[sql] class ParquetRelation2(
     // `FileStatus` objects of all "_common_metadata" files.
     private var commonMetadataStatuses: Array[FileStatus] = _
 
-    // Parquet footer cache.
-    var footers: Map[Path, Footer] = _
-
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
 
@@ -376,20 +357,6 @@ private[sql] class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = {
-        val conf = SparkHadoopUtil.get.conf
-        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-        val rawFooters = if (shouldMergeSchemas) {
-          ParquetFileReader.readAllFootersInParallel(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        } else {
-          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        }
-
-        rawFooters.map(footer => footer.getFile -> footer).toMap
-      }
-
       // If we already get the schema, don't need to re-compute it since the schema merging is
       // time-consuming.
       if (dataSchema == null) {
@@ -422,7 +389,7 @@ private[sql] class ParquetRelation2(
     // Always tries the summary files first if users don't require a merged schema. In this case,
     // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
     // groups information, and could be much smaller for large Parquet files with lots of row
-    // groups.
+    // groups. If no summary file is available, falls back to some random part-file.
     //
     // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
    // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
@@ -457,10 +424,10 @@
 
       assert(
         filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+        "No predefined schema found, " +
+          s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
+      ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
     }
   }
 }
@@ -519,6 +486,7 @@ private[sql] object ParquetRelation2 extends Logging {
   private[parquet] def initializeDriverSideJobFunc(
       inputFiles: Array[FileStatus])(job: Job): Unit = {
     // We side the input paths at the driver side.
+    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
     if (inputFiles.nonEmpty) {
       FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
     }
@@ -543,7 +511,7 @@
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-      if (serializedSchema == None) {
+      if (serializedSchema.isEmpty) {
        // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
       } else if (!seen.contains(serializedSchema.get)) {
@@ -646,4 +614,106 @@
       .filter(_.nullable)
     StructType(parquetSchema ++ missingFields)
   }
+
+  /**
+   * Figures out a merged Parquet schema with a distributed Spark job.
+   *
+   * Note that locality is not taken into consideration here because:
+   *
+   *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
+   *     that file. Thus we only need to retrieve the location of the last block. However, Hadoop
+   *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
+   *     potentially expensive.
+   *
+   *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
+   *     slow. And basically locality is not available when using S3 (you can't run computation on
+   *     S3 nodes).
+   */
+  def mergeSchemasInParallel(
+      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+
+    // HACK ALERT:
+    //
+    // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
+    // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
+    // but only `Writable`. What makes it worth, for some reason, `FileStatus` doesn't play well
+    // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
+    // facts virtually prevents us to serialize `FileStatus`es.
+    //
+    // Since Parquet only relies on path and length information of those `FileStatus`es to read
+    // footers, here we just extract them (which can be easily serialized), send them to executor
+    // side, and resemble fake `FileStatus`es there.
+    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+    // Issues a Spark job to read Parquet schema in parallel.
+    val partiallyMergedSchemas =
+      sqlContext
+        .sparkContext
+        .parallelize(partialFileStatusInfo)
+        .mapPartitions { iterator =>
+          // Resembles fake `FileStatus`es with serialized path and length information.
+          val fakeFileStatuses = iterator.map { case (path, length) =>
+            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+          }.toSeq
+
+          // Skips row group information since we only need the schema
+          val skipRowGroups = true
+
+          // Reads footers in multi-threaded manner within each task
+          val footers =
+            ParquetFileReader.readAllFootersInParallel(
+              serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+          val converter =
+            new CatalystSchemaConverter(
+              assumeBinaryIsString = assumeBinaryIsString,
+              assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+              followParquetFormatSpec = followParquetFormatSpec)
+
+          footers.map { footer =>
+            ParquetRelation2.readSchemaFromFooter(footer, converter)
+          }.reduceOption(_ merge _).iterator
+        }.collect()
+
+    partiallyMergedSchemas.reduceOption(_ merge _)
+  }
+
+  /**
+   * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
+   * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
+   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
+   */
+  def readSchemaFromFooter(
+      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+    val fileMetaData = footer.getParquetMetadata.getFileMetaData
+    fileMetaData
+      .getKeyValueMetaData
+      .toMap
+      .get(RowReadSupport.SPARK_METADATA_KEY)
+      .flatMap(deserializeSchemaString)
+      .getOrElse(converter.convert(fileMetaData.getSchema))
+  }
+
+  private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+    // Tries to deserialize the schema string as JSON first, then falls back to the case class
+    // string parser (data generated by older versions of Spark SQL uses this format).
+    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+      case _: Throwable =>
+        logInfo(
+          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "falling back to the deprecated DataType.fromCaseClassString parser.")
+        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+    }.recoverWith {
+      case cause: Throwable =>
+        logWarning(
+          "Failed to parse and ignored serialized Spark schema in " +
+            s"Parquet key-value metadata:\n\t$schemaString", cause)
+        Failure(cause)
    }.toOption
+  }
 }
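To make the shape of `mergeSchemasInParallel` easier to follow, here is a self-contained toy sketch of the same two-level merge pattern, with schemas modeled as plain `Map[String, String]` (column name to type name) instead of `StructType`; the merge function and data are made up for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    object MergeSketch {
      type Schema = Map[String, String]

      // Toy merge: union of columns; assumes no conflicting types.
      def merge(a: Schema, b: Schema): Schema = a ++ b

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("merge-sketch").setMaster("local[2]"))

        // One entry per Parquet part-file footer.
        val perFileSchemas: Seq[Schema] = Seq(
          Map("id" -> "int"),
          Map("id" -> "int", "name" -> "string"),
          Map("id" -> "int", "ts" -> "timestamp"))

        // Level 1: each task merges the schemas of the footers it read.
        val partiallyMerged = sc.parallelize(perFileSchemas)
          .mapPartitions(iter => iter.reduceOption(merge).iterator)
          .collect()

        // Level 2: the driver merges the per-partition results.
        val finalSchema = partiallyMerged.reduceOption(merge)
        println(finalSchema)  // e.g. Some(Map(id -> int, name -> string, ts -> timestamp))

        sc.stop()
      }
    }

In the real code, the per-task reduce happens right after `ParquetFileReader.readAllFootersInParallel`, so only one partially merged `StructType` per partition is shipped back to the driver.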

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 6 additions & 2 deletions
@@ -247,7 +247,9 @@ private[sql] object ResolvedDataSource {
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
 
         val dataSchema =
@@ -272,7 +274,9 @@
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
         dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
       case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
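For illustration (paths are hypothetical), the same qualify-then-maybe-glob logic now applies to the `path` option of any data source resolved through `ResolvedDataSource`, so a concrete directory no longer costs a `globStatus()` round trip while an explicit glob still expands as before:

    // Concrete directory: resolved as Seq(path) without calling FileSystem.globStatus().
    val day = sqlContext.read.format("parquet").load("/warehouse/events/date=2015-07-18")

    // Glob in the "path" option: still expanded via FileSystem.globStatus().
    val month = sqlContext.read.format("parquet").load("/warehouse/events/date=2015-07-*")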
