Commit 1bc4112

HyukjinKwon authored and liancheng committed
[SPARK-11500][SQL] Not deterministic order of columns when using merging schemas.
https://issues.apache.org/jira/browse/SPARK-11500

As filed in SPARK-11500, when schema merging is enabled, the order in which files are touched affects the ordering of the output columns. This was mostly caused by the use of `Set` and `Map`, so I replaced them with `LinkedHashSet` and `LinkedHashMap` to preserve insertion order. I also changed `reduceOption` to `reduceLeftOption`, and reordered `filesToTouch` from `metadataStatuses ++ commonMetadataStatuses ++ needMerged` to `needMerged ++ metadataStatuses ++ commonMetadataStatuses`, so that the part-files, which always carry a schema in their footers, are touched first, whereas the summary files might not exist.

One nit: if schema merging is not enabled but multiple files are given, there is no guarantee of the output order, since there might not be a summary file for the first file, which ends up putting the columns of the other files ahead. I think this is acceptable, since disabling schema merging assumes that all the files have the same schema.

In addition, the test code for this only checks the field names.

Author: hyukjinkwon <[email protected]>

Closes #9517 from HyukjinKwon/SPARK-11500.
1 parent 99f5f98 commit 1bc4112
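
For context (this sketch is not part of the commit), the fix relies on two Scala collection behaviors: hash-based Set/Map make no guarantee about iteration order, while mutable.LinkedHashSet/LinkedHashMap iterate in insertion order, and reduceLeftOption folds the elements explicitly from left to right. A minimal, self-contained example:

import scala.collection.mutable

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val files = Seq("part-r-00002", "part-r-00000", "part-r-00001")

    // A hash-based Set gives no guarantee about iteration order, so the first
    // element seen by a later fold or merge can differ between runs.
    val unordered: Set[String] = files.toSet

    // LinkedHashSet iterates in insertion order, so a left fold always starts
    // from the first element that was inserted.
    val ordered = mutable.LinkedHashSet(files: _*)

    println(unordered.toSeq) // order is implementation-dependent
    println(ordered.toSeq)   // List(part-r-00002, part-r-00000, part-r-00001)

    // reduceLeftOption makes the left-to-right accumulation explicit.
    println(ordered.toSeq.reduceLeftOption(_ + " -> " + _))
    // Some(part-r-00002 -> part-r-00000 -> part-r-00001)
  }
}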

File tree

3 files changed, +55 additions, -17 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 23 additions & 6 deletions
@@ -383,7 +383,7 @@ private[sql] class ParquetRelation(
     var schema: StructType = _

     // Cached leaves
-    var cachedLeaves: Set[FileStatus] = null
+    var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null

     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
@@ -396,13 +396,13 @@
         !cachedLeaves.equals(currentLeafStatuses)

       if (leafStatusesChanged) {
-        cachedLeaves = currentLeafStatuses.toIterator.toSet
+        cachedLeaves = currentLeafStatuses

         // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
         val leaves = currentLeafStatuses.filter { f =>
           isSummaryFile(f.getPath) ||
             !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }.toArray
+        }.toArray.sortBy(_.getPath.toString)

         dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
         metadataStatuses =
@@ -465,13 +465,30 @@
       // You should enable this configuration only if you are very sure that for the parquet
       // part-files to read there are corresponding summary files containing correct schema.

+      // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
+      // the ordering of the output columns. There are several things to mention here.
+      //
+      //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
+      //     the first part-file so that the columns of the lexicographically first file show
+      //     first.
+      //
+      //  2. If mergeRespectSummaries config is true, then there should be, at least,
+      //     "_metadata"s for all given files, so that we can ensure the columns of
+      //     the lexicographically first file show first.
+      //
+      //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
+      //     no guarantee of the output order, since there might not be a summary file for the
+      //     lexicographically first file, which ends up putting ahead the columns of
+      //     the other files. However, this should be okay since not enabling
+      //     shouldMergeSchemas means (assumes) all the files have the same schemas.
+
       val needMerged: Seq[FileStatus] =
         if (mergeRespectSummaries) {
           Seq()
         } else {
           dataStatuses
         }
-      (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
+      needMerged ++ metadataStatuses ++ commonMetadataStatuses
     } else {
       // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
       // don't have this.
@@ -768,10 +785,10 @@ private[sql] object ParquetRelation extends Logging {

         footers.map { footer =>
           ParquetRelation.readSchemaFromFooter(footer, converter)
-        }.reduceOption(_ merge _).iterator
+        }.reduceLeftOption(_ merge _).iterator
       }.collect()

-    partiallyMergedSchemas.reduceOption(_ merge _)
+    partiallyMergedSchemas.reduceLeftOption(_ merge _)
   }

   /**
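
The comment block added above explains why the merge order matters: whichever schema is folded in first contributes the leading columns. The following toy sketch (not Spark's actual StructType.merge, just an illustrative stand-in that appends unseen field names) shows how reducing from the left makes the lexicographically first file determine the leading columns:

object MergeOrderSketch {
  // Toy merge rule: keep the left schema's fields first and append any field
  // that only appears in the right schema.
  def merge(left: Seq[String], right: Seq[String]): Seq[String] =
    left ++ right.filterNot(left.contains)

  def main(args: Array[String]): Unit = {
    // Field names of three part-files, in lexicographic file order.
    val schemas = Seq(Seq("a", "b"), Seq("c", "b"), Seq("d", "b"))

    println(schemas.reduceLeftOption(merge))         // Some(List(a, b, c, d))
    println(schemas.reverse.reduceLeftOption(merge)) // Some(List(d, b, c, a))
  }
}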

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 12 additions & 10 deletions
@@ -428,11 +428,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private var _partitionSpec: PartitionSpec = _

   private class FileStatusCache {
-    var leafFiles = mutable.Map.empty[Path, FileStatus]
+    var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]

     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

-    private def listLeafFiles(paths: Array[String]): Set[FileStatus] = {
+    private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = {
       if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
         HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
       } else {
@@ -450,10 +450,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

         val (dirs, files) = statuses.partition(_.isDir)

+        // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
         if (dirs.isEmpty) {
-          files.toSet
+          mutable.LinkedHashSet(files: _*)
         } else {
-          files.toSet ++ listLeafFiles(dirs.map(_.getPath.toString))
+          mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString))
         }
       }
     }
@@ -464,7 +465,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       leafFiles.clear()
       leafDirToChildrenFiles.clear()

-      leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafFiles ++= files.map(f => f.getPath -> f)
       leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
     }
   }
@@ -475,8 +476,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     cache
   }

-  protected def cachedLeafStatuses(): Set[FileStatus] = {
-    fileStatusCache.leafFiles.values.toSet
+  protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = {
+    mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*)
   }

   final private[sql] def partitionSpec: PartitionSpec = {
@@ -834,7 +835,7 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Array[String],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): Set[FileStatus] = {
+      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")

     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -854,9 +855,10 @@ private[sql] object HadoopFsRelation extends Logging {
           status.getAccessTime)
       }.collect()

-    fakeStatuses.map { f =>
+    val hadoopFakeStatuses = fakeStatuses.map { f =>
       new FileStatus(
         f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
-    }.toSet
+    }
+    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
   }
 }
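
Since cachedLeafStatuses() now returns an ordered collection, the cache behind it has to preserve discovery order as well. A minimal illustration (not the Spark code; the paths below are made up) of why a LinkedHashMap-backed cache yields its values in a stable order:

import scala.collection.mutable

object LeafFileCacheSketch {
  def main(args: Array[String]): Unit = {
    // path -> file length, keyed in the order the files were discovered
    val leafFiles = mutable.LinkedHashMap.empty[String, Long]

    leafFiles ++= Seq(
      "table/part=1/part-r-00000.parquet" -> 10L,
      "table/part=2/part-r-00000.parquet" -> 12L,
      "table/part=3/part-r-00000.parquet" -> 11L)

    // .values and .keys iterate in insertion order, so downstream consumers
    // such as schema merging see the files in a deterministic order.
    println(leafFiles.keys.toSeq)   // part=1, part=2, part=3 paths, in that order
    println(leafFiles.values.toSeq) // List(10, 12, 11)
  }
}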

sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala

Lines changed: 20 additions & 1 deletion
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{execution, AnalysisException, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.types._


@@ -155,4 +155,23 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
     }
   }
+
+  test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/part=1"
+        Seq(1, 1).zipWithIndex.toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/part=2"
+        Seq(1, 1).zipWithIndex.toDF("c", "b").write.parquet(pathTwo)
+        val pathThree = s"${dir.getCanonicalPath}/part=3"
+        Seq(1, 1).zipWithIndex.toDF("d", "b").write.parquet(pathThree)
+
+        // The schema consists of the leading columns of the first part-file
+        // in the lexicographic order.
+        assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name)
+          === Seq("a", "b", "c", "d", "part"))
+      }
+    }
+  }
 }
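
For completeness, schema merging can also be requested per read instead of through the SQL conf used in the test above. This is assumed usage of the standard Parquet reader option rather than anything added by this commit, and the path is a placeholder:

// Merge the schemas of all part-files under the path while reading.
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/path/to/table")
println(merged.schema.map(_.name))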
