Changes from all commits
39 commits
22bd4bd
Merge pull request #1 from apache/master
DaveDeCaprio Nov 27, 2018
b7f964d
Added a configurable limit on the maximum length of a plan debug string.
DaveDeCaprio Nov 28, 2018
3f4fe1f
Removed unneeded imports
DaveDeCaprio Nov 28, 2018
30e4348
Moved withSizeLimitedWriter to treeString function that uses StringWr…
DaveDeCaprio Nov 28, 2018
5528ca1
Fixed formatting
DaveDeCaprio Nov 28, 2018
3171cf3
Converted to javadoc style multiline comment
DaveDeCaprio Nov 28, 2018
3ffdc6a
Fixed scalastyle formatting of imports
DaveDeCaprio Nov 28, 2018
45a60fc
Have size limit cut off right at the correct number of characters.
DaveDeCaprio Nov 29, 2018
9678799
Changed name to remove the debug from the config parameter name
DaveDeCaprio Nov 29, 2018
a5af842
Fixed formatting error
DaveDeCaprio Nov 29, 2018
a4be985
Changed length default to Long.MaxValue to turn off behavior unless i…
DaveDeCaprio Dec 3, 2018
f0f75c2
Merge branch 'master' of https://github.com/apache/spark into text-pl…
DaveDeCaprio Dec 3, 2018
1b692a0
Added test case for not limiting plan length and tested with a defaul…
DaveDeCaprio Dec 4, 2018
22fe117
Correctly added test case missed in the previous commit
DaveDeCaprio Dec 4, 2018
be3f265
Added more documentation of the plan length parameter.
DaveDeCaprio Dec 8, 2018
f6d0efc
Merge branch 'master' of https://github.com/apache/spark into text-pl…
DaveDeCaprio Dec 8, 2018
855f540
Removed tab
DaveDeCaprio Dec 8, 2018
e83f5f2
Merge branch 'master' of https://github.com/apache/spark into text-pl…
DaveDeCaprio Jan 5, 2019
db663b7
Merge branch 'master' of https://github.com/apache/spark into text-pl…
DaveDeCaprio Jan 20, 2019
2eecbfa
Added plan size limits to StringConcat
DaveDeCaprio Jan 21, 2019
4082aa3
Scalastyle
DaveDeCaprio Jan 21, 2019
f9085e7
Incorporated changes from code review
DaveDeCaprio Jan 21, 2019
b3d43b7
Missed one error
DaveDeCaprio Jan 21, 2019
e470ab2
Cleaned up append function and added tracking of the full plan length.
DaveDeCaprio Jan 21, 2019
35bc1d5
Got rid of unneeded "availableLength" flag.
DaveDeCaprio Jan 21, 2019
5ec58c8
Fixed errors missed
DaveDeCaprio Jan 21, 2019
bdfaf28
Updated to handle nulls again.
DaveDeCaprio Jan 22, 2019
0cfcb4e
Tabs to spaces
DaveDeCaprio Jan 22, 2019
eb69888
Remove useless test.
DaveDeCaprio Jan 22, 2019
daf02f2
Addressed code review comments.
DaveDeCaprio Mar 8, 2019
7d89388
Missed some Boolean->Unit updates
DaveDeCaprio Mar 8, 2019
4f56e48
Missed some Boolean->Unit updates
DaveDeCaprio Mar 8, 2019
a090fbb
Code formatting
DaveDeCaprio Mar 8, 2019
dcb4eb0
Fixed Fatal Warning.
DaveDeCaprio Mar 8, 2019
b4cb7bf
Fixed failing unit tests
DaveDeCaprio Mar 9, 2019
4fec590
Style/formatting issues from code review
DaveDeCaprio Mar 11, 2019
db0db18
Style/formatting issues from code review
DaveDeCaprio Mar 11, 2019
b5b30f3
Changed plan string length to bytesConf and gave a better message for…
DaveDeCaprio Mar 13, 2019
e4afa26
Tabs to spaces
DaveDeCaprio Mar 13, 2019
TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
 
     treeString(concat.append, verbose, addSuffix, maxFields)
     concat.toString
StringUtils.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.{Pattern, PatternSyntaxException}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String
 
-object StringUtils {
+object StringUtils extends Logging {
 
   /**
    * Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,20 +96,29 @@ object StringUtils {
 
   /**
    * Concatenation of sequence of strings to final string with cheap append method
-   * and one memory allocation for the final string.
+   * and one memory allocation for the final string. Can also bound the final size of
+   * the string.
    */
-  class StringConcat {
-    private val strings = new ArrayBuffer[String]
-    private var length: Int = 0
+  class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    protected val strings = new ArrayBuffer[String]
+    protected var length: Int = 0
+
+    def atLimit: Boolean = length >= maxLength
 
     /**
      * Appends a string and accumulates its length to allocate a string buffer for all
-     * appended strings once in the toString method.
+     * appended strings once in the toString method. Appends beyond the maximum length
+     * are truncated; use atLimit to check whether the limit has been reached.
      */
     def append(s: String): Unit = {
       if (s != null) {
-        strings.append(s)
-        length += s.length
+        val sLen = s.length
+        if (!atLimit) {
+          val available = maxLength - length
+          val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+          strings.append(stringToAppend)
+        }
+        length += sLen
       }
     }
 
@@ -114,9 +127,36 @@
      * returns concatenated string.
      */
     override def toString: String = {
-      val result = new java.lang.StringBuilder(length)
+      val finalLength = if (atLimit) maxLength else length
+      val result = new java.lang.StringBuilder(finalLength)
       strings.foreach(result.append)
       result.toString
     }
   }
+
+  /**
+   * A string concatenator for plan strings. Its limit comes from spark.sql.maxPlanStringLength,
+   * less 30 characters of headroom for the truncation marker; it warns when a plan is truncated.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+    override def toString: String = {
+      if (atLimit) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. The " +
+          s"plan had length $length and the maximum is $maxLength. This behavior " +
+          s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = if (maxLength == 0) {
+          s"Truncated plan of $length characters"
+        } else {
+          s"... ${length - maxLength} more characters"
+        }
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      } else {
+        super.toString
+      }
+    }
+  }
 }
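For reference, a minimal usage sketch of the bounded concatenator (not part of the diff; the limit of 7 mirrors the test suite below, and PlanStringConcat behaves the same way except that it reads its limit from spark.sql.maxPlanStringLength and appends a truncation marker):

import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

// Appends past the limit are cut off mid-string; the total length of
// everything offered is still tracked so the truncation can be reported.
val concat = new StringConcat(maxLength = 7)
concat.append("under")   // 5 chars stored, still under the limit
concat.append("over")    // only "ov" fits; the rest is dropped
concat.append("extra")   // already at the limit; nothing is stored
assert(concat.atLimit)   // 14 characters were offered, only 7 allowed
assert(concat.toString == "underov")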
SQLConf.scala
@@ -1625,6 +1625,17 @@
     .intConf
     .createWithDefault(25)
 
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+    .doc("Maximum number of characters to output for a plan string. If the plan is " +
+      "longer, further output will be truncated. The default setting always generates a full " +
+      "plan. Set this to a lower value such as 8k if plan strings are taking up too much " +
+      "memory or are causing OutOfMemory errors in the driver or UI processes.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
+      "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
+      "(nonnegative and shorter than the maximum size).")
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
   val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
     buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
       .internal()
@@ -2056,6 +2067,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
 
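Because the new entry is declared with bytesConf, values accept size suffixes. A minimal sketch of capping plan strings in an application (the session setup is illustrative; the "8k" value echoes the suggestion in the doc string):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("plan-string-limit")
  // bytesConf values accept suffixes, so "8k" means 8192 characters.
  .config("spark.sql.maxPlanStringLength", "8k")
  .getOrCreate()

// Any plan string longer than the limit is now cut off and ends with a
// "... N more characters" marker instead of growing without bound.
val df = spark.range(1000).selectExpr("id", "id * 2 AS doubled")
println(df.queryExecution.toString)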
TreeNodeSuite.scala
@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions.DslString
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 
@@ -81,7 +82,7 @@ case class SelfReferenceUDF(
   def apply(key: String): Boolean = config.contains(key)
 }
 
-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   test("top node changed") {
     val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
     assert(after === Literal(2))
@@ -595,4 +596,28 @@ class TreeNodeSuite extends SparkFunSuite {
     val expected = Coalesce(Stream(Literal(1), Literal(3)))
     assert(result === expected)
   }
+
+  test("treeString limits plan length") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+      val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      logWarning("Plan string: " + planString)
+      assert(planString.endsWith(" more characters"))
+      assert(planString.length <= SQLConf.get.maxPlanStringLength)
+    }
+  }
+
+  test("treeString limit at zero") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+      val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      assert(planString.startsWith("Truncated plan of"))
+    }
+  }
 }
StringUtilsSuite.scala
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {
 
   test("string concatenation") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
     }
 
     assert(new StringConcat().toString == "")
-    assert(concat("") == "")
-    assert(concat(null) == "")
-    assert(concat("a") == "a")
-    assert(concat("1", "2") == "12")
-    assert(concat("abc", "\n", "123") == "abc\n123")
+    assert(concat("") === "")
+    assert(concat(null) === "")
+    assert(concat("a") === "a")
+    assert(concat("1", "2") === "12")
+    assert(concat("abc", "\n", "123") === "abc\n123")
   }
+
+  test("string concatenation with limit") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+    }
+    assert(concat("under") === "under")
+    assert(concat("under", "over", "extra") === "underov")
+    assert(concat("underover") === "underov")
+    assert(concat("under", "ov") === "underov")
+  }
+
+  test("string concatenation return value") {
+    def checkLimit(s: String): Boolean = {
+      val sc = new StringConcat(7)
+      sc.append(s)
+      sc.atLimit
+    }
+    assert(!checkLimit("under"))
+    assert(checkLimit("1234567"))
+    assert(checkLimit("1234567890"))
+  }
 }
QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
@@ -105,7 +105,7 @@ class QueryExecution(
     ReuseSubquery(sparkSession.sessionState.conf))
 
   def simpleString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     concat.append("== Physical Plan ==\n")
     QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
     concat.append("\n")
@@ -133,13 +133,13 @@
   }
 
   override def toString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     writePlans(concat.append, SQLConf.get.maxToStringFields)
     concat.toString
   }
 
   def stringWithStats: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     val maxFields = SQLConf.get.maxToStringFields
 
     // trigger to compute stats for logical plans
@@ -194,9 +194,11 @@ class QueryExecution(
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
     val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
+
+    val append = (s: String) => {
+      writer.write(s)
+    }
     try {
-      writePlans(writer.write, maxFields)
+      writePlans(append, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
       org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
     } finally {
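One observation on this last hunk (not part of the diff): toFile writes plans through the writer-backed append function rather than a PlanStringConcat, so plans exported to a file are not truncated by spark.sql.maxPlanStringLength; the limit applies only to the in-memory string forms (simpleString, toString, stringWithStats).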