@@ -238,6 +238,14 @@ class SparkHadoopUtil extends Logging {
}.getOrElse(Seq.empty[Path])
}

def globPathIfNecessary(pattern: Path): Seq[Path] = {
if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
globPath(pattern)
} else {
Seq(pattern)
}
}

/**
* Lists all the files in a directory with the specified prefix that do not end with the
* given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
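For reference, a minimal usage sketch of the new `globPathIfNecessary` helper (not part of the patch; the paths are hypothetical, and `SparkHadoopUtil` is an internal API, so this would run from Spark's own driver-side code). A pattern containing any of the glob characters `{}[]*?\` is expanded through `globPath`, while a literal path is returned untouched, so no file system call is made for it.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

val util = SparkHadoopUtil.get

// Contains glob characters, so it is expanded against the file system via globPath:
val expanded: Seq[Path] =
  util.globPathIfNecessary(new Path("hdfs://nn:8020/logs/2015-06-*/part-*"))

// No glob characters, so the path is returned as-is without touching the file system:
val literal: Seq[Path] =
  util.globPathIfNecessary(new Path("hdfs://nn:8020/logs/2015-06-01/part-00000"))
```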
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import java.util.Properties

import org.apache.hadoop.fs.Path
import org.apache.spark.Partition
import org.apache.spark.{Logging, Partition}

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.StructType
* @since 1.4.0
*/
@Experimental
class DataFrameReader private[sql](sqlContext: SQLContext) {
class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {

/**
* Specifies the input data source format.
@@ -260,7 +260,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
if (paths.isEmpty) {
sqlContext.emptyDataFrame
} else {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
val globbedPaths = paths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray

sqlContext.baseRelationToDataFrame(
new ParquetRelation2(
globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
10 changes: 9 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -228,7 +228,7 @@ private[spark] object SQLConf {
doc = "<TODO>")

val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
Contributor: Should this be a per-relation option?

Contributor (Author): There is one: it is defined in object ParquetRelation2 and named mergeSchema (see the usage sketch below this hunk).

defaultValue = Some(true),
defaultValue = Some(false),
doc = "When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
@@ -361,6 +361,11 @@ private[spark] object SQLConf {
val OUTPUT_COMMITTER_CLASS =
stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
defaultValue = Some(32),
doc = "<TODO>")

// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis",
@@ -538,6 +543,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)

private[spark] def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
@@ -426,15 +426,14 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
}
}

// TODO Remove this class after removing old Parquet support code
/**
* We extend ParquetInputFormat in order to have more control over which
* RecordFilter we want to use.
*/
private[parquet] class FilteringParquetRowInputFormat
extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {

private var fileStatuses = Map.empty[Path, FileStatus]

override def createRecordReader(
inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
@@ -455,17 +454,6 @@ private[parquet] class FilteringParquetRowInputFormat

}

private[parquet] object FilteringParquetRowInputFormat {
private val footerCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.build[FileStatus, Footer]()

private val blockLocationCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
.build[FileStatus, Array[BlockLocation]]()
}

private[parquet] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
158 changes: 114 additions & 44 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -22,7 +22,7 @@ import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try
import scala.util.{Failure, Try}

import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -31,12 +31,11 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._
import org.apache.spark.sql._
@@ -278,19 +277,13 @@ private[sql] class ParquetRelation2(
// Create the function to set input paths at the driver side.
val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

Utils.withDummyCallSite(sqlContext.sparkContext) {
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
// and footers. Especially when a global arbitrative schema (either from metastore or data
// source DDL) is available.
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[FilteringParquetRowInputFormat],
inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
keyClass = classOf[Void],
valueClass = classOf[InternalRow]) {

@@ -306,12 +299,6 @@ private[sql] class ParquetRelation2(
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
}.toSeq

@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
}.toSeq

private def escapePathUserInfo(path: Path): Path = {
val uri = path.toUri
new Path(new URI(
Expand All @@ -321,13 +308,10 @@ private[sql] class ParquetRelation2(

// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
val inputFormat = new ParquetInputFormat[InternalRow] {
override def listStatus(jobContext: JobContext): JList[FileStatus] = {
if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
}
} else {
new FilteringParquetRowInputFormat
}

val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
@@ -348,9 +332,6 @@ private[sql] class ParquetRelation2(
// `FileStatus` objects of all "_common_metadata" files.
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
var footers: Map[Path, Footer] = _

// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _

@@ -376,20 +357,6 @@ private[sql] class ParquetRelation2(
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

footers = {
val conf = SparkHadoopUtil.get.conf
val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
val rawFooters = if (shouldMergeSchemas) {
ParquetFileReader.readAllFootersInParallel(
conf, seqAsJavaList(leaves), taskSideMetaData)
} else {
ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
conf, seqAsJavaList(leaves), taskSideMetaData)
}

rawFooters.map(footer => footer.getFile -> footer).toMap
}

// If we already get the schema, don't need to re-compute it since the schema merging is
// time-consuming.
if (dataSchema == null) {
@@ -422,7 +389,7 @@ private[sql] class ParquetRelation2(
// Always tries the summary files first if users don't require a merged schema. In this case,
// "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
// groups information, and could be much smaller for large Parquet files with lots of row
// groups.
// groups. If no summary file is available, falls back to some random part-file.
//
// NOTE: Metadata stored in the summary files are merged from all part-files. However, for
// user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
@@ -457,10 +424,10 @@

assert(
filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined,
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
"No predefined schema found, " +
s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")

ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
}
}
}
@@ -519,6 +486,7 @@ private[sql] object ParquetRelation2 extends Logging {
private[parquet] def initializeDriverSideJobFunc(
inputFiles: Array[FileStatus])(job: Job): Unit = {
// We set the input paths at the driver side.
logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
@@ -543,7 +511,7 @@ private[sql] object ParquetRelation2 extends Logging {
.getKeyValueMetaData
.toMap
.get(RowReadSupport.SPARK_METADATA_KEY)
if (serializedSchema == None) {
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
Some(parseParquetSchema(metadata.getSchema))
} else if (!seen.contains(serializedSchema.get)) {
@@ -646,4 +614,106 @@ private[sql] object ParquetRelation2 extends Logging {
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}

/**
* Figures out a merged Parquet schema with a distributed Spark job.
*
* Note that locality is not taken into consideration here because:
*
* 1. For a single Parquet part-file, in most cases the footer only resides in the last block of
* that file. Thus we only need to retrieve the location of the last block. However, Hadoop
* `FileSystem` only provides API to retrieve locations of all blocks, which can be
* potentially expensive.
*
* 2. This optimization is mainly useful for S3, where file metadata operations can be pretty
* slow. And basically locality is not available when using S3 (you can't run computation on
* S3 nodes).
*/
def mergeSchemasInParallel(
filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

// HACK ALERT:
//
// Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
// to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
// but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well
// with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
// facts virtually prevent us from serializing `FileStatus`es.
//
// Since Parquet only relies on path and length information of those `FileStatus`es to read
// footers, here we just extract them (which can be easily serialized), send them to executor
// side, and reconstruct fake `FileStatus`es there.
val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))

// Issues a Spark job to read Parquet schema in parallel.
val partiallyMergedSchemas =
sqlContext
.sparkContext
.parallelize(partialFileStatusInfo)
.mapPartitions { iterator =>
// Reconstructs fake `FileStatus`es from the serialized path and length information.
val fakeFileStatuses = iterator.map { case (path, length) =>
new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
}.toSeq

// Skips row group information since we only need the schema
val skipRowGroups = true

// Reads footers in multi-threaded manner within each task
val footers =
ParquetFileReader.readAllFootersInParallel(
serializedConf.value, fakeFileStatuses, skipRowGroups)

// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
new CatalystSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
followParquetFormatSpec = followParquetFormatSpec)

footers.map { footer =>
ParquetRelation2.readSchemaFromFooter(footer, converter)
}.reduceOption(_ merge _).iterator
}.collect()

partiallyMergedSchemas.reduceOption(_ merge _)
}

/**
* Reads Spark SQL schema from a Parquet footer. If a valid serialized Spark SQL schema string
* can be found in the file metadata, returns the deserialized [[StructType]]; otherwise, returns
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
footer: Footer, converter: CatalystSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.toMap
.get(RowReadSupport.SPARK_METADATA_KEY)
.flatMap(deserializeSchemaString)
.getOrElse(converter.convert(fileMetaData.getSchema))
}

private def deserializeSchemaString(schemaString: String): Option[StructType] = {
// Tries to deserialize the schema string as JSON first, then falls back to the case class
// string parser (data generated by older versions of Spark SQL uses this format).
Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
case _: Throwable =>
logInfo(
s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
DataType.fromCaseClassString(schemaString).asInstanceOf[StructType]
}.recoverWith {
case cause: Throwable =>
logWarning(
"Failed to parse and ignored serialized Spark schema in " +
s"Parquet key-value metadata:\n\t$schemaString", cause)
Failure(cause)
}.toOption
}
}
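To make the two schema-string formats handled by `deserializeSchemaString` above concrete, here is an illustrative sketch (both strings are hand-written for this example, not taken from real files): newer writers store the Spark SQL schema as JSON, while files written by older Spark SQL versions carry a case-class-style string that the deprecated `DataType.fromCaseClassString` fallback parses.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Preferred format: JSON, as produced by StructType.json on recent Spark versions.
val jsonForm =
  """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"""
val schema = DataType.fromJson(jsonForm).asInstanceOf[StructType]

// Legacy format written by older Spark SQL versions; strings shaped roughly like this are
// what the deprecated DataType.fromCaseClassString fallback above is meant to handle.
val legacyForm = "StructType(List(StructField(id,LongType,true)))"
```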
@@ -247,7 +247,9 @@ private[sql] object ResolvedDataSource {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}

val dataSchema =
@@ -272,7 +274,9 @@ private[sql] object ResolvedDataSource {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}
dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
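The same qualify-then-glob pattern is used here and in DataFrameReader above. A standalone sketch of the idea (the configuration and paths are illustrative; inside Spark the Hadoop configuration would come from sparkContext.hadoopConfiguration): qualifying the pattern against its FileSystem resolves the scheme and working directory first, so a relative or scheme-less pattern is unambiguous before `globPathIfNecessary` decides whether to expand it.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

val hadoopConf = new Configuration()           // illustrative stand-in for sparkContext.hadoopConfiguration

val pattern = new Path("data/2015-*/part-*")   // relative, scheme-less pattern
val fs = pattern.getFileSystem(hadoopConf)
val qualified = pattern.makeQualified(fs.getUri, fs.getWorkingDirectory)
// e.g. hdfs://nn:8020/user/alice/data/2015-*/part-*, depending on the default file system

val resolved: Seq[Path] = SparkHadoopUtil.get.globPathIfNecessary(qualified)
```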