51 commits (all by MaxGekk):
19b9a68  Stub implementation and a test (Sep 15, 2018)
90832f9  Saving all plans to file (Sep 15, 2018)
673ae56  Output attributes (Sep 15, 2018)
fbde812  Output whole stage codegen (Sep 15, 2018)
dca19d3  Reusing codegenToOutputStream (Sep 15, 2018)
66351a0  Code de-duplication (Sep 15, 2018)
2ee75bc  Do not truncate fields (Sep 15, 2018)
9b2a3e6  Moving the test up because previous one leaved a garbage (Sep 15, 2018)
51c196e  Removing string interpolation in the test (Sep 16, 2018)
c66a616  Getting Hadoop's conf from session state (Sep 16, 2018)
ed57c8e  Using java.io.Writer (Sep 16, 2018)
ce2c086  Using java.io.Writer (Sep 16, 2018)
37326e2  Merge remote-tracking branch 'origin/master' into plan-to-file (Sep 17, 2018)
7abf14c  Using StringWriter (Sep 17, 2018)
d1188e3  Removing unneeded buffering and flushing (Sep 17, 2018)
71ff7d1  Code de-duplication among toString and toFile (Sep 17, 2018)
ac94a86  Using StringBuilderWriter and fix tests (Sep 18, 2018)
f2906d9  Do not change maxFields so far (Sep 18, 2018)
d3fede1  Added tests (Sep 18, 2018)
c153838  Using StringBuilderWriter in treeString (Sep 18, 2018)
6fe08bf  Propagating numFields to truncatedString (Sep 18, 2018)
3324927  Bug fix + test (Sep 18, 2018)
d63f862  Bug fix: passing maxFields to simpleString (Sep 18, 2018)
24dbbba  Merge remote-tracking branch 'origin/master' into plan-to-file (Sep 18, 2018)
deb5315  Merge remote-tracking branch 'origin/master' into plan-to-file (Sep 24, 2018)
7fd88d3  Passing parameters by names (Sep 24, 2018)
732707a  Getting file system from file path (Sep 24, 2018)
3a133ae  Using the buffered writer (Sep 24, 2018)
7452b82  Removing default value for maxFields in simpleString (Sep 24, 2018)
4ec5732  Removing unnecessary signature of truncatedString (Sep 24, 2018)
be16175  Minor improvement - passing maxFields by name (Sep 25, 2018)
90ff7b5  Moving truncatedString out of core (Sep 25, 2018)
2ba6624  Merge remote-tracking branch 'origin/master' into plan-to-file (Sep 26, 2018)
1fcfc23  Adding SQL config to control maximum number of fields (Sep 27, 2018)
2bf11fc  Adding Spark Core config to control maximum number of fields (Sep 27, 2018)
5e2d3a6  Merge remote-tracking branch 'origin/master' into plan-to-file (Sep 27, 2018)
bd331c5  Revert indentations (Sep 27, 2018)
3cf564b  Merge remote-tracking branch 'origin/master' into plan-to-file (Oct 11, 2018)
2375064  Making writeOrError multi-line (Oct 11, 2018)
8befa13  Removing core config: spark.debug.maxToStringFields (Oct 11, 2018)
28795c7  Improving description of spark.sql.debug.maxToStringFields (Oct 11, 2018)
a246db4  Limit number of fields in structs too (Oct 11, 2018)
d4da29b  Description of simpleString of TreeNode. (Oct 11, 2018)
41b57bc  Added description of maxFields param of truncatedString (Oct 11, 2018)
28cce2e  Fix typo (Oct 11, 2018)
e4567cb  Passing maxField (Oct 11, 2018)
9b72104  Fix for the warning (Oct 11, 2018)
9f1d11d  Merge branch 'master' into plan-to-file (Oct 12, 2018)
76f4248  Merge remote-tracking branch 'origin/master' into plan-to-file (Oct 31, 2018)
f7de26d  Merge remote-tracking branch 'origin/master' into plan-to-file (Nov 5, 2018)
bda6ac2  Merge branch 'master' into plan-to-file (Nov 5, 2018)
47 changes: 0 additions & 47 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -93,53 +93,6 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
-  /**
-   * The performance overhead of creating and logging strings for wide schemas can be large. To
-   * limit the impact, we bound the number of fields to include by default. This can be overridden
-   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
-   */
-  val DEFAULT_MAX_TO_STRING_FIELDS = 25
-
-  private[spark] def maxNumToStringFields = {
-    if (SparkEnv.get != null) {
-      SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
-    } else {
-      DEFAULT_MAX_TO_STRING_FIELDS
-    }
-  }
-
-  /** Whether we have warned about plan string truncation yet. */
-  private val truncationWarningPrinted = new AtomicBoolean(false)
-
-  /**
-   * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
-   * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
-   *
-   * @return the trimmed and formatted string.
-   */
-  def truncatedString[T](
-      seq: Seq[T],
-      start: String,
-      sep: String,
-      end: String,
-      maxNumFields: Int = maxNumToStringFields): String = {
-    if (seq.length > maxNumFields) {
-      if (truncationWarningPrinted.compareAndSet(false, true)) {
-        logWarning(
-          "Truncated the string representation of a plan since it was too large. This " +
-          "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
-      }
-      val numFields = math.max(0, maxNumFields - 1)
-      seq.take(numFields).mkString(
-        start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
-    } else {
-      seq.mkString(start, sep, end)
-    }
-  }
-
-  /** Shorthand for calling truncatedString() without start or end strings. */
-  def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
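For reference, here is a minimal standalone sketch of the semantics deleted above, mirroring the removed implementation and the UtilsSuite assertions dropped in the next file; the demo object name is illustrative only, not part of the patch.

  // Sketch of the removed Utils.truncatedString behavior: elements beyond
  // maxNumFields are dropped and replaced by a "... N more fields" placeholder.
  object TruncatedStringDemo {
    def truncatedString[T](
        seq: Seq[T],
        start: String,
        sep: String,
        end: String,
        maxNumFields: Int): String = {
      if (seq.length > maxNumFields) {
        val numFields = math.max(0, maxNumFields - 1)
        seq.take(numFields).mkString(
          start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
      } else {
        seq.mkString(start, sep, end)
      }
    }

    def main(args: Array[String]): Unit = {
      println(truncatedString(Seq(1, 2), "[", ", ", "]", maxNumFields = 2))    // prints [1, 2]
      println(truncatedString(Seq(1, 2, 3), "[", ", ", "]", maxNumFields = 2)) // prints [1, ... 2 more fields]
    }
  }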
8 changes: 0 additions & 8 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -45,14 +45,6 @@ import org.apache.spark.scheduler.SparkListener
 
 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
-  test("truncatedString") {
-    assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
-    assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
-  }
-
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)
@@ -953,7 +953,7 @@ class Analyzer(
       case plan if containsDeserializer(plan.expressions) => plan
 
       case q: LogicalPlan =>
-        logTrace(s"Attempting to resolve ${q.simpleString}")
+        logTrace(s"Attempting to resolve ${q.simpleString(maxFields = None)}")
         q.mapExpressions(resolve(_, q))
     }
 
@@ -1733,7 +1733,7 @@
 
       case p if p.expressions.exists(hasGenerator) =>
         throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
-          "got: " + p.simpleString)
+          "got: " + p.simpleString(maxFields = None))
     }
   }
 
@@ -308,7 +308,7 @@ trait CheckAnalysis extends PredicateHelper {
             val missingAttributes = o.missingInput.mkString(",")
             val input = o.inputSet.mkString(",")
             val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
-              s"from $input in operator ${operator.simpleString}."
+              s"from $input in operator ${operator.simpleString(maxFields = None)}."
 
             val resolver = plan.conf.resolver
             val attrsWithSameName = o.missingInput.filter { missing =>
@@ -373,7 +373,7 @@
               s"""nondeterministic expressions are only allowed in
                  |Project, Filter, Aggregate or Window, found:
                  | ${o.expressions.map(_.sql).mkString(",")}
-                 |in operator ${operator.simpleString}
+                 |in operator ${operator.simpleString(maxFields = None)}
                """.stripMargin)
 
           case _: UnresolvedHint =>
@@ -385,7 +385,8 @@
     }
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
-      case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+      case o if !o.resolved =>
+        failAnalysis(s"unresolved operator ${o.simpleString(maxFields = None)}")
       case _ =>
     }
 
@@ -1038,7 +1038,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
         case Some(newType) if a.dataType == newType.dataType => a
         case Some(newType) =>
           logDebug(
-            s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}")
+            s"Promoting $a from ${a.dataType} to ${newType.dataType} in " +
+              q.simpleString(maxFields = None))
           newType
       }
     }
@@ -318,8 +318,9 @@ case class ExpressionEncoder[T](
       extractProjection(inputRow)
     } catch {
       case e: Exception =>
+        val encoded = serializer.map(_.simpleString(maxFields = None)).mkString("\n")
         throw new RuntimeException(
-          s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e)
+          s"Error while encoding: $e\n${encoded}", e)
     }
 
   /**
@@ -331,7 +332,8 @@
       constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
     } catch {
       case e: Exception =>
-        throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e)
+        val decoded = deserializer.simpleString(maxFields = None)
+        throw new RuntimeException(s"Error while decoding: $e\n${decoded}", e)
     }
 
   /**
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the basic expression abstract classes in Catalyst.
@@ -233,12 +233,12 @@ abstract class Expression extends TreeNode[Expression] {
 
   // Marks this as final, Expression.verboseString should never be called, and thus shouldn't be
   // overridden by concrete classes.
-  final override def verboseString: String = simpleString
+  final override def verboseString(maxFields: Option[Int]): String = simpleString(maxFields)
 
-  override def simpleString: String = toString
+  override def simpleString(maxFields: Option[Int]): String = toString
 
-  override def toString: String = prettyName + Utils.truncatedString(
-    flatArguments.toSeq, "(", ", ", ")")
+  override def toString: String = prettyName + truncatedString(
+    flatArguments.toSeq, "(", ", ", ")", maxFields = None)
 
   /**
    * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
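The truncatedString referenced here is the helper this PR relocates to org.apache.spark.sql.catalyst.util (commit 90ff7b5). Below is a hedged sketch of its shape, inferred only from the call sites visible in this diff; the real body, and the fallback taken for maxFields = None (presumably the spark.sql.debug.maxToStringFields config added in commit 1fcfc23), live in the moved implementation.

  // Sketch only: signature inferred from calls such as
  // truncatedString(flatArguments.toSeq, "(", ", ", ")", maxFields = None).
  // The default applied for None is an assumption carried over from the old
  // DEFAULT_MAX_TO_STRING_FIELDS; the actual code presumably reads it from SQLConf.
  def truncatedString[T](
      seq: Seq[T],
      start: String,
      sep: String,
      end: String,
      maxFields: Option[Int]): String = {
    val maxNumFields = maxFields.getOrElse(25) // assumed config-driven default
    if (seq.length > maxNumFields) {
      val numFields = math.max(0, maxNumFields - 1)
      seq.take(numFields).mkString(
        start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
    } else {
      seq.mkString(start, sep, end)
    }
  }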
@@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode {
     case _ => code"$this\n$other"
   }
 
-  override def verboseString: String = toString
+  override def verboseString(maxFields: Option[Int]): String = toString
 }
 
 object Block {
@@ -101,7 +101,7 @@ case class UserDefinedGenerator(
     inputRow = new InterpretedProjection(children)
     convertToScala = {
       val inputSchema = StructType(children.map { e =>
-        StructField(e.simpleString, e.dataType, nullable = true)
+        StructField(e.simpleString(maxFields = None), e.dataType, nullable = true)
       })
       CatalystTypeConverters.createToScalaConverter(inputSchema)
     }.asInstanceOf[InternalRow => Row]
@@ -54,7 +54,9 @@ case class NamedLambdaVariable(
 
   override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
 
-  override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+  override def simpleString(maxFields: Option[Int]): String = {
+    s"lambda $name#${exprId.id}: ${dataType.simpleString}"
+  }
 }
 
 /**
@@ -40,7 +40,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
       input
   }
 
-  private val outputPrefix = s"Result of ${child.simpleString} is "
+  private val outputPrefix = s"Result of ${child.simpleString(maxFields = None)} is "
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix)
@@ -72,7 +72,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa
 
   override def prettyName: String = "assert_true"
 
-  private val errMsg = s"'${child.simpleString}' is not true!"
+  private val errMsg = s"'${child.simpleString(maxFields = None)}' is not true!"
 
   override def eval(input: InternalRow) : Any = {
     val v = child.eval(input)
@@ -326,7 +326,9 @@ case class AttributeReference(
 
   // Since the expression id is not in the first constructor it is missing from the default
   // tree string.
-  override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}"
+  override def simpleString(maxFields: Option[Int]): String = {
+    s"$name#${exprId.id}: ${dataType.simpleString}"
+  }
 
   override def sql: String = {
     val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else ""
@@ -172,9 +172,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    */
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
-  override def simpleString: String = statePrefix + super.simpleString
+  override def simpleString(maxFields: Option[Int]): String = {
+    statePrefix + super.simpleString(maxFields)
+  }
 
-  override def verboseString: String = simpleString
+  override def verboseString(maxFields: Option[Int]): String = simpleString(maxFields)
 
   /**
    * All the subqueries of current plan.
@@ -36,8 +36,8 @@ abstract class LogicalPlan
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming == true)
 
-  override def verboseStringWithSuffix: String = {
-    super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
+  override def verboseStringWithSuffix(maxFields: Option[Int]): String = {
+    super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
   }
 
   /**
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 import org.apache.spark.util.random.RandomSampler
 
 /**
@@ -468,7 +468,7 @@ case class View(
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Option[Int]): String = {
     s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
   }
 }
@@ -484,8 +484,8 @@ case class View(
 case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  override def simpleString: String = {
-    val cteAliases = Utils.truncatedString(cteRelations.map(_._1), "[", ", ", "]")
+  override def simpleString(maxFields: Option[Int]): String = {
+    val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields)
     s"CTE $cteAliases"
   }
 
@@ -557,7 +557,7 @@ case class Range(
 
   override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Option[Int]): String = {
     s"Range ($start, $end, step=$step, splits=$numSlices)"
   }
 
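With maxFields threaded through, a caller can now cap how many CTE aliases appear in With.simpleString. A hypothetical illustration based on the truncatedString sketch given earlier:

  // Hypothetical usage, assuming the truncatedString sketch above:
  val cteAliases = Seq("cte1", "cte2", "cte3")
  println("CTE " + truncatedString(cteAliases, "[", ", ", "]", maxFields = Some(2)))
  // CTE [cte1, ... 2 more fields]
  println("CTE " + truncatedString(cteAliases, "[", ", ", "]", maxFields = None))
  // CTE [cte1, cte2, cte3]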