From b1f82ce389721c60bf22694470c52596f3ed9bcc Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 2 May 2016 17:29:10 -0700 Subject: [PATCH 01/13] Fixed SPARK-14997 --- .../datasources/fileSourceInterfaces.scala | 32 ++++- .../sql/sources/HadoopFsRelationTest.scala | 130 +++++++++++++++++- 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 25f88d9c3948..20490984f2e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -291,8 +291,12 @@ class HDFSFileCatalog( refresh() override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + if (partitionSpec().partitionColumns.isEmpty) { - Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil + Partition( + InternalRow.empty, + unpartitionedDataFiles().filterNot(_.getPath.getName startsWith "_") + ) :: Nil } else { prunePartitions(filters, partitionSpec()).map { case PartitionDirectory(values, path) => @@ -337,7 +341,13 @@ class HDFSFileCatalog( } } - def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq + def allFiles(): Seq[FileStatus] = { + if (partitionSpec().partitionColumns.isEmpty) { + unpartitionedDataFiles() + } else { + leafFiles.values.toSeq + } + } def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) @@ -387,7 +397,7 @@ class HDFSFileCatalog( } } - def inferPartitioning(schema: Option[StructType]): PartitionSpec = { + private def inferPartitioning(schema: Option[StructType]): PartitionSpec = { // We use leaf dirs containing data files to discover the schema. val leafDirs = leafDirToChildrenFiles.keys.toSeq schema match { @@ -443,6 +453,22 @@ class HDFSFileCatalog( } } + /** List of files to consider when there is not inferred partitioning scheme */ + private def unpartitionedDataFiles(): Seq[FileStatus] = { + // For each of the input paths, get the list of files inside them + paths.flatMap { path => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = path.getFileSystem(hadoopConf) + val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + + // If it is a directory (i.e. exists in leafDirToChildrenFiles), return its children files + // Or if it is a file (i.e. 
exists in leafFiles), return the path itself + leafDirToChildrenFiles.get(qualifiedPath).orElse { + leafFiles.get(path).map(Array(_)) + }.getOrElse(Array.empty) + } + } + def refresh(): Unit = { val files = listLeafFiles(paths) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 67b403a9bd3a..cbf1d250357b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import java.io.File + import scala.util.Random import org.apache.hadoop.fs.Path @@ -486,7 +488,133 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } - test("Hadoop style globbing") { + test("load() - with directory of unpartitioned data in nested subdirs") { + withTempPath { file => + val dir = file.getCanonicalPath + val subdir = new File(dir, "subdir").getCanonicalPath + + val dataInDir = Seq(1, 2, 3).toDF("value") + val dataInSubdir = Seq(4, 5, 6).toDF("value") + + /* + + Directory structure to be generated + + dir + | + |___ [ files of dataInDir ] + | + |___ subsubdir + | + |___ [ files of dataInSubdir ] + */ + + // Generated dataInSubdir, not data in dir + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .save(subdir) + + // Inferring schema should throw error as it should not find any file to infer + val e = intercept[AnalysisException] { + sqlContext.read.format(dataSourceName).load(dir) + } + assert(e.getMessage.contains("infer")) + + /** Test whether data is read with the given path matches the expected answer */ + def testWithPath(path: String, expectedAnswer: Seq[Row]): Unit = { + val df = sqlContext.read + .format(dataSourceName) + .schema(dataInDir.schema) // avoid schema inference for any format + .load(path) + checkAnswer(df, expectedAnswer) + } + + // Reading by the path 'file/' *not by 'file/subdir') should give empty results + // as there are no files in 'file' and it should not pick up files in 'file/subdir' + testWithPath(dir, Seq.empty) + + dataInDir.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .save(dir) + + // Should give only rows from partitionedTestDF2 + testWithPath(dir, dataInDir.collect()) + } + } + + test("Hadoop style globbing - unpartitioned data") { + withTempPath { file => + + val dir = file.getCanonicalPath + val subdir = new File(dir, "subdir").getCanonicalPath + val subsubdir = new File(subdir, "subsubdir").getCanonicalPath + val anotherSubsubdir = + new File(new File(dir, "another-subdir"), "another-subsubdir").getCanonicalPath + + val dataInSubdir = Seq(1, 2, 3).toDF("value") + val dataInSubsubdir = Seq(4, 5, 6).toDF("value") + val dataInAnotherSubsubdir = Seq(7, 8, 9).toDF("value") + + dataInSubdir.write + .format (dataSourceName) + .mode (SaveMode.Overwrite) + .save (subdir) + + dataInSubsubdir.write + .format (dataSourceName) + .mode (SaveMode.Overwrite) + .save (subsubdir) + + dataInAnotherSubsubdir.write + .format (dataSourceName) + .mode (SaveMode.Overwrite) + .save (anotherSubsubdir) + + /* + + Directory structure generated + + dir + | + |___ subdir + | | + | |___ [ files of dataInSubdir ] + | | + | |___ subsubdir + | | + | |___ [ files of dataInSubsubdir ] + | + | + |___ anotherSubdir + | + |___ anotherSubsubdir + | + |___ [ files of dataInAnotherSubsubdir ] + */ + + val schema = dataInSubdir.schema 
+ + /** Test whether data is read with the given path matches the expected answer */ + def testWithPath(path: String, expectedDf: DataFrame): Unit = { + val df = sqlContext.read + .format(dataSourceName) + .schema(schema) // avoid schema inference for any format + .load(path) + checkAnswer(df, expectedDf) + } + + testWithPath(s"$dir/*/", dataInSubdir) + testWithPath(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir)) + testWithPath(s"$dir/another*/*", dataInAnotherSubsubdir) + testWithPath(s"$dir/*/another*", dataInAnotherSubsubdir) + testWithPath(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir)) + + } + } + + test("Hadoop style globbing - partitioned data") { withTempPath { file => partitionedTestDF.write .format(dataSourceName) From 3bb42bdcfe4ddbe439652884eaa8138e5b40f2c8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 2 May 2016 20:15:58 -0700 Subject: [PATCH 02/13] Addressed comments --- .../datasources/fileSourceInterfaces.scala | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 20490984f2e6..26411b7ad54e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -291,12 +291,8 @@ class HDFSFileCatalog( refresh() override def listFiles(filters: Seq[Expression]): Seq[Partition] = { - if (partitionSpec().partitionColumns.isEmpty) { - Partition( - InternalRow.empty, - unpartitionedDataFiles().filterNot(_.getPath.getName startsWith "_") - ) :: Nil + Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil } else { prunePartitions(filters, partitionSpec()).map { case PartitionDirectory(values, path) => @@ -341,9 +337,30 @@ class HDFSFileCatalog( } } + /** + * All the files to consider for processing. If there is a partitioning scheme, then + * consider all the leaf files in the input paths. Else consider only the input paths + * (if a path is file) or their immediate children (if a path is a directory). + */ def allFiles(): Seq[FileStatus] = { if (partitionSpec().partitionColumns.isEmpty) { - unpartitionedDataFiles() + // For each of the input paths, get the list of files inside them + paths.flatMap { path => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = path.getFileSystem(hadoopConf) + val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + + // There are three cases possible with each path + // 1. The path is a directory and has children files in it. Then it must be present in + // leafDirToChildrenFiles as those children files will have been found as leaf files. + // Find its children files from leafDirToChildrenFiles and include them. + // 2. The path is a file, then it will be present in leafFiles. Include this path. + // 3. The path is a directory, but has no children files. Do not include this path. 
+ + leafDirToChildrenFiles.get(qualifiedPath) + .orElse { leafFiles.get(path).map(Array(_)) } + .getOrElse(Array.empty) + } } else { leafFiles.values.toSeq } @@ -453,22 +470,6 @@ class HDFSFileCatalog( } } - /** List of files to consider when there is not inferred partitioning scheme */ - private def unpartitionedDataFiles(): Seq[FileStatus] = { - // For each of the input paths, get the list of files inside them - paths.flatMap { path => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val fs = path.getFileSystem(hadoopConf) - val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) - - // If it is a directory (i.e. exists in leafDirToChildrenFiles), return its children files - // Or if it is a file (i.e. exists in leafFiles), return the path itself - leafDirToChildrenFiles.get(qualifiedPath).orElse { - leafFiles.get(path).map(Array(_)) - }.getOrElse(Array.empty) - } - } - def refresh(): Unit = { val files = listLeafFiles(paths) From 2f7c52393c137db633726f8bacb2dc552bf31c65 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 2 May 2016 23:43:14 -0700 Subject: [PATCH 03/13] Updated test for SimpleTextHadoopFsRelationSuite --- .../spark/sql/sources/HadoopFsRelationTest.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index cbf1d250357b..534e7ba76f48 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -516,10 +516,20 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(subdir) // Inferring schema should throw error as it should not find any file to infer - val e = intercept[AnalysisException] { + val e = intercept[Exception] { sqlContext.read.format(dataSourceName).load(dir) } - assert(e.getMessage.contains("infer")) + + e match { + case _: AnalysisException => + assert(e.getMessage.contains("infer")) + + case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") => + // ignore, the source format requires schema to be provided by user + + case _ => + fail("Unexpected error trying to infer schema from empty dir", e) + } /** Test whether data is read with the given path matches the expected answer */ def testWithPath(path: String, expectedAnswer: Seq[Row]): Unit = { From 4198d560e0debc93df7675fbddc64b35a4e3adc5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 01:50:39 -0700 Subject: [PATCH 04/13] Fix unit test --- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6b1ecd08c13c..23cc2636f5eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -210,8 +210,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { test("FileStreamSource schema: parquet, existing files, no schema") { withTempDir { src => - Seq("a", "b", "c").toDS().as("userColumn").toDF() - .write.parquet(new File(src, "1").getCanonicalPath) + Seq("a", "b", 
"c").toDS().as("userColumn").toDF().write + .mode(org.apache.spark.sql.SaveMode.Overwrite) + .parquet(src.getCanonicalPath) val schema = createFileStreamSourceAndGetSchema( format = Some("parquet"), path = Some(src.getCanonicalPath), schema = None) assert(schema === new StructType().add("value", StringType)) From f1b793a0bd3580d83f8ae8ebf80954c887fa6917 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 3 May 2016 11:11:50 -0700 Subject: [PATCH 05/13] Fixed more unit tests --- .../spark/sql/streaming/FileStreamSourceSuite.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 23cc2636f5eb..b0bd4806b67e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.io.File +import java.util.UUID import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ @@ -84,10 +85,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { AddParquetFileData(seq.toDS().toDF(), src, tmp) } + /** Write parquet files in a temp dir, and move the individual files to the 'src' dir */ def writeToFile(df: DataFrame, src: File, tmp: File): Unit = { - val file = Utils.tempFileWith(new File(tmp, "parquet")) - df.write.parquet(file.getCanonicalPath) - file.renameTo(new File(src, file.getName)) + val tmpDir = Utils.tempFileWith(new File(tmp, "parquet")) + df.write.parquet(tmpDir.getCanonicalPath) + tmpDir.listFiles().foreach { f => + f.renameTo(new File(src, s"${f.getName}")) + } } } From efd261fccd26ebdf6f5e6ee4750387005c7ba79d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 4 May 2016 13:24:56 -0700 Subject: [PATCH 06/13] Addressed comments --- .../sql/sources/HadoopFsRelationTest.scala | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 534e7ba76f48..ef55c75c8967 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -489,9 +489,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("load() - with directory of unpartitioned data in nested subdirs") { - withTempPath { file => - val dir = file.getCanonicalPath - val subdir = new File(dir, "subdir").getCanonicalPath + withTempPath { dir => + val subdir = new File(dir, "subdir") val dataInDir = Seq(1, 2, 3).toDF("value") val dataInSubdir = Seq(4, 5, 6).toDF("value") @@ -503,21 +502,24 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes dir | |___ [ files of dataInDir ] - | - |___ subsubdir - | - |___ [ files of dataInSubdir ] + | + |___ subsubdir + | + |___ [ files of dataInSubdir ] */ // Generated dataInSubdir, not data in dir - partitionedTestDF1.write + dataInSubdir.write .format(dataSourceName) .mode(SaveMode.Overwrite) - .save(subdir) + .save(subdir.getCanonicalPath) + + require(subdir.exists) + require(subdir.listFiles().exists(!_.isDirectory)) // Inferring schema should throw error as it should not find any file to infer val e = 
intercept[Exception] { - sqlContext.read.format(dataSourceName).load(dir) + sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath) } e match { @@ -532,24 +534,25 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } /** Test whether data is read with the given path matches the expected answer */ - def testWithPath(path: String, expectedAnswer: Seq[Row]): Unit = { + def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = { val df = sqlContext.read .format(dataSourceName) .schema(dataInDir.schema) // avoid schema inference for any format - .load(path) + .load(path.getCanonicalPath) checkAnswer(df, expectedAnswer) } - // Reading by the path 'file/' *not by 'file/subdir') should give empty results - // as there are no files in 'file' and it should not pick up files in 'file/subdir' + // Verify that reading by path 'dir/' gives empty results as there are no files in 'file' + // and it should not pick up files in 'dir/subdir' testWithPath(dir, Seq.empty) + // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir dataInDir.write .format(dataSourceName) - .mode(SaveMode.Overwrite) - .save(dir) - - // Should give only rows from partitionedTestDF2 + .mode(SaveMode.Ignore) + .save(dir.getCanonicalPath) + require(dir.listFiles().exists(!_.isDirectory)) + require(subdir.listFiles().exists(!_.isDirectory)) testWithPath(dir, dataInDir.collect()) } } @@ -558,10 +561,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes withTempPath { file => val dir = file.getCanonicalPath - val subdir = new File(dir, "subdir").getCanonicalPath - val subsubdir = new File(subdir, "subsubdir").getCanonicalPath + val subdir = new File(dir, "subdir") + val subsubdir = new File(subdir, "subsubdir") val anotherSubsubdir = - new File(new File(dir, "another-subdir"), "another-subsubdir").getCanonicalPath + new File(new File(dir, "another-subdir"), "another-subsubdir") val dataInSubdir = Seq(1, 2, 3).toDF("value") val dataInSubsubdir = Seq(4, 5, 6).toDF("value") @@ -570,20 +573,26 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes dataInSubdir.write .format (dataSourceName) .mode (SaveMode.Overwrite) - .save (subdir) + .save (subdir.getCanonicalPath) dataInSubsubdir.write .format (dataSourceName) .mode (SaveMode.Overwrite) - .save (subsubdir) + .save (subsubdir.getCanonicalPath) dataInAnotherSubsubdir.write .format (dataSourceName) .mode (SaveMode.Overwrite) - .save (anotherSubsubdir) + .save (anotherSubsubdir.getCanonicalPath) - /* + require(subdir.exists) + require(subdir.listFiles().exists(!_.isDirectory)) + require(subsubdir.exists) + require(subsubdir.listFiles().exists(!_.isDirectory)) + require(anotherSubsubdir.exists) + require(anotherSubsubdir.listFiles().exists(!_.isDirectory)) + /* Directory structure generated dir @@ -620,7 +629,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testWithPath(s"$dir/another*/*", dataInAnotherSubsubdir) testWithPath(s"$dir/*/another*", dataInAnotherSubsubdir) testWithPath(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir)) - } } From f15ee3217b70da6282238faba2c6f97492b4116e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 4 May 2016 18:33:35 -0700 Subject: [PATCH 07/13] Fixed tests --- .../apache/spark/sql/sources/HadoopFsRelationTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index ef55c75c8967..e90621d527ad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -514,9 +514,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .mode(SaveMode.Overwrite) .save(subdir.getCanonicalPath) - require(subdir.exists) - require(subdir.listFiles().exists(!_.isDirectory)) - // Inferring schema should throw error as it should not find any file to infer val e = intercept[Exception] { sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath) @@ -544,14 +541,17 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes // Verify that reading by path 'dir/' gives empty results as there are no files in 'file' // and it should not pick up files in 'dir/subdir' + require(subdir.exists) + require(subdir.listFiles().exists(!_.isDirectory)) testWithPath(dir, Seq.empty) // Verify that if there is data in dir, then reading by path 'dir/' reads only dataInDir dataInDir.write .format(dataSourceName) - .mode(SaveMode.Ignore) + .mode(SaveMode.Append) // append to prevent subdir from being deleted .save(dir.getCanonicalPath) require(dir.listFiles().exists(!_.isDirectory)) + require(subdir.exists()) require(subdir.listFiles().exists(!_.isDirectory)) testWithPath(dir, dataInDir.collect()) } From 2b43ae4fb67bfc4d85e919b9a01d4ee9eeb6d142 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 May 2016 18:12:33 -0700 Subject: [PATCH 08/13] Addressed comments and added tests --- .../ParquetPartitionDiscoverySuite.scala | 47 +++++++++ .../sql/sources/HadoopFsRelationTest.scala | 98 +++++++++++++------ 2 files changed, 113 insertions(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index cb2c2522b20c..dd67d99682e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -765,6 +765,53 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } + test("use basePath and file globbing to selectively load partitioned table") { + withTempPath { dir => + + val df = Seq( + (1, "foo", 100), + (1, "bar", 200), + (2, "foo", 300), + (2, "bar", 400) + ).toDF("p1", "p2", "v") + df.write + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .parquet(dir.getCanonicalPath) + + def check(path: String, basePath: String, expectedDf: DataFrame): Unit = { + val testDf = sqlContext.read + .option("basePath", basePath) + .parquet(path) + checkAnswer(testDf, expectedDf) + } + + // Should find all the data with partitioning columns when base path is set to the root + val resultDf = df.select("v", "p1", "p2") + check(path = s"$dir", basePath = s"$dir", resultDf) + check(path = s"$dir/*", basePath = s"$dir", resultDf) + check(path = s"$dir/*/*", basePath = s"$dir", resultDf) + check(path = s"$dir/*/*/*", basePath = s"$dir", resultDf) + + // Should find selective partitions of the data if the base path is not set to root + + check( // read from ../p1=1 
with base ../p1=1, should not infer p1 col + path = s"$dir/p1=1/*", + basePath = s"$dir/p1=1/", + resultDf.filter("p1 = 1").drop("p1")) + + check( // red from ../p1=1/p2=foo with base ../p1=1/ should not infer p1 + path = s"$dir/p1=1/p2=foo/*", + basePath = s"$dir/p1=1/", + resultDf.filter("p1 = 1").filter("p2 = foo").drop("p1")) + + check( // red from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2 + path = s"$dir/p1=1/p2=foo/*", + basePath = s"$dir/p1=1/p2=foo/", + resultDf.filter("p1 = 1").filter("p2 = foo").drop("p1", "p2")) + } + } + test("_SUCCESS should not break partitioning discovery") { Seq(1, 32).foreach { threshold => // We have two paths to list files, one at driver side, another one that we use diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index e90621d527ad..5a41f9010da0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -524,7 +524,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes assert(e.getMessage.contains("infer")) case _: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") => - // ignore, the source format requires schema to be provided by user + // Ignore error, the source format requires schema to be provided by user + // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema case _ => fail("Unexpected error trying to infer schema from empty dir", e) @@ -615,8 +616,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val schema = dataInSubdir.schema - /** Test whether data is read with the given path matches the expected answer */ - def testWithPath(path: String, expectedDf: DataFrame): Unit = { + /** Check whether data is read with the given path matches the expected answer */ + def check(path: String, expectedDf: DataFrame): Unit = { val df = sqlContext.read .format(dataSourceName) .schema(schema) // avoid schema inference for any format @@ -624,48 +625,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes checkAnswer(df, expectedDf) } - testWithPath(s"$dir/*/", dataInSubdir) - testWithPath(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir)) - testWithPath(s"$dir/another*/*", dataInAnotherSubsubdir) - testWithPath(s"$dir/*/another*", dataInAnotherSubsubdir) - testWithPath(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir)) + check(s"$dir/*/", dataInSubdir) + check(s"$dir/sub*/*", dataInSubdir.union(dataInSubsubdir)) + check(s"$dir/another*/*", dataInAnotherSubsubdir) + check(s"$dir/*/another*", dataInAnotherSubsubdir) + check(s"$dir/*/*", dataInSubdir.union(dataInSubsubdir).union(dataInAnotherSubsubdir)) } } test("Hadoop style globbing - partitioned data") { - withTempPath { file => + + // Tests the following on partition data + // - partitions are not discovered with globbing and without base path set. 
+ // - partitions are discovered with globbing and base path set, though more detailed + // tests for this is in ParquetPartitionDiscoverySuite + + withTempPath { path => + val dir = path.getCanonicalPath partitionedTestDF.write .format(dataSourceName) .mode(SaveMode.Overwrite) .partitionBy("p1", "p2") - .save(file.getCanonicalPath) + .save(dir) + + def check( + path: String, + expectedResult: Either[DataFrame, String], + basePath: Option[String] = None + ): Unit = { + try { + val reader = sqlContext.read + basePath.foreach(reader.option("basePath", _)) + val testDf = reader + .format(dataSourceName) + .load(path) + assert(expectedResult.isLeft, s"Error was expected with $path but result found") + checkAnswer(testDf, expectedResult.left.get) + } catch { + case e: Throwable => + assert(expectedResult.isRight, s"Was not expecting error with $path: " + e) + assert( + e.getMessage.contains(expectedResult.right.get), + s"Did not find expected error message wiht $path") + } + } - val df = sqlContext.read - .format(dataSourceName) - .option("dataSchema", dataSchema.json) - .option("basePath", file.getCanonicalPath) - .load(s"${file.getCanonicalPath}/p1=*/p2=???") - - val expectedPaths = Set( - s"${file.getCanonicalFile}/p1=1/p2=foo", - s"${file.getCanonicalFile}/p1=2/p2=foo", - s"${file.getCanonicalFile}/p1=1/p2=bar", - s"${file.getCanonicalFile}/p1=2/p2=bar" - ).map { p => - val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf()) - path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString + object Error { + def apply(msg: String): Either[DataFrame, String] = Right(msg) } - val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: HadoopFsRelation, _, _) => - relation.location.paths.map(_.toString).toSet - }.getOrElse { - fail("Expect an FSBasedRelation, but none could be found") + object Result { + def apply(df: DataFrame): Either[DataFrame, String] = Left(df) } - assert(actualPaths === expectedPaths) - checkAnswer(df, partitionedTestDF.collect()) + // ---- Without base path set ---- + // Should find all the data with partitioning columns + check(s"$dir", Result(partitionedTestDF)) + + // Should fail as globbing finds dirs without files, only subdirs in them. 
+ check(s"$dir/*/", Error("please set \"basePath\"")) + check(s"$dir/p1=*/", Error("please set \"basePath\"")) + + // Should not find partition columns as the globs resolve to p2 dirs + // with files in them + check(s"$dir/*/*", Result(partitionedTestDF.drop("p1", "p2"))) + check(s"$dir/p1=*/p2=???", Result(partitionedTestDF.drop("p1", "p2"))) + + // Should find all data without the partitioning columns as the globs resolve to the files + check(s"$dir/*/*/*", Result(partitionedTestDF.drop("p1", "p2"))) + check(s"$dir/p1=*/p2=*/*.parquet", Result(partitionedTestDF.drop("p1", "p2"))) + + // ---- With base path set ---- + val resultDf = partitionedTestDF.select("a", "b", "p1", "p2") + check(path = s"$dir/*", Result(resultDf), basePath = Some(dir)) + check(path = s"$dir/*/*", Result(resultDf), basePath = Some(dir)) + check(path = s"$dir/*/*/*", Result(resultDf), basePath = Some(dir)) } } From 9a64496f9cdea03f839504da1807deed9b123b2a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 May 2016 18:13:23 -0700 Subject: [PATCH 09/13] Add new missing file --- .../datasources/FileCatalogSuite.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala new file mode 100644 index 000000000000..f02b9992f194 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.test.SharedSQLContext + +class FileCatalogSuite extends SharedSQLContext { + + test("ListingFileCatalog: leaf files are qualified paths") { + + withTempDir { dir => + val file = new File(dir, "text.txt") + stringToFile(file, "text") + + val path = new Path(file.getCanonicalPath) + val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) { + def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq + def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq + } + assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/"))) + assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/"))) + } + } +} From 8c06d4ea4516e1760958b08f6e6b77d1811dfd46 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 5 May 2016 19:09:45 -0700 Subject: [PATCH 10/13] Fixed bug --- .../spark/sql/sources/HadoopFsRelationTest.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 5a41f9010da0..20c5f72ff1ad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -620,7 +620,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes def check(path: String, expectedDf: DataFrame): Unit = { val df = sqlContext.read .format(dataSourceName) - .schema(schema) // avoid schema inference for any format + .schema(schema) // avoid schema inference for any format, expected to be same format .load(path) checkAnswer(df, expectedDf) } @@ -633,7 +633,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } - test("Hadoop style globbing - partitioned data") { + test("Hadoop style globbing - partitioned data with schema inference") { // Tests the following on partition data // - partitions are not discovered with globbing and without base path set. 
@@ -662,6 +662,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes assert(expectedResult.isLeft, s"Error was expected with $path but result found") checkAnswer(testDf, expectedResult.left.get) } catch { + case e: java.util.NoSuchElementException if e.getMessage.contains("dataSchema") => + // Ignore error, the source format requires schema to be provided by user + // This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema + case e: Throwable => assert(expectedResult.isRight, s"Was not expecting error with $path: " + e) assert( @@ -689,11 +693,11 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes // Should not find partition columns as the globs resolve to p2 dirs // with files in them check(s"$dir/*/*", Result(partitionedTestDF.drop("p1", "p2"))) - check(s"$dir/p1=*/p2=???", Result(partitionedTestDF.drop("p1", "p2"))) + check(s"$dir/p1=*/p2=foo", Result(partitionedTestDF.filter("p2 = 'foo'").drop("p1", "p2"))) + check(s"$dir/p1=1/p2=???", Result(partitionedTestDF.filter("p1 = 1").drop("p1", "p2"))) // Should find all data without the partitioning columns as the globs resolve to the files check(s"$dir/*/*/*", Result(partitionedTestDF.drop("p1", "p2"))) - check(s"$dir/p1=*/p2=*/*.parquet", Result(partitionedTestDF.drop("p1", "p2"))) // ---- With base path set ---- val resultDf = partitionedTestDF.select("a", "b", "p1", "p2") From 7c5d7ba0d81a7a617fee80a7a19a2fb08f6d9bbf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 6 May 2016 02:45:41 -0700 Subject: [PATCH 11/13] Fixed bugs --- .../PartitioningAwareFileCatalog.scala | 6 ++--- .../datasources/FileCatalogSuite.scala | 22 ++++++++++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 00e64e9083e7..5f04a6c60df9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -67,7 +67,7 @@ abstract class PartitioningAwareFileCatalog( paths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) - val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + val qualifiedPath = fs.makeQualified(path) // There are three cases possible with each path // 1. The path is a directory and has children files in it. Then it must be present in @@ -77,9 +77,7 @@ abstract class PartitioningAwareFileCatalog( // 3. The path is a directory, but has no children files. Do not include this path. 
leafDirToChildrenFiles.get(qualifiedPath) - .orElse { - leafFiles.get(path).map(Array(_)) - } + .orElse { leafFiles.get(qualifiedPath).map(Array(_)) } .getOrElse(Array.empty) } } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index f02b9992f194..86ac038f7073 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.test.SharedSQLContext class FileCatalogSuite extends SharedSQLContext { test("ListingFileCatalog: leaf files are qualified paths") { - withTempDir { dir => val file = new File(dir, "text.txt") stringToFile(file, "text") @@ -41,4 +40,25 @@ class FileCatalogSuite extends SharedSQLContext { assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/"))) } } + + test("ListingFileCatalog: input paths are converted to qualified paths") { + withTempDir { dir => + val file = new File(dir, "text.txt") + stringToFile(file, "text") + + val unqualifiedDirPath = new Path(dir.getCanonicalPath) + val unqualifiedFilePath = new Path(file.getCanonicalPath) + val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration) + val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath)) + + val catalog1 = new ListingFileCatalog( + sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None) + assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) + + val catalog2 = new ListingFileCatalog( + sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None) + assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) + + } + } } From 33a1345952410713d1cf00e974028bd00267536d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 6 May 2016 11:21:35 -0700 Subject: [PATCH 12/13] Fixed ParquetPartitionDiscoverySuite --- .../spark/sql/execution/datasources/FileCatalogSuite.scala | 6 +++++- .../parquet/ParquetPartitionDiscoverySuite.scala | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 86ac038f7073..79d7ec58c763 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -48,15 +48,19 @@ class FileCatalogSuite extends SharedSQLContext { val unqualifiedDirPath = new Path(dir.getCanonicalPath) val unqualifiedFilePath = new Path(file.getCanonicalPath) + require(!unqualifiedDirPath.toString.contains("file:")) + require(!unqualifiedFilePath.toString.contains("file:")) + val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration) val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath)) + require(qualifiedFilePath.toString.startsWith("file:")) val catalog1 = new ListingFileCatalog( sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None) assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) val catalog2 = new ListingFileCatalog( - sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None) + sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None) assert(catalog2.allFiles.map(_.getPath) === 
Seq(qualifiedFilePath))
 
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index dd67d99682e3..b4d35be05d13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -803,12 +803,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       check( // read from ../p1=1/p2=foo with base ../p1=1/ should not infer p1
         path = s"$dir/p1=1/p2=foo/*",
         basePath = s"$dir/p1=1/",
-        resultDf.filter("p1 = 1").filter("p2 = foo").drop("p1"))
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1"))
 
       check( // read from ../p1=1/p2=foo with base ../p1=1/p2=foo, should not infer p1, p2
         path = s"$dir/p1=1/p2=foo/*",
         basePath = s"$dir/p1=1/p2=foo/",
-        resultDf.filter("p1 = 1").filter("p2 = foo").drop("p1", "p2"))
+        resultDf.filter("p1 = 1").filter("p2 = 'foo'").drop("p1", "p2"))
     }
   }
 

From 8abc99901f864d142b76386981e7bcdbd2be0b64 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 6 May 2016 13:24:28 -0700
Subject: [PATCH 13/13] One more fix

---
 .../spark/sql/execution/datasources/FileCatalogSuite.scala       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 79d7ec58c763..dab5c76200f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -60,7 +60,7 @@ class FileCatalogSuite extends SharedSQLContext {
     assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
     val catalog2 = new ListingFileCatalog(
-      sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None)
+      sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None)
     assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
   }