From 751495dc7a9105d0504376e28ea73e504f5f7bb1 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 00:06:31 -0700 Subject: [PATCH 1/6] reduce location length --- .../scala/org/apache/spark/util/Utils.scala | 17 ++++++++++ .../org/apache/spark/util/UtilsSuite.scala | 8 +++++ .../sql/execution/DataSourceScanExec.scala | 7 ++-- .../execution/datasources/v2/FileScan.scala | 6 ++-- .../DataSourceScanExecRedactionSuite.scala | 33 +++++++++++++++++++ 5 files changed, 67 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c7db2127a6f04..a102ea55a0b36 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2904,6 +2904,23 @@ private[spark] object Utils extends Logging { props.forEach((k, v) => resultProps.put(k, v)) resultProps } + + /** + * Convert a sequence of [[Path]] to a metadata string. When the length of metadata string + * exceeds `stopAppendingThreshold`, stop appending paths for saving memory. + */ + def pathsToMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { + var metadata = "[" + var index: Int = 0 + while (index < paths.length && metadata.length <= stopAppendingThreshold) { + if (index > 0) { + metadata += ", " + } + metadata += paths(index).toString + index += 1 + } + metadata + "]" + } } private[util] object CallerContext extends Logging { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 931eb6b5413f7..d1f94e6d210aa 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1301,6 +1301,14 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.trimExceptCRLF(s"b${s}b") === s"b${s}b") } } + + test("pathsToMetadata") { + val paths = (0 to 4).map(i => new Path(s"path$i")) + assert(Utils.pathsToMetadata(paths, 1) == "[path0]") + assert(Utils.pathsToMetadata(paths, 10) == "[path0, path1]") + assert(Utils.pathsToMetadata(paths, 15) == "[path0, path1, path2]") + assert(Utils.pathsToMetadata(paths, 20) == "[path0, path1, path2, path3]") + } } private class SimpleExtension diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 66996498ffd3b..83aa2ee49945e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -55,10 +55,12 @@ trait DataSourceScanExec extends LeafExecNode { // Metadata that describes more details of this scan. protected def metadata: Map[String, String] + protected val maxMetadataValueLength = 100 + override def simpleString(maxFields: Int): String = { val metadataEntries = metadata.toSeq.sorted.map { case (key, value) => - key + ": " + StringUtils.abbreviate(redact(value), 100) + key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength) } val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields) redact( @@ -335,7 +337,8 @@ case class FileSourceScanExec( def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") val location = relation.location val locationDesc = - location.getClass.getSimpleName + seqToString(location.rootPaths) + location.getClass.getSimpleName + + Utils.pathsToMetadata(location.rootPaths, maxMetadataValueLength) val metadata = Map( "Format" -> relation.fileFormat.toString, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 6e05aa56f4f72..1f4f5e7b7f455 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -94,8 +94,10 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin override def hashCode(): Int = getClass.hashCode() override def description(): String = { + val maxMetadataValueLength = 100 val locationDesc = - fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]") + fileIndex.getClass.getSimpleName + + Utils.pathsToMetadata(fileIndex.rootPaths, maxMetadataValueLength) val metadata: Map[String, String] = Map( "ReadSchema" -> readDataSchema.catalogString, "PartitionFilters" -> seqToString(partitionFilters), @@ -105,7 +107,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin case (key, value) => val redactedValue = Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value) - key + ": " + StringUtils.abbreviate(redactedValue, 100) + key + ": " + StringUtils.abbreviate(redactedValue, maxMetadataValueLength) }.mkString(", ") s"${this.getClass.getSimpleName} $metadataStr" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala index f1411b263c77b..d14f7148e5efa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -116,6 +116,39 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest { assert(isIncluded(df.queryExecution, "Location")) } } + + test("FileSourceScanExec metadata should contain limited file paths") { + withTempPath { path => + val dir = path.getCanonicalPath + val partitionCol = "partitionCol" + spark.range(10) + .select("id", "id") + .toDF("value", partitionCol) + .write + .partitionBy(partitionCol) + .orc(dir) + val paths = (0 to 9).map(i => dir + "/" + partitionCol + "=" + i) + val plan = spark + .read + .orc(paths: _*) + .queryExecution + .executedPlan + val location = plan collectFirst { + case f: FileSourceScanExec => f.metadata("Location") + } + assert(location.isDefined) + var found = false + for (index <- 1 to 10) { + val tempLocation = paths.slice(0, index).mkString("[", ", ", "]") + if (tempLocation.length >= 100 && !found) { + found = true + for (tempIndex <- 0 until index) { + assert(location.get.contains(paths(tempIndex))) + } + } + } + } + } } /** From 74dd5c72057a380b2edcd0bd78f17b2d0de7fbfb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 00:20:57 -0700 Subject: [PATCH 2/6] revise naming --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- .../src/test/scala/org/apache/spark/util/UtilsSuite.scala | 8 ++++---- .../apache/spark/sql/execution/DataSourceScanExec.scala | 2 +- .../spark/sql/execution/datasources/v2/FileScan.scala | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a102ea55a0b36..88e9154174401 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2909,7 +2909,7 @@ private[spark] object Utils extends Logging { * Convert a sequence of [[Path]] to a metadata string. When the length of metadata string * exceeds `stopAppendingThreshold`, stop appending paths for saving memory. */ - def pathsToMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { + def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { var metadata = "[" var index: Int = 0 while (index < paths.length && metadata.length <= stopAppendingThreshold) { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index d1f94e6d210aa..3fc0eaa527624 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1304,10 +1304,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("pathsToMetadata") { val paths = (0 to 4).map(i => new Path(s"path$i")) - assert(Utils.pathsToMetadata(paths, 1) == "[path0]") - assert(Utils.pathsToMetadata(paths, 10) == "[path0, path1]") - assert(Utils.pathsToMetadata(paths, 15) == "[path0, path1, path2]") - assert(Utils.pathsToMetadata(paths, 20) == "[path0, path1, path2, path3]") + assert(Utils.buildLocationMetadata(paths, 1) == "[path0]") + assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]") + assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]") + assert(Utils.buildLocationMetadata(paths, 20) == "[path0, path1, path2, path3]") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 83aa2ee49945e..0ae39cf8560e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -338,7 +338,7 @@ case class FileSourceScanExec( val location = relation.location val locationDesc = location.getClass.getSimpleName + - Utils.pathsToMetadata(location.rootPaths, maxMetadataValueLength) + Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength) val metadata = Map( "Format" -> relation.fileFormat.toString, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 1f4f5e7b7f455..7e8e0ed2dc675 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -97,7 +97,7 @@ trait FileScan extends Scan with Batch with SupportsReportStatistics with Loggin val maxMetadataValueLength = 100 val locationDesc = fileIndex.getClass.getSimpleName + - Utils.pathsToMetadata(fileIndex.rootPaths, maxMetadataValueLength) + Utils.buildLocationMetadata(fileIndex.rootPaths, maxMetadataValueLength) val metadata: Map[String, String] = Map( "ReadSchema" -> readDataSchema.catalogString, "PartitionFilters" -> seqToString(partitionFilters), From 757626fbf90c04dc2ae1d1075ed759eb56f35fb7 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 01:01:36 -0700 Subject: [PATCH 3/6] address comments --- .../scala/org/apache/spark/util/Utils.scala | 9 ++++---- .../DataSourceScanExecRedactionSuite.scala | 23 ++++++------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 88e9154174401..dd7fec7926fde 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2910,16 +2910,17 @@ private[spark] object Utils extends Logging { * exceeds `stopAppendingThreshold`, stop appending paths for saving memory. */ def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { - var metadata = "[" + val metadata = new StringBuilder("[") var index: Int = 0 while (index < paths.length && metadata.length <= stopAppendingThreshold) { if (index > 0) { - metadata += ", " + metadata.append(", ") } - metadata += paths(index).toString + metadata.append(paths(index).toString) index += 1 } - metadata + "]" + metadata.append("]") + metadata.toString } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala index d14f7148e5efa..824cc6d4efc0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -127,26 +127,17 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest { .write .partitionBy(partitionCol) .orc(dir) - val paths = (0 to 9).map(i => dir + "/" + partitionCol + "=" + i) - val plan = spark - .read - .orc(paths: _*) - .queryExecution - .executedPlan + val paths = (0 to 9).map(i => s"$dir/$partitionCol=$i") + val plan = spark.read.orc(paths: _*).queryExecution.executedPlan val location = plan collectFirst { case f: FileSourceScanExec => f.metadata("Location") } assert(location.isDefined) - var found = false - for (index <- 1 to 10) { - val tempLocation = paths.slice(0, index).mkString("[", ", ", "]") - if (tempLocation.length >= 100 && !found) { - found = true - for (tempIndex <- 0 until index) { - assert(location.get.contains(paths(tempIndex))) - } - } - } + // The location metadata should at least contain one path + assert(location.get.contains(paths.head)) + // If the temp path length is larger than 100, the metadata length should not exceed + // twice of the length; otherwise, the metadata length should be controlled within 200. + assert(location.get.length < Math.max(paths.head.length, 100) * 2) } } } From c64bf20a4c904f79c1ad9a919313aaee5ee0082f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 01:11:14 -0700 Subject: [PATCH 4/6] address comments --- .../sql/execution/DataSourceScanExecRedactionSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala index 824cc6d4efc0d..31c0745142fc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution +import java.io.File + import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -127,7 +129,7 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest { .write .partitionBy(partitionCol) .orc(dir) - val paths = (0 to 9).map(i => s"$dir/$partitionCol=$i") + val paths = (0 to 9).map(i => new File(dir, s"$partitionCol=$i").getCanonicalPath) val plan = spark.read.orc(paths: _*).queryExecution.executedPlan val location = plan collectFirst { case f: FileSourceScanExec => f.metadata("Location") From 0a089ad7bd89a379264c612c9e5e7071d7c4a707 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 12:01:52 -0700 Subject: [PATCH 5/6] address more comments --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- .../spark/sql/execution/DataSourceScanExecRedactionSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index dd7fec7926fde..4905d7ec0132f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2906,7 +2906,7 @@ private[spark] object Utils extends Logging { } /** - * Convert a sequence of [[Path]] to a metadata string. When the length of metadata string + * Convert a sequence of `Path`s to a metadata string. When the length of metadata string * exceeds `stopAppendingThreshold`, stop appending paths for saving memory. */ def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala index 31c0745142fc4..c99be986ddca5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -119,7 +119,7 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest { } } - test("FileSourceScanExec metadata should contain limited file paths") { + test("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") { withTempPath { path => val dir = path.getCanonicalPath val partitionCol = "partitionCol" From 9d736efdc4f0ef456ae90eac7eb032c9e09ae8ed Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 22 May 2020 12:27:03 -0700 Subject: [PATCH 6/6] update --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4905d7ec0132f..9636fe88c77c2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2912,7 +2912,7 @@ private[spark] object Utils extends Logging { def buildLocationMetadata(paths: Seq[Path], stopAppendingThreshold: Int): String = { val metadata = new StringBuilder("[") var index: Int = 0 - while (index < paths.length && metadata.length <= stopAppendingThreshold) { + while (index < paths.length && metadata.length < stopAppendingThreshold) { if (index > 0) { metadata.append(", ") } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 3fc0eaa527624..c9c8ae6023877 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1304,10 +1304,10 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("pathsToMetadata") { val paths = (0 to 4).map(i => new Path(s"path$i")) - assert(Utils.buildLocationMetadata(paths, 1) == "[path0]") + assert(Utils.buildLocationMetadata(paths, 5) == "[path0]") assert(Utils.buildLocationMetadata(paths, 10) == "[path0, path1]") assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]") - assert(Utils.buildLocationMetadata(paths, 20) == "[path0, path1, path2, path3]") + assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]") } }