2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -447,7 +447,7 @@ def selectExpr(self, *expr):
`select` that accepts SQL expressions.

>>> df.selectExpr("age * 2", "abs(age)").collect()
-        [Row(('age * 2)=4, Abs('age)=2), Row(('age * 2)=10, Abs('age)=5)]
+        [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
"""
jexpr = ListConverter().convert(expr, self._sc._gateway._gateway_client)
jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr))
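(The doctest output changes because `selectExpr` columns are now named with the `prettyString` introduced below, which drops developer-facing markers such as the leading quote on unresolved attributes, so `'age` prints as `age`.)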
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -66,6 +67,17 @@ abstract class Expression extends TreeNode[Expression] {
*/
def childrenResolved = !children.exists(!_.resolved)

+  /**
+   * Returns a string representation of this expression that does not have developer centric
+   * debugging information like the expression id.
+   */
+  def prettyString: String = {
+    transform {
+      case a: AttributeReference => PrettyAttribute(a.name)
+      case u: UnresolvedAttribute => PrettyAttribute(u.name)
+    }.toString
+  }

Inline review thread on the `transform` call:

Contributor: Why use `transform` here? This will change the expression.

Contributor: We only replace some leaf nodes in the expression tree; does that cause problems?

Contributor: No, I am going through the code, and I think we'd better not use `transform` here. In which case would we need to change the expression?
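On the question above: Catalyst trees are immutable, so `transform` returns a rewritten copy and leaves the receiver untouched; `prettyString` only takes the copy's `toString`. A minimal standalone sketch of that behavior (a hypothetical mini-ADT, not Catalyst's real `TreeNode` API, and with traversal order simplified):

```scala
// Hypothetical mini-tree, not Catalyst: `transform` rebuilds the tree and
// returns a new instance, so the receiver is never mutated.
sealed trait Expr {
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case Mul(l, r) => Mul(l.transform(rule), r.transform(rule))
      case leaf      => leaf
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
case class Attr(name: String, id: Long) extends Expr {
  override def toString = s"$name#$id"   // debug form, like AttributeReference
}
case class PrettyAttr(name: String) extends Expr {
  override def toString = name           // display form, like PrettyAttribute
}
case class Mul(l: Expr, r: Expr) extends Expr {
  override def toString = s"($l * $r)"
}

object TransformDemo extends App {
  val e = Mul(Attr("age", 7), Attr("age", 7))
  val pretty = e.transform { case Attr(n, _) => PrettyAttr(n) }
  println(e)       // (age#7 * age#7)  -- original tree unchanged
  println(pretty)  // (age * age)      -- new tree with pretty leaves
}
```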

/**
* A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type
* and do any casting necessary of child evaluation.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -190,6 +190,26 @@ case class AttributeReference(
override def toString: String = s"$name#${exprId.id}$typeSuffix"
}

+/**
+ * A place holder used when printing expressions without debugging information such as the
+ * expression id or the unresolved indicator.
+ */
+case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
+  type EvaluatedType = Any
+
+  override def toString = name
+
+  override def withNullability(newNullability: Boolean): Attribute = ???
+  override def newInstance(): Attribute = ???
+  override def withQualifiers(newQualifiers: Seq[String]): Attribute = ???
+  override def withName(newName: String): Attribute = ???
+  override def qualifiers: Seq[String] = ???
+  override def exprId: ExprId = ???
+  override def eval(input: Row): EvaluatedType = ???
+  override def nullable: Boolean = ???
+  override def dataType: DataType = ???
+}
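A note on the `???` members: `???` is Scala's `Predef.???`, which throws `scala.NotImplementedError`, so `PrettyAttribute` is safe only as a display-time placeholder and any other use fails fast. A tiny standalone sketch of the same stub pattern (hypothetical class, not Spark code):

```scala
// ??? expands to `throw new NotImplementedError`; useful for display-only
// stubs where any accidental use should fail fast.
case class DisplayOnly(name: String) {
  override def toString = name
  def exprId: Long = ???   // hypothetical member; calling it throws
}

object StubDemo extends App {
  val d = DisplayOnly("age")
  println(d)  // age
  try d.exprId
  catch { case _: NotImplementedError => println("display-only: member not implemented") }
}
```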

object VirtualColumn {
val groupingIdName = "grouping__id"
def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)()
8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

+import scala.util.control.NonFatal


private[sql] object DataFrame {
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
@@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] {
*/
def toDataFrame: DataFrame = this

+  override def toString =
+    try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch {
+      case NonFatal(e) =>
+        s"Invalid tree; ${e.getMessage}:\n$queryExecution"
+    }

/**
* Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
* from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
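The `NonFatal` extractor matches only non-fatal throwables, so a failed analysis becomes a diagnostic string while VM-fatal errors (e.g. `OutOfMemoryError`) still propagate out of `toString`. A minimal standalone sketch of the same guard, using a hypothetical `describe` helper rather than the Spark method itself:

```scala
import scala.util.control.NonFatal

object NonFatalDemo extends App {
  // Evaluate a description lazily; turn non-fatal failures into a message
  // instead of letting them escape from toString.
  def describe(compute: => String): String =
    try compute
    catch { case NonFatal(e) => s"Invalid tree; ${e.getMessage}" }

  println(describe("[key: int, value: string]"))        // normal case
  println(describe(sys.error("unresolved attribute")))  // Invalid tree; unresolved attribute
}
```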
10 changes: 4 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql](
override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)

override def select(cols: Column*): DataFrame = {
-    val exprs = cols.zipWithIndex.map {
-      case (Column(expr: NamedExpression), _) =>
-        expr
-      case (Column(expr: Expression), _) =>
-        Alias(expr, expr.toString)()
+    val namedExpressions = cols.map {
+      case Column(expr: NamedExpression) => expr
+      case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
-    Project(exprs.toSeq, logicalPlan)
+    Project(namedExpressions.toSeq, logicalPlan)
}

override def select(col: String, cols: String*): DataFrame = {
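The user-visible effect of aliasing with `prettyString` instead of `toString` is the doctest change above: an unnamed expression like `age * 2` now produces a column named `(age * 2)` rather than `('age * 2)`.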
sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn")
}

+  override def toString = expr.prettyString

override def isComputable: Boolean = false

override val sqlContext: SQLContext = null
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.types._

@@ -37,6 +37,15 @@ import org.apache.spark.sql.types._
*/
package object debug {

+  /**
+   * Augments [[SQLContext]] with debug methods.
+   */
+  implicit class DebugSQLContext(sqlContext: SQLContext) {
+    def debug() = {
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    }
+  }

/**
* :: DeveloperApi ::
* Augments [[DataFrame]]s with debug methods.
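A sketch of the intended workflow, mirroring the test added below; the `TestSQLContext` and `Dsl` import paths are assumptions based on this era of the codebase:

```scala
import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.test.TestSQLContext

object DebugModeDemo extends App {
  // Disable eager analysis so invalid plans can be built and inspected.
  TestSQLContext.debug()

  // Selecting a nonexistent column no longer throws at construction time;
  // the DataFrame's toString reports the invalid tree instead.
  val badPlan = TestSQLContext.table("testData").select('badColumn)
  println(badPlan)  // "Invalid tree; ..." followed by the query execution
}
```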
29 changes: 29 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

+import org.apache.spark.sql.TestData._

import scala.language.postfixOps

import org.apache.spark.sql.Dsl._
@@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest {
TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
}

test("dataframe toString") {
assert(testData.toString === "[key: int, value: string]")
assert(testData("key").toString === "[key: int]")
}

test("incomputable toString") {
assert($"test".toString === "test")
}

test("invalid plan toString, debug mode") {
val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")

// Turn on debug mode so we can see invalid query plans.
import org.apache.spark.sql.execution.debug._
TestSQLContext.debug()

val badPlan = testData.select('badColumn)

assert(badPlan.toString contains badPlan.queryExecution.toString,
"toString on bad query plans should include the query execution but was:\n" +
badPlan.toString)

// Set the flag back to original value before this test.
TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
}

test("table scan") {
checkAnswer(
testData,