Commit 821e28d

Backports PR apache#7396 to branch-1.4
1 parent 2a7ea31 commit 821e28d

7 files changed: +269 additions, -75 deletions


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 8 additions & 0 deletions

@@ -238,6 +238,14 @@ class SparkHadoopUtil extends Logging {
     }.getOrElse(Seq.empty[Path])
   }
 
+  def globPathIfNecessary(pattern: Path): Seq[Path] = {
+    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
+      globPath(pattern)
+    } else {
+      Seq(pattern)
+    }
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
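
For orientation, a minimal usage sketch of the new helper, not part of the patch (the paths are hypothetical and a reachable Hadoop-compatible file system is assumed): a path without glob characters is returned untouched, while one containing any of { } [ ] * ? \ is expanded through globPath.

import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical paths; assumes the default FileSystem is reachable.
val literal = new Path("hdfs://nn:8020/data/events/part-00000.parquet")
val pattern = new Path("hdfs://nn:8020/data/events/part-*.parquet")

// No glob characters: returned as-is, so no extra FileSystem round trip.
val asIs    = SparkHadoopUtil.get.globPathIfNecessary(literal)   // Seq(literal)

// Contains '*': delegated to globPath, which lists the matching files.
val matched = SparkHadoopUtil.get.globPathIfNecessary(pattern)   // Seq of matching Paths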

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 11 additions & 5 deletions

@@ -20,17 +20,17 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.Partition
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.json.{JSONRelation, JsonRDD}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Logging, Partition}
 
 /**
  * :: Experimental ::
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
  * @since 1.4.0
  */
 @Experimental
-class DataFrameReader private[sql](sqlContext: SQLContext) {
+class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 
   /**
    * Specifies the input data source format.
@@ -260,10 +260,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
     if (paths.isEmpty) {
       sqlContext.emptyDataFrame
     } else {
-      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+      val globbedPaths = paths.flatMap { path =>
+        val hdfsPath = new Path(path)
+        val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      }.toArray
+
       sqlContext.baseRelationToDataFrame(
         new ParquetRelation2(
-          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))
+          globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
     }
   }
 
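
To illustrate the DataFrameReader change with a rough, hypothetical example (not part of the patch; assumes a SQLContext named sqlContext): each input path is now qualified against its FileSystem and only globbed when it actually contains glob characters, and reader options set via option(...) now reach ParquetRelation2 through extraOptions.toMap instead of being dropped.

// A literal directory passes through unchanged ...
val daily = sqlContext.read.parquet("/warehouse/events/date=2015-07-01")

// ... while a pattern is qualified and expanded before ParquetRelation2 sees it.
val weekly = sqlContext.read.parquet("hdfs://nn:8020/warehouse/events/date=2015-07-0*")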

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 0 deletions

@@ -76,6 +76,9 @@ private[spark] object SQLConf {
   // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
 
+  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
+    "spark.sql.sources.parallelPartitionDiscovery.threshold"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
@@ -251,6 +254,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionDiscoveryEnabled() =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
 
+  private[spark] def parallelPartitionDiscoveryThreshold: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, "32").toInt
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int =
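
A small sketch of how the new setting could be tuned (not part of the patch; the value 64 is arbitrary, 32 being the default shown above). The threshold itself is consumed by the data source partition discovery code elsewhere in this backport.

// Hypothetical tuning; assumes a SQLContext named sqlContext.
sqlContext.setConf("spark.sql.sources.parallelPartitionDiscovery.threshold", "64")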

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 128 additions & 40 deletions

@@ -21,7 +21,7 @@ import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
-import scala.util.Try
+import scala.util.{Failure, Try}
 
 import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
@@ -33,17 +33,16 @@ import parquet.filter2.predicate.FilterApi
 import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
+import parquet.schema.MessageType
 
-import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWritable, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -144,7 +143,7 @@ private[sql] class ParquetRelation2(
 
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
-    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
+    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "false").toBoolean
 
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation2.METASTORE_SCHEMA)
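
The hunk above flips the default of ParquetRelation2.MERGE_SCHEMA from true to false, so Parquet schema merging now has to be requested explicitly. A hedged sketch of opting back in through the reader option that feeds these parameters (assumes the option key is the usual "mergeSchema" and a SQLContext named sqlContext):

// Hypothetical opt-in; with the new default, part-file schemas are no longer merged automatically.
val merged = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("hdfs://nn:8020/warehouse/events")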
@@ -261,19 +260,23 @@ private[sql] class ParquetRelation2(
       broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
       ParquetRelation2.initializeLocalJobFunc(
         requiredColumns,
         filters,
         dataSchema,
         useMetadataCache,
-        parquetFilterPushDown) _
+        parquetFilterPushDown,
+        assumeBinaryIsString,
+        assumeInt96IsTimestamp) _
+
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
-
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
       // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
@@ -300,12 +303,6 @@ private[sql] class ParquetRelation2(
             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
-        @transient val cachedFooters = footers.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
-        }.toSeq
-
         private def escapePathUserInfo(path: Path): Path = {
           val uri = path.toUri
           new Path(new URI(
@@ -318,7 +315,6 @@ private[sql] class ParquetRelation2(
         val inputFormat = if (cacheMetadata) {
           new FilteringParquetRowInputFormat {
             override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
           }
         } else {
           new FilteringParquetRowInputFormat
@@ -342,9 +338,6 @@ private[sql] class ParquetRelation2(
     // `FileStatus` objects of all "_common_metadata" files.
     private var commonMetadataStatuses: Array[FileStatus] = _
 
-    // Parquet footer cache.
-    var footers: Map[Path, Footer] = _
-
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
 
@@ -370,20 +363,6 @@ private[sql] class ParquetRelation2(
       commonMetadataStatuses =
        leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = {
-        val conf = SparkHadoopUtil.get.conf
-        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
-        val rawFooters = if (shouldMergeSchemas) {
-          ParquetFileReader.readAllFootersInParallel(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        } else {
-          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
-            conf, seqAsJavaList(leaves), taskSideMetaData)
-        }
-
-        rawFooters.map(footer => footer.getFile -> footer).toMap
-      }
-
       // If we already get the schema, don't need to re-compute it since the schema merging is
       // time-consuming.
       if (dataSchema == null) {
@@ -416,7 +395,7 @@ private[sql] class ParquetRelation2(
       // Always tries the summary files first if users don't require a merged schema. In this case,
       // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
       // groups information, and could be much smaller for large Parquet files with lots of row
-      // groups.
+      // groups. If no summary file is available, falls back to some random part-file.
       //
       // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
       // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
@@ -451,10 +430,10 @@ private[sql] class ParquetRelation2(
 
       assert(
         filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
-        "No schema defined, " +
-          s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
+        "No predefined schema found, " +
+          s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
+      ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
     }
   }
 }
@@ -473,9 +452,11 @@ private[sql] object ParquetRelation2 extends Logging {
       filters: Array[Filter],
       dataSchema: StructType,
       useMetadataCache: Boolean,
-      parquetFilterPushDown: Boolean)(job: Job): Unit = {
+      parquetFilterPushDown: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName())
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[RowReadSupport].getName)
 
     // Try to push down filters when filter push-down is enabled.
     if (parquetFilterPushDown) {
@@ -499,6 +480,10 @@ private[sql] object ParquetRelation2 extends Logging {
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+
+    // Sets flags for Parquet schema conversion
+    conf.set(SQLConf.PARQUET_BINARY_AS_STRING, assumeBinaryIsString.toString)
+    conf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP, assumeInt96IsTimestamp.toString)
   }
 
   /** This closure sets input paths at the driver side. */
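
For reference, a sketch of the two session settings whose values are now copied into the Hadoop Configuration above so that executors see them (standard Spark SQL keys; the values are only examples and not part of the patch):

// Hypothetical session tuning; assumes a SQLContext named sqlContext.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")    // treat Parquet BINARY as String
sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "true")  // treat Parquet INT96 as Timestamp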
@@ -620,4 +605,107 @@ private[sql] object ParquetRelation2 extends Logging {
       .filter(_.nullable)
     StructType(parquetSchema ++ missingFields)
   }
+
+  /**
+   * Figures out a merged Parquet schema with a distributed Spark job.
+   *
+   * Note that locality is not taken into consideration here because:
+   *
+   *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
+   *     that file. Thus we only need to retrieve the location of the last block. However, Hadoop
+   *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
+   *     potentially expensive.
+   *
+   *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
+   *     slow. And basically locality is not available when using S3 (you can't run computation on
+   *     S3 nodes).
+   */
+  def mergeSchemasInParallel(
+      filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
+    val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
+    val serializedConf =
+      new SerializableWritable[Configuration](sqlContext.sparkContext.hadoopConfiguration)
+
+    // HACK ALERT:
+    //
+    // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
+    // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
+    // but only `Writable`. What makes it worth, for some reason, `FileStatus` doesn't play well
+    // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
+    // facts virtually prevents us to serialize `FileStatus`es.
+    //
+    // Since Parquet only relies on path and length information of those `FileStatus`es to read
+    // footers, here we just extract them (which can be easily serialized), send them to executor
+    // side, and resemble fake `FileStatus`es there.
+    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
+
+    // Issues a Spark job to read Parquet schema in parallel.
+    val partiallyMergedSchemas =
+      sqlContext
+        .sparkContext
+        .parallelize(partialFileStatusInfo)
+        .mapPartitions { iterator =>
+          // Resembles fake `FileStatus`es with serialized path and length information.
+          val fakeFileStatuses = iterator.map { case (path, length) =>
+            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
+          }.toSeq
+
+          // Skips row group information since we only need the schema
+          val skipRowGroups = true
+
+          // Reads footers in multi-threaded manner within each task
+          val footers =
+            ParquetFileReader.readAllFootersInParallel(
+              serializedConf.value, fakeFileStatuses, skipRowGroups)
+
+          footers.map { footer =>
+            ParquetRelation2.readSchemaFromFooter(
+              footer, assumeBinaryIsString, assumeInt96IsTimestamp)
+          }.reduceOption(_ merge _).iterator
+        }.collect()
+
+    partiallyMergedSchemas.reduceOption(_ merge _)
+  }
+
+  /**
+   * Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
+   * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
+   * a [[StructType]] converted from the [[MessageType]] stored in this footer.
+   */
+  def readSchemaFromFooter(
+      footer: Footer,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean): StructType = {
+    val fileMetaData = footer.getParquetMetadata.getFileMetaData
+    fileMetaData
+      .getKeyValueMetaData
+      .toMap
+      .get(RowReadSupport.SPARK_METADATA_KEY)
+      .flatMap(deserializeSchemaString)
+      .getOrElse(
+        StructType.fromAttributes(
+          ParquetTypesConverter.convertToAttributes(
+            fileMetaData.getSchema,
+            assumeBinaryIsString,
+            assumeInt96IsTimestamp)))
+  }
+
+  private def deserializeSchemaString(schemaString: String): Option[StructType] = {
+    // Tries to deserialize the schema string as JSON first, then falls back to the case class
+    // string parser (data generated by older versions of Spark SQL uses this format).
+    Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
+      case _: Throwable =>
+        logInfo(
+          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "falling back to the deprecated DataType.fromCaseClassString parser.")
+        DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
+    }.recoverWith {
+      case cause: Throwable =>
+        logWarning(
+          "Failed to parse and ignored serialized Spark schema in " +
+            s"Parquet key-value metadata:\n\t$schemaString", cause)
+        Failure(cause)
+    }.toOption
+  }
 }
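
To make the deserializeSchemaString fallback above concrete, a minimal sketch of the happy path it tries first: the schema stored under the Spark metadata key is a JSON-serialized StructType, which DataType.fromJson restores (self-contained example, not part of the patch):

import org.apache.spark.sql.types._

// A StructType round-trips through its JSON form; this is the format the new
// code parses before falling back to the deprecated fromCaseClassString parser.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))

val restored = DataType.fromJson(schema.json).asInstanceOf[StructType]
assert(restored == schema)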

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 6 additions & 2 deletions

@@ -246,7 +246,9 @@ private[sql] object ResolvedDataSource {
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
 
         val dataSchema =
@@ -271,7 +273,9 @@ private[sql] object ResolvedDataSource {
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val paths = {
           val patternPath = new Path(caseInsensitiveOptions("path"))
-          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+          val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+          SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
         }
         dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
       case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
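
Finally, a sketch of what the qualification step added in both hunks buys before globbing: a relative or scheme-less pattern is resolved against the FileSystem's URI and working directory (the file system layout below is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Assuming fs.defaultFS = hdfs://nn:8020 and working directory /user/alice,
// a relative pattern becomes fully qualified before it is globbed.
val raw = new Path("data/part-*")
val fs = raw.getFileSystem(new Configuration())
val qualified = raw.makeQualified(fs.getUri, fs.getWorkingDirectory)
// qualified: hdfs://nn:8020/user/alice/data/part-*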
