From b0e6ce2729f584a9f95996707f60eb650c2a58b9 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 6 Nov 2015 16:38:26 +0900
Subject: [PATCH 1/5] [SPARK-11500][SQL] Not deterministic order of columns when using merging schemas.

---
 .../datasources/parquet/ParquetRelation.scala | 26 +++++++++++++++----
 .../apache/spark/sql/sources/interfaces.scala | 22 +++++++++-------
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca486b..052ed0654ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -379,7 +379,7 @@ private[sql] class ParquetRelation(
     var schema: StructType = _
 
     // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
+    var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null
 
     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -392,7 +392,7 @@ private[sql] class ParquetRelation(
         !cachedLeaves.equals(currentLeafStatuses)
 
       if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
+        cachedLeaves = currentLeafStatuses
 
         // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
         val leaves = currentLeafStatuses.filter { f =>
@@ -461,13 +461,29 @@ private[sql] class ParquetRelation(
       // You should enable this configuration only if you are very sure that for the parquet
       // part-files to read there are corresponding summary files containing correct schema.
 
+      // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
+      // the ordering of the output columns. There are several things to mention here.
+      //
+      //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+      //     the first part-file so that the columns of the first file show first.
+      //
+      //  2. If mergeRespectSummaries config is true, then there should be, at least,
+      //     "_metadata"s for all given files. So, we can ensure the columns of the first file
+      //     show first.
+      //
+      //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
+      //     no guarantee of the output order, since there might not be a summary file for the
+      //     first file, which ends up putting ahead the columns of the other files. However,
+      //     this should be okay since not enabling shouldMergeSchemas means (assumes) all the
+      //     files have the same schemas.
+
       val needMerged: Seq[FileStatus] =
         if (mergeRespectSummaries) {
           Seq()
         } else {
           dataStatuses
         }
-      (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+      needMerged ++ metadataStatuses ++ commonMetadataStatuses
     } else {
       // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
       // don't have this.
@@ -764,10 +780,10 @@ private[sql] object ParquetRelation extends Logging {
 
         footers.map { footer =>
           ParquetRelation.readSchemaFromFooter(footer, converter)
-        }.reduceOption(_ merge _).iterator
+        }.reduceLeftOption(_ merge _).iterator
       }.collect()
 
-    partiallyMergedSchemas.reduceOption(_ merge _)
+    partiallyMergedSchemas.reduceLeftOption(_ merge _)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7d73bd..6d5cd5a4adb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -419,11 +419,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private var _partitionSpec: PartitionSpec = _
 
   private class FileStatusCache {
-    var leafFiles = mutable.Map.empty[Path, FileStatus]
+    var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+    private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
       if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
         HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
       } else {
@@ -441,10 +441,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
         val (dirs, files) = statuses.partition(_.isDir)
 
+        // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
         if (dirs.isEmpty) {
-          files.toSet
+          mutable.LinkedHashSet(files: _*)
         } else {
-          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+          mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
     }
@@ -455,7 +456,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       leafFiles.clear()
       leafDirToChildrenFiles.clear()
 
-      leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafFiles ++= files.map(f => f.getPath -> f)
       leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
@@ -466,8 +467,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     cache
   }
 
-  protected def cachedLeafStatuses(): Set[FileStatus] = {
-    fileStatusCache.leafFiles.values.toSet
+  protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+    mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
   }
 
   final private[sql] def partitionSpec: PartitionSpec = {
@@ -766,7 +767,7 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Array[String],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): Set[FileStatus] = {
+      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -786,9 +787,10 @@ private[sql] object HadoopFsRelation extends Logging {
         status.getAccessTime)
     }.collect()
 
-    fakeStatuses.map { f =>
+    val fakeStatusesSeq = fakeStatuses.map { f =>
       new FileStatus(
         f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
-    }.toSet
+    }
+    mutable.LinkedHashSet(fakeStatusesSeq: _*)
   }
 }

From 08fc91ca8d21902677e78f0adb3b36769f2cba51 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 6 Nov 2015 16:38:55 +0900
Subject: [PATCH 2/5]
 [SPARK-11500][SQL] Add a test to check the deterministic order.

---
 .../ParquetHadoopFsRelationSuite.scala | 21 ++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index e2d754e80640..036be73575e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 
@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }
+
+  test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+        val pathThree = s"${dir.getCanonicalPath}/table3"
+        Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+        // Here the columns shows according to the order of given files.
+        assert(sqlContext.read.parquet(pathOne, pathTwo, pathThree).schema.map(_.name)
+          === Seq("a", "b", "c", "d"))
+      }
+    }
+  }
 }
+

From bcf72d3ca308f9a69993803d9c8939696c915b07 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 6 Nov 2015 16:40:17 +0900
Subject: [PATCH 3/5] [SPARK-11500][SQL] Remove trailing newline.

---
 .../apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 036be73575e1..7628d1466120 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -174,4 +174,3 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
-

From 4f4706352c84469503ae3c3388098458b570f62f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 9 Nov 2015 19:06:01 +0900
Subject: [PATCH 4/5] [SPARK-11500][SQL] Sort file statuses, partitioned tables for the test and rename variable.
---
 .../datasources/parquet/ParquetRelation.scala      | 15 ++++++++-------
 .../org/apache/spark/sql/sources/interfaces.scala  |  4 ++--
 .../sources/ParquetHadoopFsRelationSuite.scala     | 13 +++++++------
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 052ed0654ecf..e3a50ba8a7fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -398,7 +398,7 @@ private[sql] class ParquetRelation(
         val leaves = currentLeafStatuses.filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray
+        }.toArray.sortWith(_.getPath.toString < _.getPath.toString)
 
         dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
         metadataStatuses =
@@ -465,17 +465,18 @@ private[sql] class ParquetRelation(
       // the ordering of the output columns. There are several things to mention here.
       //
       //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
-      //     the first part-file so that the columns of the first file show first.
+      //     the first part-file so that the columns of the lexicographically first file show
+      //     first.
       //
       //  2. If mergeRespectSummaries config is true, then there should be, at least,
-      //     "_metadata"s for all given files. So, we can ensure the columns of the first file
-      //     show first.
+      //     "_metadata"s for all given files, so that we can ensure the columns of
+      //     the lexicographically first file show first.
      //
       //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
       //     no guarantee of the output order, since there might not be a summary file for the
-      //     first file, which ends up putting ahead the columns of the other files. However,
-      //     this should be okay since not enabling shouldMergeSchemas means (assumes) all the
-      //     files have the same schemas.
+      //     lexicographically first file, which ends up putting ahead the columns of
+      //     the other files. However, this should be okay since not enabling
+      //     shouldMergeSchemas means (assumes) all the files have the same schemas.
       val needMerged: Seq[FileStatus] =
         if (mergeRespectSummaries) {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6d5cd5a4adb0..76fd76495f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -787,10 +787,10 @@ private[sql] object HadoopFsRelation extends Logging {
         status.getAccessTime)
     }.collect()
 
-    val fakeStatusesSeq = fakeStatuses.map { f =>
+    val hadoopFakeStatuses = fakeStatuses.map { f =>
       new FileStatus(
         f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
     }
-    mutable.LinkedHashSet(fakeStatusesSeq: _*)
+    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 7628d1466120..e866493ee6c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -160,16 +160,17 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
       withTempPath { dir =>
-        val pathOne = s"${dir.getCanonicalPath}/table1"
+        val pathOne = s"${dir.getCanonicalPath}/part=1"
         Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
-        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        val pathTwo = s"${dir.getCanonicalPath}/part=2"
         Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
-        val pathThree = s"${dir.getCanonicalPath}/table3"
+        val pathThree = s"${dir.getCanonicalPath}/part=3"
         Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
 
-        // Here the columns shows according to the order of given files.
-        assert(sqlContext.read.parquet(pathOne, pathTwo, pathThree).schema.map(_.name)
-          === Seq("a", "b", "c", "d"))
+        // The schema consists of the leading columns of the first part-file
+        // in the lexicographic order.
+        assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+          === Seq("a", "b", "c", "d", "part"))
       }
     }
   }

From 32dfb87ce36a093c54d4a3dfd39ccbc00c417af9 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 10 Nov 2015 13:10:13 +0900
Subject: [PATCH 5/5] [SPARK-11500][SQL] Use sortBy instead of sortWith.

---
 .../sql/execution/datasources/parquet/ParquetRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index e3a50ba8a7fb..abcbc1fbed56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -398,7 +398,7 @@ private[sql] class ParquetRelation(
         val leaves = currentLeafStatuses.filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray.sortWith(_.getPath.toString < _.getPath.toString)
+        }.toArray.sortBy(_.getPath.toString)
 
         dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
         metadataStatuses =
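Below is a small, self-contained sketch, not part of the patches above, of the ordering behaviour the series relies on: when schemas are merged by a left-to-right reduce, the part-file visited first decides which columns come first. The merge helper is a simplified stand-in for Spark's left-biased StructType merging (left fields keep their positions, unseen fields from the right are appended), and the part=1/part=2/part=3 column lists mirror the ones written by the test.

object MergeOrderSketch {
  // Simplified stand-in for Spark's left-biased schema merging: fields of the left
  // schema keep their positions and unseen fields of the right schema are appended.
  def merge(left: Seq[String], right: Seq[String]): Seq[String] =
    left ++ right.filterNot(left.contains)

  def main(args: Array[String]): Unit = {
    val part1 = Seq("a", "b") // columns of part=1
    val part2 = Seq("c", "b") // columns of part=2
    val part3 = Seq("d", "b") // columns of part=3

    // Part-files visited in lexicographic order: prints "a, b, c, d", which is the
    // column order the updated test asserts (followed by the "part" partition column).
    println(Seq(part1, part2, part3).reduceLeft(merge).mkString(", "))

    // A different traversal order prints "d, b, a, c", i.e. the non-determinism that
    // sorting the leaf FileStatuses (and keeping them in a LinkedHashSet) removes.
    println(Seq(part3, part1, part2).reduceLeft(merge).mkString(", "))
  }
}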