2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -431,7 +431,7 @@ case class FileSourceScanExec(
private def createBucketedReadRDD(
bucketSpec: BucketSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
-selectedPartitions: Seq[Partition],
+selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val bucketed =
@@ -463,7 +463,7 @@ case class FileSourceScanExec(
*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
-selectedPartitions: Seq[Partition],
+selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
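For context on the signature changes above, here is a rough sketch of how `createNonBucketedReadRDD`-style planning consumes the renamed type: each `PartitionDirectory` contributes its files, each file is cut into byte ranges of at most `maxSplitBytes`, and every split keeps the directory's partition values. The `FileSplit` case class and `planSplits` helper below are illustrative stand-ins only, not Spark internals (the real code builds `PartitionedFile`s and coalesces them into read tasks).

```scala
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionDirectory

object SplitPlanningSketch {

  // Hypothetical stand-in for Spark's internal per-split representation.
  case class FileSplit(partitionValues: InternalRow, path: String, start: Long, length: Long)

  // Simplified sketch of split planning over pruned partitions.
  def planSplits(
      selectedPartitions: Seq[PartitionDirectory],
      maxSplitBytes: Long): Seq[FileSplit] = {
    selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // Cut each file into byte ranges of at most maxSplitBytes, keeping the
        // partition values from the enclosing PartitionDirectory.
        (0L until file.getLen by maxSplitBytes).map { offset =>
          FileSplit(
            partition.values,
            file.getPath.toUri.toString,
            offset,
            math.min(maxSplitBytes, file.getLen - offset))
        }
      }
    }
  }
}
```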
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._

/**
* A collection of data files from a partitioned relation, along with the partition values in the
* form of an [[InternalRow]].
*/
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])

/**
* An interface for objects capable of enumerating the root paths of a relation as well as the
* partitions of a relation subject to some pruning expressions.
*/
trait FileCatalog {

/**
* Returns the list of root input paths from which the catalog will get files. There may be a
* single root path from which partitions are discovered, or individual partitions may be
* specified by each path.
*/
def rootPaths: Seq[Path]

/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
* unpartitioned, this will return a single partition with no partition values.
*
* @param filters The filters used to prune which partitions are returned. These filters must
* only refer to partition columns and this method will only return files
* where these predicates are guaranteed to evaluate to `true`. Thus, these
* filters will not need to be evaluated again on the returned data.
*/
def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]

/**
* Returns the list of files that will be read when scanning this relation. This call may be
* very expensive for large tables.
*/
def inputFiles: Array[String]

/** Refresh any cached file listings */
def refresh(): Unit

/** Sum of table file sizes, in bytes */
def sizeInBytes: Long
}
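To make the new contract concrete, below is a minimal, hypothetical implementation of the trait for a single unpartitioned directory; the class is illustrative only and not part of this change. Because there are no partition columns, the pruning filters can be safely ignored and `listFiles` returns one `PartitionDirectory` with an empty partition-values row, as the documentation above allows.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileCatalog, PartitionDirectory}

/**
 * A minimal, hypothetical FileCatalog over a single unpartitioned directory.
 * With no partition columns there is nothing to prune, so the filters are
 * ignored and all files are returned as one PartitionDirectory.
 */
class SingleDirectoryFileCatalog(fs: FileSystem, root: Path) extends FileCatalog {

  private var cachedFiles: Seq[FileStatus] = listLeafFiles()

  override def rootPaths: Seq[Path] = Seq(root)

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    Seq(PartitionDirectory(InternalRow.empty, cachedFiles))

  override def inputFiles: Array[String] =
    cachedFiles.map(_.getPath.toUri.toString).toArray

  override def refresh(): Unit = { cachedFiles = listLeafFiles() }

  override def sizeInBytes: Long = cachedFiles.map(_.getLen).sum

  private def listLeafFiles(): Seq[FileStatus] =
    fs.listStatus(root).filter(_.isFile).toSeq
}
```

A partition-aware catalog would instead evaluate the filter expressions against each directory's partition values before returning it, which is exactly the guarantee the `listFiles` documentation spells out.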
@@ -175,64 +175,3 @@ abstract class TextBasedFileFormat extends FileFormat {
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
}

/**
* A collection of data files from a partitioned relation, along with the partition values in the
* form of an [[InternalRow]].
*/
case class Partition(values: InternalRow, files: Seq[FileStatus])

/**
* An interface for objects capable of enumerating the root paths of a relation as well as the
* partitions of a relation subject to some pruning expressions.
*/
trait BasicFileCatalog {

/**
* Returns the list of root input paths from which the catalog will get files. There may be a
* single root path from which partitions are discovered, or individual partitions may be
* specified by each path.
*/
def rootPaths: Seq[Path]

/**
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
* unpartitioned, this will return a single partition with no partition values.
*
* @param filters The filters used to prune which partitions are returned. These filters must
* only refer to partition columns and this method will only return files
* where these predicates are guaranteed to evaluate to `true`. Thus, these
* filters will not need to be evaluated again on the returned data.
*/
def listFiles(filters: Seq[Expression]): Seq[Partition]

/** Returns the list of files that will be read when scanning this relation. */
def inputFiles: Array[String]

/** Refresh any cached file listings */
def refresh(): Unit

/** Sum of table file sizes, in bytes */
def sizeInBytes: Long
}

/**
* A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
* those, infer the relation's partition specification.
*/
// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
// which it is safe to list all of its files?
trait FileCatalog extends BasicFileCatalog {

/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec

/** Returns all the valid files. */
def allFiles(): Seq[FileStatus]

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
allFiles().map(_.getPath.toUri.toString).toArray

override def sizeInBytes: Long = allFiles().map(_.getLen).sum
}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
-* @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
+* @param location A [[FileCatalog]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
-location: BasicFileCatalog,
+location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
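Finally, a hedged sketch of how code holding a `HadoopFsRelation` might use the now directly exposed `FileCatalog`: partition filters are pushed into `listFiles`, and the sizes of the surviving files can feed scan statistics. The `prunedScanInfo` helper below is illustrative only and is not part of the planner.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}

object PruningSketch {

  // Prune partitions through the relation's catalog and report how much data
  // survives. Both the function and its use are a sketch, not planner code.
  def prunedScanInfo(
      relation: HadoopFsRelation,
      partitionFilters: Seq[Expression]): (Seq[PartitionDirectory], Long) = {
    // The catalog guarantees the returned files already satisfy the filters,
    // so they need not be re-evaluated on the scanned rows.
    val prunedPartitions = relation.location.listFiles(partitionFilters)
    val prunedBytes = prunedPartitions.flatMap(_.files).map(_.getLen).sum
    (prunedPartitions, prunedBytes)
  }
}
```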