
Commit dc1562e

tdas authored and yhuai committed
[SPARK-14997][SQL] Fixed FileCatalog to return correct set of files when there is no partitioning scheme in the given paths
## What changes were proposed in this pull request?

Let's say there are JSON files in the following directory structure:

```
xyz/file0.json
xyz/subdir1/file1.json
xyz/subdir2/file2.json
xyz/subdir1/subsubdir1/file3.json
```

`sqlContext.read.json("xyz")` should read only file0.json, matching the behavior of Spark 1.6.1. In the current master, however, all 4 files are read. The fix is to make FileCatalog return only the child files of the given path when no partitioning is detected, instead of the full recursive list of files.

Closes #12774

## How was this patch tested?

Unit tests.

Author: Tathagata Das <[email protected]>

Closes #12856 from tdas/SPARK-14997.

(cherry picked from commit f7b7ef4)
Signed-off-by: Yin Huai <[email protected]>
1 parent 22f9f5f commit dc1562e
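A minimal sketch of the behavior this commit restores, assuming the directory layout from the commit message; the `local[2]` master, app name, and the `xyz` path are illustrative, and in spark-shell the `sc`/`sqlContext` setup is already provided.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical local setup (spark-shell users can skip this).
val sc = new SparkContext(new SparkConf().setAppName("SPARK-14997-example").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// Layout assumed from the commit message:
//   xyz/file0.json
//   xyz/subdir1/file1.json
//   xyz/subdir2/file2.json
//   xyz/subdir1/subsubdir1/file3.json

// With the fix, reading the directory picks up only its direct child files
// (file0.json), matching Spark 1.6.1, instead of the full recursive listing.
val df = sqlContext.read.json("xyz")
df.show()
```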

File tree

5 files changed: +356 -30 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala

Lines changed: 23 additions & 1 deletion

```diff
@@ -61,7 +61,29 @@ abstract class PartitioningAwareFileCatalog(
     }
   }
 
-  override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+  override def allFiles(): Seq[FileStatus] = {
+    if (partitionSpec().partitionColumns.isEmpty) {
+      // For each of the input paths, get the list of files inside them
+      paths.flatMap { path =>
+        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+        val fs = path.getFileSystem(hadoopConf)
+        val qualifiedPath = fs.makeQualified(path)
+
+        // There are three cases possible with each path
+        // 1. The path is a directory and has children files in it. Then it must be present in
+        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
+        //    Find its children files from leafDirToChildrenFiles and include them.
+        // 2. The path is a file, then it will be present in leafFiles. Include this path.
+        // 3. The path is a directory, but has no children files. Do not include this path.
+
+        leafDirToChildrenFiles.get(qualifiedPath)
+          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+          .getOrElse(Array.empty)
+      }
+    } else {
+      leafFiles.values.toSeq
+    }
+  }
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
```
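A standalone sketch of the lookup the new `allFiles()` branch performs, using plain Scala maps in place of the catalog's `leafFiles` / `leafDirToChildrenFiles` state; the object name, paths, and helper `filesFor` are illustrative and not part of the patch.

```scala
object AllFilesLookupSketch {
  // Stand-ins for the catalog's state: leaf files keyed by qualified path, and
  // leaf directories mapped to the files directly inside them.
  val leafFiles: Map[String, String] = Map(
    "file:/data/xyz/file0.json" -> "file0.json",
    "file:/data/xyz/subdir1/file1.json" -> "file1.json")

  val leafDirToChildrenFiles: Map[String, Array[String]] = Map(
    "file:/data/xyz" -> Array("file0.json"),
    "file:/data/xyz/subdir1" -> Array("file1.json"))

  // Mirrors the three cases described in the patch:
  // 1. a directory with children -> its direct children only
  // 2. a plain file              -> the file itself
  // 3. an empty directory        -> nothing
  def filesFor(qualifiedPath: String): Array[String] =
    leafDirToChildrenFiles.get(qualifiedPath)
      .orElse(leafFiles.get(qualifiedPath).map(Array(_)))
      .getOrElse(Array.empty[String])

  def main(args: Array[String]): Unit = {
    println(filesFor("file:/data/xyz").mkString(", "))                    // file0.json
    println(filesFor("file:/data/xyz/subdir1/file1.json").mkString(", ")) // file1.json
    println(filesFor("file:/data/xyz/empty").mkString(", "))              // (nothing)
  }
}
```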
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala

Lines changed: 68 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext

class FileCatalogSuite extends SharedSQLContext {

  test("ListingFileCatalog: leaf files are qualified paths") {
    withTempDir { dir =>
      val file = new File(dir, "text.txt")
      stringToFile(file, "text")

      val path = new Path(file.getCanonicalPath)
      val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) {
        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
      }
      assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
      assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
    }
  }

  test("ListingFileCatalog: input paths are converted to qualified paths") {
    withTempDir { dir =>
      val file = new File(dir, "text.txt")
      stringToFile(file, "text")

      val unqualifiedDirPath = new Path(dir.getCanonicalPath)
      val unqualifiedFilePath = new Path(file.getCanonicalPath)
      require(!unqualifiedDirPath.toString.contains("file:"))
      require(!unqualifiedFilePath.toString.contains("file:"))

      val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
      val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
      require(qualifiedFilePath.toString.startsWith("file:"))

      val catalog1 = new ListingFileCatalog(
        sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
      assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))

      val catalog2 = new ListingFileCatalog(
        sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
      assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
    }
  }
}
```
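A small sketch of the path qualification these tests exercise, using only the standard Hadoop `FileSystem` API; the object name and the `/tmp/example` path are illustrative.

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathSketch {
  def main(args: Array[String]): Unit = {
    // An unqualified local path such as /tmp/example/text.txt ...
    val raw = new Path(new File("/tmp/example/text.txt").getCanonicalPath)
    val fs = raw.getFileSystem(new Configuration())

    // ... becomes a fully qualified URI with a scheme, e.g. file:/tmp/example/text.txt,
    // which is the form the catalog stores in leafFiles and leafDirToChildrenFiles.
    val qualified = fs.makeQualified(raw)
    println(qualified)
  }
}
```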

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 47 additions & 0 deletions

```diff
@@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }
 
+  test("use basePath and file globbing to selectively load partitioned table") {
+    withTempPath { dir =>
+
+      val df = Seq(
+        (1, "foo", 100),
+        (1, "bar", 200),
+        (2, "foo", 300),
+        (2, "bar", 400)
+      ).toDF("p1", "p2", "v")
+      df.write
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .parquet(dir.getCanonicalPath)
+
+      def check(path: String, basePath: String, expectedDf: DataFrame): Unit = {
+        val testDf = sqlContext.read
+          .option("basePath", basePath)
+          .parquet(path)
+        checkAnswer(testDf, expectedDf)
+      }
+
+      // Should find all the data with partitioning columns when base path is set to the root
+      val resultDf = df.select("v", "p1", "p2")
+      check(path = s"$dir", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*", basePath = s"$dir", resultDf)
+      check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf)
+
+      // Should find selective partitions of the data if the base path is not set to root
+
+      check(  // read from ../p1=1 with base ../p1=1, should not infer p1 col
+        path = s"$dir/p1=1/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").drop("p1"))
+
+      check(  // read from ../p1=1/p2=foo with base ../p1=1/, should not infer p1
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))
+
+      check(  // read from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2
+        path = s"$dir/p1=1/p2=foo/*",
+        basePath = s"$dir/p1=1/p2=foo/",
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
+    }
+  }
+
   test("_SUCCESS should not break partitioning discovery") {
     Seq(1, 32).foreach { threshold =>
       // We have two paths to list files, one at driver side, another one that we use
```
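A minimal sketch of the `basePath` behavior this test checks, assuming a `sqlContext` in scope (as in spark-shell); the `/tmp/table` layout is illustrative, written with `.partitionBy("p1", "p2")` as in the test above.

```scala
// Assumed layout under an illustrative path:
//   /tmp/table/p1=1/p2=foo/part-....parquet
//   /tmp/table/p1=1/p2=bar/part-....parquet
//   /tmp/table/p1=2/p2=foo/part-....parquet

// Reading a subtree with basePath set to the table root keeps p1 and p2 as columns.
val withPartCols = sqlContext.read
  .option("basePath", "/tmp/table")
  .parquet("/tmp/table/p1=1/*")

// Reading the same subtree with basePath set to the subtree drops p1 from the schema,
// since nothing above basePath is treated as a partition directory.
val withoutP1 = sqlContext.read
  .option("basePath", "/tmp/table/p1=1/")
  .parquet("/tmp/table/p1=1/*")
```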

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 10 additions & 5 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.util.UUID
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
@@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       AddParquetFileData(seq.toDS().toDF(), src, tmp)
     }
 
+    /** Write parquet files in a temp dir, and move the individual files to the 'src' dir */
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
-      val file = Utils.tempFileWith(new File(tmp, "parquet"))
-      df.write.parquet(file.getCanonicalPath)
-      file.renameTo(new File(src, file.getName))
+      val tmpDir = Utils.tempFileWith(new File(tmp, "parquet"))
+      df.write.parquet(tmpDir.getCanonicalPath)
+      tmpDir.listFiles().foreach { f =>
+        f.renameTo(new File(src, s"${f.getName}"))
+      }
     }
   }
 
@@ -210,8 +214,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
 
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
-      Seq("a", "b", "c").toDS().as("userColumn").toDF()
-        .write.parquet(new File(src, "1").getCanonicalPath)
+      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+        .mode(org.apache.spark.sql.SaveMode.Overwrite)
+        .parquet(src.getCanonicalPath)
       val schema = createFileStreamSourceAndGetSchema(
         format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None)
       assert(schema === new StructType().add("value", StringType))
```
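A standalone sketch of the move performed by the updated `writeToFile` helper: parquet output is a directory of part files, so each file is moved into the source directory individually rather than renaming the whole output directory. The object name and the UUID-based temp-directory name (standing in for `Utils.tempFileWith`) are assumptions for illustration.

```scala
import java.io.File
import java.util.UUID

import org.apache.spark.sql.{DataFrame, SaveMode}

object MovePartFilesSketch {
  // Write the DataFrame to a fresh temp directory under `tmp`, then move every
  // part file it produced into `src`, mirroring the updated test helper.
  def writeAndMove(df: DataFrame, src: File, tmp: File): Unit = {
    val tmpDir = new File(tmp, s"parquet-${UUID.randomUUID()}")
    df.write.mode(SaveMode.Overwrite).parquet(tmpDir.getCanonicalPath)
    tmpDir.listFiles().foreach { f =>
      f.renameTo(new File(src, f.getName))
    }
  }
}
```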
