From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 1/7] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdba..5a5b71e52dd79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
              */
             j
           case Some((oldRelation, newRelation)) =>
-            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+            val attributeRewrites =
+              AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
             val newRight = right transformUp {
               case r if r == oldRelation => newRelation
             } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da02..5e00546a74c00 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }

From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 2/7] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00aa..47962ebe6ef82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
              */
             j
           case Some((oldRelation, newRelation)) =>
-            val attributeRewrites =
-              AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
             val newRight = right transformUp {
               case r if r == oldRelation => newRelation
             } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c00..61d9dcd37572b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 3/7] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572b..3427152b2da02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }

From 461441c8acdfce8e175f34ba44b0d08dea761225 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 1 May 2016 19:29:49 -0700
Subject: [PATCH 4/7] initial fix.

---
 .../datasources/PartitioningUtils.scala            |  4 +-
 .../ParquetPartitionDiscoverySuite.scala           | 49 ++++++++++++++++++-
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 74f2993754f8f..31ad4264fa3c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -184,8 +184,10 @@ private[sql] object PartitioningUtils {
       return (None, None)
     }
 
-    if (basePaths.contains(currentPath)) {
+    if (basePaths.contains(currentPath) ||
+      basePaths.exists(_.toString.startsWith(currentPath.toString))) {
       // If the currentPath is one of base paths. We should stop.
+      // If the currentPath is the basis of one of base paths. We should stop.
       finished = true
     } else {
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 5bffb307ec80e..1fdc3fd22532d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -191,20 +191,39 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
   }
 
+  test("parse partition with base paths") {
+    val partitionSpec: Option[PartitionValues] = parsePartition(
+      path = new Path("file://path/a=10"),
+      defaultPartitionName = defaultPartitionName,
+      typeInference = true,
+      basePaths = Set(new Path("file://path/a=10/p.parquet")))._1
+
+    assert(partitionSpec.isEmpty)
+  }
+
   test("parse partitions") {
     def check(
         paths: Seq[String],
         spec: PartitionSpec,
-        rootPaths: Set[Path] = Set.empty[Path]): Unit = {
+        rootPaths: Set[String] = Set.empty[String]): Unit = {
       val actualSpec = parsePartitions(
         paths.map(new Path(_)),
         defaultPartitionName,
         true,
-        rootPaths)
+        rootPaths.map(new Path(_)))
       assert(actualSpec === spec)
     }
 
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/_temporary",
+      "hdfs://host:9000/path/a=10.5/b=hello"),
+      PartitionSpec.emptySpec,
+      Set(
+        "hdfs://host:9000/path/a=10/b=20/a.parquet",
+        "hdfs://host:9000/path/a=10.5/b=hello/p.parquet"))
+
     check(Seq(
       "hdfs://host:9000/path/a=10/b=hello"),
       PartitionSpec(
@@ -413,6 +432,32 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }
 
+  test("read partitioned table using different path options") {
+    withTempDir { base =>
+      val pi = 1
+      val ps = "foo"
+      val path = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
+      makeParquetFile(
+        (1 to 10).map(i => ParquetData(i, i.toString)), path)
+
+      // when the input is the base path containing partitioned directories
+      val baseDf = sqlContext.read.parquet(base.getCanonicalPath)
+      assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+
+      // when the input is a path to a partitioned directory containing a parquet file
+      val partDf = sqlContext.read.parquet(path.getCanonicalPath)
+      assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
+
+      path.listFiles().foreach { f =>
+        if (f.getName.toLowerCase().endsWith(".parquet")) {
+          // when the input is a path to a parquet file
+          val df = sqlContext.read.parquet(f.getCanonicalPath)
+          assert(df.schema.map(_.name) === Seq("intField", "stringField"))
+        }
+      }
+    }
+  }
+
   test("read partitioned table - partition key included in Parquet file") {
     withTempDir { base =>
       for {

From b230e33e32bd7d3ee6a89247733ae1915fffcceb Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 2 May 2016 22:30:02 -0700
Subject: [PATCH 5/7] address comments

---
 .../datasources/PartitioningUtils.scala            |  7 ++-
 .../datasources/fileSourceInterfaces.scala         | 39 +++++++++------
 .../ParquetPartitionDiscoverySuite.scala           | 47 ++++++++++++-------
 3 files changed, 59 insertions(+), 34 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 31ad4264fa3c3..e6b13d91f3c96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -95,7 +95,8 @@ private[sql] object PartitioningUtils {
 
     // We create pairs of (path -> path's partition value) here
     // If the corresponding partition value is None, the pair will be skipped
-    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
+    val pathsWithPartitionValues =
+      paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
@@ -184,10 +185,8 @@ private[sql] object PartitioningUtils {
       return (None, None)
     }
 
-    if (basePaths.contains(currentPath) ||
-      basePaths.exists(_.toString.startsWith(currentPath.toString))) {
+    if (basePaths.contains(currentPath)) {
       // If the currentPath is one of base paths. We should stop.
-      // If the currentPath is the basis of one of base paths. We should stop.
       finished = true
     } else {
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 25f88d9c39487..91b9ec9ceb622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -423,23 +423,34 @@ class HDFSFileCatalog(
 
   /**
    * Contains a set of paths that are considered as the base dirs of the input datasets.
   * The partitioning discovery logic will make sure it will stop when it reaches any
-   * base path. By default, the paths of the dataset provided by users will be base paths.
-   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
-   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
-   * `something`. If users want to override the basePath. They can set `basePath` in the options
-   * to pass the new base path to the data source.
-   * For the above example, if the user-provided base path is `/path/`, the returned
+   * base path.
+   *
+   * By default, the paths of the dataset provided by users will be base paths.
+   * Below are three typical examples,
+   * Case 1) `sqlContext.read.parquet("/path/something=true/")`: the base path will be
+   * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
+   * Case 2) `sqlContext.read.parquet("/path/something=true/a.parquet")`: the base path will be
+   * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
+   * `something`.
+   * Case 3) `sqlContext.read.parquet("/path/")`: the base path will be `/path/`, and the returned
    * DataFrame will have the column of `something`.
+   *
+   * Users also can override the basePath by setting `basePath` in the options to pass the new base
+   * path to the data source.
+   * For example, `sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")`,
+   * and the returned DataFrame will have the column of `something`.
    */
   private def basePaths: Set[Path] = {
-    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
-    userDefinedBasePath.getOrElse {
-      // If the user does not provide basePath, we will just use paths.
- paths.toSet - }.map { hdfsPath => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val fs = hdfsPath.getFileSystem(hadoopConf) - hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + parameters.get("basePath").map(new Path(_)) match { + case Some(userDefinedBasePath) => + val fs = userDefinedBasePath.getFileSystem(hadoopConf) + if (!fs.isDirectory(userDefinedBasePath)) { + throw new IllegalArgumentException("Option 'basePath' must be a directory") + } + Set(userDefinedBasePath.makeQualified(fs.getUri, fs.getWorkingDirectory)) + + case None => + paths.map { path => if (leafFiles.contains(path)) path.getParent else path }.toSet } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 1fdc3fd22532d..cb2c2522b20cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -192,38 +192,42 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } test("parse partition with base paths") { - val partitionSpec: Option[PartitionValues] = parsePartition( + // when the basePaths is the same as the path to a leaf directory + val partitionSpec1: Option[PartitionValues] = parsePartition( path = new Path("file://path/a=10"), defaultPartitionName = defaultPartitionName, typeInference = true, - basePaths = Set(new Path("file://path/a=10/p.parquet")))._1 + basePaths = Set(new Path("file://path/a=10")))._1 - assert(partitionSpec.isEmpty) + assert(partitionSpec1.isEmpty) + + // when the basePaths is the path to a base directory of leaf directories + val partitionSpec2: Option[PartitionValues] = parsePartition( + path = new Path("file://path/a=10"), + defaultPartitionName = defaultPartitionName, + typeInference = true, + basePaths = Set(new Path("file://path")))._1 + + assert(partitionSpec2 == + Option(PartitionValues( + ArrayBuffer("a"), + ArrayBuffer(Literal.create(10, IntegerType))))) } test("parse partitions") { def check( paths: Seq[String], spec: PartitionSpec, - rootPaths: Set[String] = Set.empty[String]): Unit = { + rootPaths: Set[Path] = Set.empty[Path]): Unit = { val actualSpec = parsePartitions( paths.map(new Path(_)), defaultPartitionName, true, - rootPaths.map(new Path(_))) + rootPaths) assert(actualSpec === spec) } - check(Seq( - "hdfs://host:9000/path/a=10/b=20", - "hdfs://host:9000/path/a=10.5/_temporary", - "hdfs://host:9000/path/a=10.5/b=hello"), - PartitionSpec.emptySpec, - Set( - "hdfs://host:9000/path/a=10/b=20/a.parquet", - "hdfs://host:9000/path/a=10.5/b=hello/p.parquet")) - check(Seq( "hdfs://host:9000/path/a=10/b=hello"), PartitionSpec( @@ -440,11 +444,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha makeParquetFile( (1 to 10).map(i => ParquetData(i, i.toString)), path) - // when the input is the base path containing partitioned directories + // when the input is the base path containing partitioning directories val baseDf = sqlContext.read.parquet(base.getCanonicalPath) assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps")) - // when the input is a path to a partitioned directory containing a parquet file + // when the input is a path to the leaf directory containing a 
       val partDf = sqlContext.read.parquet(path.getCanonicalPath)
       assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
 
@@ -455,6 +459,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
           assert(df.schema.map(_.name) === Seq("intField", "stringField"))
         }
       }
+
+      path.listFiles().foreach { f =>
+        if (f.getName.toLowerCase().endsWith(".parquet")) {
+          // when the input is a path to a parquet file but `basePath` is overridden to
+          // the base path containing partitioning directories
+          val df = sqlContext
+            .read.option("basePath", base.getCanonicalPath)
+            .parquet(f.getCanonicalPath)
+          assert(df.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+        }
+      }
     }
   }
 

From bf98150cd1c36368d38d934a6590da4419ab9fae Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 2 May 2016 22:30:42 -0700
Subject: [PATCH 6/7] revert

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index e6b13d91f3c96..74f2993754f8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -95,8 +95,7 @@ private[sql] object PartitioningUtils {
 
     // We create pairs of (path -> path's partition value) here
     // If the corresponding partition value is None, the pair will be skipped
-    val pathsWithPartitionValues =
-      paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
+    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
 
     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.

From 252065cb0afa1c624274e5f589a38d8614a3d91f Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 3 May 2016 21:13:30 -0700
Subject: [PATCH 7/7] address comments.

---
 .../sql/execution/datasources/fileSourceInterfaces.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 78389000d8464..a9f16a2b8cb44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -475,10 +475,13 @@ class HDFSFileCatalog(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException("Option 'basePath' must be a directory")
         }
-        Set(userDefinedBasePath.makeQualified(fs.getUri, fs.getWorkingDirectory))
+        Set(fs.makeQualified(userDefinedBasePath))
 
       case None =>
-        paths.map { path => if (leafFiles.contains(path)) path.getParent else path }.toSet
+        paths.map { path =>
+          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
+          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
     }
   }
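
Usage sketch (illustrative only, not part of the patch series): the basePaths scaladoc added in PATCH 5/7 distinguishes three cases for how the base path is chosen and how the `basePath` read option overrides it. A minimal sketch of that behavior against the Spark 1.6-era SQLContext API follows; the /tmp/events layout, the `date` partition column, and the helper name are hypothetical assumptions, not taken from the patches.

// Minimal sketch of the basePath behavior described in the PATCH 5/7 scaladoc.
// Assumed (hypothetical) layout on disk:
//   /tmp/events/date=2016-05-01/part-00000.parquet
//   /tmp/events/date=2016-05-02/part-00000.parquet
import org.apache.spark.sql.SQLContext

def basePathExamples(sqlContext: SQLContext): Unit = {
  // Case 1/2: the input path (a partition directory, or a file inside it) becomes
  // the base path, so `date` is not discovered as a partition column.
  val onePartition = sqlContext.read.parquet("/tmp/events/date=2016-05-01")
  onePartition.printSchema()   // data columns only

  // Case 3: the table root is the base path, so `date` is discovered.
  val wholeTable = sqlContext.read.parquet("/tmp/events")
  wholeTable.printSchema()     // data columns plus `date`

  // Override: keep the partition column while reading a single partition directory
  // by pointing `basePath` at the table root (which must be a directory, per PATCH 5/7).
  val overridden = sqlContext.read
    .option("basePath", "/tmp/events")
    .parquet("/tmp/events/date=2016-05-01")
  overridden.printSchema()     // data columns plus `date`
}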