@@ -61,7 +61,29 @@ abstract class PartitioningAwareFileCatalog(
}
}

override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
override def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
// For each of the input paths, get the list of files inside them
paths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = path.getFileSystem(hadoopConf)
val qualifiedPath = fs.makeQualified(path)

// There are three cases possible with each path
// 1. The path is a directory and has children files in it. Then it must be present in
// leafDirToChildrenFiles as those children files will have been found as leaf files.
// Find its children files from leafDirToChildrenFiles and include them.
// 2. The path is a file, then it will be present in leafFiles. Include this path.
// 3. The path is a directory, but has no children files. Do not include this path.

leafDirToChildrenFiles.get(qualifiedPath)
.orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
.getOrElse(Array.empty)
}
} else {
leafFiles.values.toSeq
}
}
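// Illustration only, not part of this patch: a self-contained sketch that exercises the
// three-case lookup above, with hypothetical in-memory maps standing in for the catalog's
// leafFiles and leafDirToChildrenFiles state. All paths and statuses below are made up.
import org.apache.hadoop.fs.{FileStatus, Path}

object AllFilesLookupSketch {
  def main(args: Array[String]): Unit = {
    def status(p: String): FileStatus = new FileStatus(0L, false, 1, 0L, 0L, new Path(p))

    val a = status("file:/data/dir1/a.txt")
    val b = status("file:/data/dir1/b.txt")
    val f = status("file:/data/file2.txt")

    val leafFiles = Map(a.getPath -> a, b.getPath -> b, f.getPath -> f)
    val leafDirToChildrenFiles = Map(new Path("file:/data/dir1") -> Array(a, b))

    def filesFor(qualifiedPath: Path): Seq[FileStatus] =
      leafDirToChildrenFiles.get(qualifiedPath)               // case 1: directory with child files
        .orElse(leafFiles.get(qualifiedPath).map(Array(_)))   // case 2: the path is itself a leaf file
        .getOrElse(Array.empty[FileStatus])                   // case 3: directory with no child files

    println(filesFor(new Path("file:/data/dir1")).map(_.getPath))      // the two files under dir1
    println(filesFor(new Path("file:/data/file2.txt")).map(_.getPath)) // just file2.txt
    println(filesFor(new Path("file:/data/emptyDir")).map(_.getPath))  // empty: nothing is listed
  }
}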

protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext

class FileCatalogSuite extends SharedSQLContext {

test("ListingFileCatalog: leaf files are qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")

val path = new Path(file.getCanonicalPath)
val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) {
def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
}
assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
}
}

test("ListingFileCatalog: input paths are converted to qualified paths") {
// Reviewer comment (Contributor): Nice!
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")

val unqualifiedDirPath = new Path(dir.getCanonicalPath)
val unqualifiedFilePath = new Path(file.getCanonicalPath)
require(!unqualifiedDirPath.toString.contains("file:"))
require(!unqualifiedFilePath.toString.contains("file:"))

val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
require(qualifiedFilePath.toString.startsWith("file:"))

val catalog1 = new ListingFileCatalog(
sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))

val catalog2 = new ListingFileCatalog(
sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))

}
}
}
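For reference, a minimal standalone sketch, not part of the patch, of the path qualification these two tests rely on; the /tmp path and the default Hadoop Configuration below are illustrative assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MakeQualifiedSketch {
  def main(args: Array[String]): Unit = {
    val unqualified = new Path("/tmp/data/text.txt")          // no scheme or authority yet
    val fs = unqualified.getFileSystem(new Configuration())   // resolves to the local FileSystem
    val qualified = fs.makeQualified(unqualified)             // fills in the scheme, e.g. file:/tmp/data/text.txt
    println(qualified.toString.startsWith("file:/"))          // true, matching the assertions above
  }
}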
@@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}

test("use basePath and file globbing to selectively load partitioned table") {
withTempPath { dir =>

val df = Seq(
(1, "foo", 100),
(1, "bar", 200),
(2, "foo", 300),
(2, "bar", 400)
).toDF("p1", "p2", "v")
df.write
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.parquet(dir.getCanonicalPath)

def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
val testDf = sqlContext.read
.option("basePath", basePath)
.parquet(path)
checkAnswer(testDf, expectedDf)
}

// Should find all the data with partitioning columns when base path is set to the root
val resultDf = df.select("v", "p1", "p2")
check(path = s"$dir", basePath = s"$dir", resultDf)
check(path = s"$dir/*", basePath = s"$dir", resultDf)
check(path = s"$dir/*/*", basePath = s"$dir", resultDf)
check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf)

// Should find only the selected partitions of the data when the base path is not the table root

check( // read from ../p1=1 with base ../p1=1, should not infer p1 col
path = s"$dir/p1=1/*",
basePath = s"$dir/p1=1/",
resultDf.filter("p1 = 1").drop("p1"))

check( // read from ../p1=1/p2=foo with base ../p1=1/, should not infer p1
path = s"$dir/p1=1/p2=foo/*",
basePath = s"$dir/p1=1/",
resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))

check( // read from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2
path = s"$dir/p1=1/p2=foo/*",
basePath = s"$dir/p1=1/p2=foo/",
resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
}
}
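// Usage note, illustration only and not part of this patch: the same selective loading can be
// done outside the test harness. A minimal sketch, assuming a partitioned Parquet table has
// already been written under the hypothetical path /tmp/table/p1=.../p2=...:
def basePathUsageSketch(): Unit = {
  val selected = sqlContext.read
    .option("basePath", "/tmp/table/p1=1/")   // partition discovery is rooted below the p1 level
    .parquet("/tmp/table/p1=1/*")             // the glob selects only files under p1=1
  // `selected` has the v column plus an inferred p2 column, but no p1 column, because
  // basePath sits below the p1 directory level.
  selected.show()
}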

test("_SUCCESS should not break partitioning discovery") {
Seq(1, 32).foreach { threshold =>
// We have two paths to list files, one at driver side, another one that we use
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.File
import java.util.UUID

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
AddParquetFileData(seq.toDS().toDF(), src, tmp)
}

/** Write parquet files in a temp dir, and move the individual files to the 'src' dir */
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val file = Utils.tempFileWith(new File(tmp, "parquet"))
df.write.parquet(file.getCanonicalPath)
file.renameTo(new File(src, file.getName))
val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
df.write.parquet(tmpDir.getCanonicalPath)
tmpDir.listFiles().foreach { f =>
f.renameTo(new File(src, s"${f.getName}"))
}
}
}

@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {

test("FileStreamSource schema: parquet, existing files, no schema") {
withTempDir { src =>
Seq("a", "b", "c").toDS().as("userColumn").toDF()
.write.parquet(new File(src, "1").getCanonicalPath)
Seq("a", "b", "c").toDS().as("userColumn").toDF().write
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.parquet(src.getCanonicalPath)
val schema = createFileStreamSourceAndGetSchema(
format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
assert(schema === new StructType().add("value", StringType))