
Commit 4ef39c2

ericl authored and rxin committed
[SPARK-17974] try 2) Refactor FileCatalog classes to simplify the inheritance tree
## What changes were proposed in this pull request?

This renames `BasicFileCatalog => FileCatalog`, combines `SessionFileCatalog` with `PartitioningAwareFileCatalog`, and removes the old `FileCatalog` trait. In summary,

```
MetadataLogFileCatalog extends PartitioningAwareFileCatalog
ListingFileCatalog extends PartitioningAwareFileCatalog
PartitioningAwareFileCatalog extends FileCatalog
TableFileCatalog extends FileCatalog
```

(note that this is a re-submission of #15518 which got reverted)

## How was this patch tested?

Existing tests

Author: Eric Liang <[email protected]>

Closes #15533 from ericl/fix-scalastyle-revert.
1 parent 231f39e commit 4ef39c2
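For orientation, here is a minimal, self-contained Scala sketch of the flattened hierarchy described in the message above. The `Expression` and `InternalRow` stand-ins and the empty `abstract class` bodies are placeholders so the snippet compiles on its own; the real members are in the diffs below.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Stand-ins so the sketch compiles on its own; the real types are Spark's
// org.apache.spark.sql.catalyst.InternalRow and catalyst.expressions.Expression.
trait Expression
class InternalRow

// The partition-values-plus-files grouping returned by listFiles (renamed from Partition).
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])

// The single remaining base trait (full version in the new FileCatalog.scala below).
trait FileCatalog {
  def rootPaths: Seq[Path]
  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
  def inputFiles: Array[String]
  def refresh(): Unit
  def sizeInBytes: Long
}

// The flattened inheritance tree after this change (bodies elided for the sketch).
abstract class PartitioningAwareFileCatalog extends FileCatalog
abstract class TableFileCatalog extends FileCatalog
abstract class ListingFileCatalog extends PartitioningAwareFileCatalog
abstract class MetadataLogFileCatalog extends PartitioningAwareFileCatalog
```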

File tree

13 files changed: +304 −354 lines

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
```

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -431,7 +431,7 @@ case class FileSourceScanExec(
   private def createBucketedReadRDD(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
+      selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val bucketed =
@@ -463,7 +463,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
+      selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala

Lines changed: 66 additions & 0 deletions
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._

/**
 * A collection of data files from a partitioned relation, along with the partition values in the
 * form of an [[InternalRow]].
 */
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])

/**
 * An interface for objects capable of enumerating the root paths of a relation as well as the
 * partitions of a relation subject to some pruning expressions.
 */
trait FileCatalog {

  /**
   * Returns the list of root input paths from which the catalog will get files. There may be a
   * single root path from which partitions are discovered, or individual partitions may be
   * specified by each path.
   */
  def rootPaths: Seq[Path]

  /**
   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
   * unpartitioned, this will return a single partition with no partition values.
   *
   * @param filters The filters used to prune which partitions are returned. These filters must
   *                only refer to partition columns and this method will only return files
   *                where these predicates are guaranteed to evaluate to `true`. Thus, these
   *                filters will not need to be evaluated again on the returned data.
   */
  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]

  /**
   * Returns the list of files that will be read when scanning this relation. This call may be
   * very expensive for large tables.
   */
  def inputFiles: Array[String]

  /** Refresh any cached file listings */
  def refresh(): Unit

  /** Sum of table file sizes, in bytes */
  def sizeInBytes: Long
}
```
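
To make the contract concrete, here is a hypothetical, minimal implementation of the new trait (not part of this commit): an unpartitioned catalog that re-lists a single directory on every call. The class name `SingleDirectoryFileCatalog` is made up for this sketch; Spark's own implementations such as `ListingFileCatalog` additionally cache listings and perform partition discovery.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileCatalog, PartitionDirectory}

/**
 * A toy, unpartitioned FileCatalog that simply lists one directory on each call.
 * Illustrative only: it does no caching and no partition inference.
 */
class SingleDirectoryFileCatalog(dir: Path, hadoopConf: Configuration) extends FileCatalog {

  override def rootPaths: Seq[Path] = Seq(dir)

  private def list(): Seq[FileStatus] = {
    val fs = dir.getFileSystem(hadoopConf)
    fs.listStatus(dir).filter(_.isFile).toSeq
  }

  // Unpartitioned data: a single PartitionDirectory with an empty partition-values row.
  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    Seq(PartitionDirectory(InternalRow.empty, list()))

  override def inputFiles: Array[String] = list().map(_.getPath.toUri.toString).toArray

  // Nothing is cached in this sketch, so refresh is a no-op.
  override def refresh(): Unit = ()

  override def sizeInBytes: Long = list().map(_.getLen).sum
}
```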

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 0 additions & 61 deletions
```diff
@@ -175,64 +175,3 @@ abstract class TextBasedFileFormat extends FileFormat {
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
 }
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class Partition(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait BasicFileCatalog {
-
-  /**
-   * Returns the list of root input paths from which the catalog will get files. There may be a
-   * single root path from which partitions are discovered, or individual partitions may be
-   * specified by each path.
-   */
-  def rootPaths: Seq[Path]
-
-  /**
-   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
-   * unpartitioned, this will return a single partition with no partition values.
-   *
-   * @param filters The filters used to prune which partitions are returned. These filters must
-   *                only refer to partition columns and this method will only return files
-   *                where these predicates are guaranteed to evaluate to `true`. Thus, these
-   *                filters will not need to be evaluated again on the returned data.
-   */
-  def listFiles(filters: Seq[Expression]): Seq[Partition]
-
-  /** Returns the list of files that will be read when scanning this relation. */
-  def inputFiles: Array[String]
-
-  /** Refresh any cached file listings */
-  def refresh(): Unit
-
-  /** Sum of table file sizes, in bytes */
-  def sizeInBytes: Long
-}
-
-/**
- * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
- * those, infer the relation's partition specification.
- */
-// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
-// which it is safe to list all of its files?
-trait FileCatalog extends BasicFileCatalog {
-
-  /** Returns the specification of the partitions inferred from the data. */
-  def partitionSpec(): PartitionSpec
-
-  /** Returns all the valid files. */
-  def allFiles(): Seq[FileStatus]
-
-  /** Returns the list of files that will be read when scanning this relation. */
-  override def inputFiles: Array[String] =
-    allFiles().map(_.getPath.toUri.toString).toArray
-
-  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-}
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * Acts as a container for all of the metadata required to read from a datasource. All discovery,
  * resolution and merging logic for schemas and partitions has been removed.
  *
- * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
  *                 comprise this relation.
  * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
  * @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    location: BasicFileCatalog,
+    location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
```
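
Since `location` is now typed as the unified `FileCatalog`, callers can use the trait's methods uniformly regardless of which catalog implementation backs the relation. A small hypothetical helper to illustrate (the object and method names are made up for this sketch, not part of the patch):

```scala
import org.apache.spark.sql.execution.datasources.HadoopFsRelation

object ScanInfo {
  // Summarize what a scan over this relation would read, using only the
  // FileCatalog interface exposed by `relation.location`.
  def describeScan(relation: HadoopFsRelation): String = {
    val catalog = relation.location   // any FileCatalog: listing-based or metastore-backed
    val files = catalog.inputFiles    // may be expensive for large tables
    s"${files.length} files, ${catalog.sizeInBytes} bytes, roots: ${catalog.rootPaths.mkString(", ")}"
  }
}
```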
