Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, InsertTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -114,7 +114,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {


/**
* Returns the result as a hive compatible sequence of strings. This is for testing only.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hiveResultString is for testing only

Copy link
Contributor Author

@wzhfy wzhfy May 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's called by SparkSQLDriver.run

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh... I did not realize it...

* Returns the result as a hive compatible sequence of strings.
*/
def hiveResultString(): Seq[String] = executedPlan match {
case ExecutedCommandExec(desc: DescribeTableCommand) =>
Expand All @@ -130,12 +130,17 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
case insertCommand @ ExecutedCommandExec(_: InsertTableCommand) =>
// Insert command will start a new execution through FileFormatWriter
insertCommand.executeCollect().map(_.toString)
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
SQLExecution.withNewExecutionId(sparkSession, this) {
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
}
}

/** Formats a datum (based on the given data type) and returns the string representation. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,3 +1025,5 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
builder.toString()
}
}

trait InsertTableCommand extends RunnableCommand
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be too broad for this purpose. It could be misunderstood later. Should we add a comment for it?

Copy link
Contributor Author

@wzhfy wzhfy May 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I'll add a comment for it

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode,
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex])
extends RunnableCommand {
extends InsertTableCommand {

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.command.InsertTableCommand
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
Expand Down Expand Up @@ -78,7 +78,7 @@ case class InsertIntoHiveTable(
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifNotExists: Boolean) extends RunnableCommand {
ifNotExists: Boolean) extends InsertTableCommand {

override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

Expand Down