@@ -85,12 +85,10 @@ private[kafka010] object KafkaWriter extends Logging {
topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
validateQuery(queryExecution, kafkaParameters, topic)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
Member:
After removing this, can KafkaSink still track its streaming writes?

Contributor Author:
Yeah, because it's used in KafkaProvider, which is run via SaveIntoDataSourceCommand, and all commands are wrapped by SQLExecution.withNewExecutionId.

Member:
If you mean KafkaSourceProvider, is it the same code path as KafkaSink? In KafkaSink.addBatch, KafkaWriter.write is also called to write into Kafka.

Contributor Author (@cloud-fan, May 25, 2017):
KafkaSink.addBatch is also wrapped with SQLExecution.withNewExecutionId in StreamExecution: https://github.com/apache/spark/pull/18064/files#diff-6532dd3b63bdab0364fbcf2303e290e4R658
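
For context, a minimal sketch of the tracking pattern described in this thread (illustrative only: `ExecutionIdExample` and `runTracked` are not real Spark APIs, and the sketch assumes it lives in the org.apache.spark.sql.execution package so that SQLExecution and QueryExecution are visible):

package org.apache.spark.sql.execution

import org.apache.spark.sql.SparkSession

object ExecutionIdExample {
  // Whatever action starts the query (e.g. Dataset.collect, DataFrameWriter's runCommand,
  // or StreamExecution's batch runner) wraps the work in withNewExecutionId, so every
  // Spark job triggered inside the body is grouped under one execution id for the SQL UI
  // and QueryExecutionListener callbacks. Wrapping again inside KafkaWriter.write would
  // be redundant, which is why this PR can remove it.
  def runTracked[T](session: SparkSession, qe: QueryExecution)(body: => T): T = {
    SQLExecution.withNewExecutionId(session, qe) {
      body // e.g. qe.toRdd for commands, or qe.executedPlan.executeCollect()
    }
  }
}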

queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
}
}
@@ -357,7 +357,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
})
}

override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
override def innerChildren: Seq[QueryPlan[_]] = subqueries

/**
* Returns a plan where a best effort attempt has been made to transform `this` in a way
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
trait Command extends LeafNode {
trait Command extends LogicalPlan {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}
@@ -22,7 +22,8 @@ import java.math.{MathContext, RoundingMode}
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -243,9 +244,9 @@ object ColumnStat extends Logging {
}

col.dataType match {
case _: IntegralType => fixedLenTypeStruct(LongType)
case dt: IntegralType => fixedLenTypeStruct(dt)
case _: DecimalType => fixedLenTypeStruct(col.dataType)
case DoubleType | FloatType => fixedLenTypeStruct(DoubleType)
case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
case BooleanType => fixedLenTypeStruct(col.dataType)
case DateType => fixedLenTypeStruct(col.dataType)
case TimestampType => fixedLenTypeStruct(col.dataType)
@@ -264,14 +265,12 @@
}

/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
// for string/binary min/max, get should return null
min = Option(row.get(1))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
max = Option(row.get(2))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
min = Option(row.get(1, attr.dataType)),
max = Option(row.get(2, attr.dataType)),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.sources.BaseRelation
@@ -231,12 +232,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

runCommand(df.sparkSession, "save") {
SaveIntoDataSourceCommand(
query = df.logicalPlan,
provider = source,
DataSource(
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap,
mode = mode)
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
}
}

@@ -607,7 +607,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
qe.toRdd
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
48 changes: 40 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -179,9 +179,9 @@ class Dataset[T] private[sql](
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
@@ -248,8 +248,13 @@
_numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = {
val numRows = _numRows.max(0)
val takeResult = toDF().take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
showString(takeResult, numRows, truncate, vertical)
}

private def showString(
dataWithOneMoreRow: Array[Row], numRows: Int, truncate: Int, vertical: Boolean): String = {
val hasMoreData = dataWithOneMoreRow.length > numRows
val data = dataWithOneMoreRow.take(numRows)

lazy val timeZone =
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
@@ -684,6 +689,18 @@ class Dataset[T] private[sql](
} else {
println(showString(numRows, truncate = 0))
}

// An internal version of `show`, which won't set execution id and trigger listeners.
private[sql] def showInternal(_numRows: Int, truncate: Boolean): Unit = {
val numRows = _numRows.max(0)
val takeResult = toDF().takeInternal(numRows + 1)

if (truncate) {
println(showString(takeResult, numRows, truncate = 20, vertical = false))
} else {
println(showString(takeResult, numRows, truncate = 0, vertical = false))
}
}
// scalastyle:on println

/**
@@ -2453,6 +2470,11 @@
*/
def take(n: Int): Array[T] = head(n)

// An internal version of `take`, which won't set execution id and trigger listeners.
private[sql] def takeInternal(n: Int): Array[T] = {
collectFromPlan(limit(n).queryExecution.executedPlan)
}

/**
* Returns the first `n` rows in the Dataset as a list.
*
@@ -2477,6 +2499,11 @@
*/
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

// An internal version of `collect`, which won't set execution id and trigger listeners.
private[sql] def collectInternal(): Array[T] = {
collectFromPlan(queryExecution.executedPlan)
}

/**
* Returns a Java list that contains all rows in this Dataset.
*
@@ -2518,6 +2545,11 @@
plan.executeCollect().head.getLong(0)
}

// An internal version of `count`, which won't set execution id and trigger listeners.
private[sql] def countInternal(): Long = {
groupBy().count().queryExecution.executedPlan.executeCollect().head.getLong(0)
}

/**
* Returns a new Dataset that has exactly `numPartitions` partitions.
*
@@ -2763,7 +2795,7 @@ class Dataset[T] private[sql](
createTempViewCommand(viewName, replace = true, global = true)
}

private def createTempViewCommand(
private[spark] def createTempViewCommand(
viewName: String,
replace: Boolean,
global: Boolean): CreateViewCommand = {
@@ -2954,17 +2986,17 @@
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
@inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
// Set operators widen types (change the schema), so we cannot reuse the row encoder.
Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
@@ -113,10 +113,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {


/**
* Returns the result as a hive compatible sequence of strings. This is for testing only.
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(): Seq[String] = executedPlan match {
case ExecutedCommandExec(desc: DescribeTableCommand) =>
case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sparkSession).map {
@@ -127,7 +128,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
.mkString("\t")
}
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
@@ -39,6 +39,19 @@ object SQLExecution {
executionIdToQueryExecution.get(executionId)
}

private val testing = sys.props.contains("spark.testing")

private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
// only throw an exception during tests. a missing execution ID should not fail a job.
if (testing && sparkSession.sparkContext.getLocalProperty(EXECUTION_ID_KEY) == null) {
// Attention testers: when a test fails with this exception, it means that the action that
// started execution of a query didn't call withNewExecutionId. The execution ID should be
// set by calling withNewExecutionId in the action that begins execution, like
// Dataset.collect or DataFrameWriter.insertInto.
throw new IllegalStateException("Execution ID should be set")
}
}

/**
* Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
* we can connect them with an execution.
@@ -346,7 +346,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
@@ -64,7 +64,7 @@ case class InMemoryRelation(
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[SparkPlan] = Seq(child)
override def innerChildren: Seq[SparkPlan] = Seq(child)

override def producedAttributes: AttributeSet = outputSet

@@ -34,7 +34,7 @@ case class InMemoryTableScanExec(
@transient relation: InMemoryRelation)
extends LeafExecNode {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution


/**
@@ -96,11 +97,13 @@ case class AnalyzeColumnCommand(
attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation))
.executedPlan.executeTake(1).head

val rowCount = statsRow.getLong(0)
val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1), attr))
// according to `ColumnStat.statExprs`, the stats struct always has 6 fields.
(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), attr))
}.toMap
(rowCount, columnStats)
}
@@ -56,7 +56,7 @@ case class AnalyzeTableCommand(
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
val newRowCount = sparkSession.table(tableIdentWithDB).count()
val newRowCount = sparkSession.table(tableIdentWithDB).countInternal()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
@@ -30,19 +30,19 @@ case class CacheTableCommand(
require(plan.isEmpty || tableIdent.database.isEmpty,
"Database name is not allowed in CACHE TABLE AS SELECT")

override protected def innerChildren: Seq[QueryPlan[_]] = {
plan.toSeq
}
override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq

override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
Dataset.ofRows(sparkSession, logicalPlan)
.createTempViewCommand(tableIdent.quotedString, replace = false, global = false)
.run(sparkSession)
}
sparkSession.catalog.cacheTable(tableIdent.quotedString)

if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
sparkSession.table(tableIdent).countInternal()
}

Seq.empty[Row]