@@ -147,8 +147,8 @@ class SqlParser extends AbstractSparkSQLParser {
}

protected lazy val insert: Parser[LogicalPlan] =
INSERT ~> OVERWRITE.? ~ (INTO ~> relation) ~ select ^^ {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o.isDefined)
INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
}

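A minimal sketch of the SQL the revised rule accepts (the TABLE keyword is now required; src and dst are hypothetical tables that already exist):

    sqlContext.sql("INSERT INTO TABLE dst SELECT * FROM src")
    sqlContext.sql("INSERT OVERWRITE TABLE dst SELECT * FROM src")
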
protected lazy val projection: Parser[Expression] =
49 changes: 47 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,8 +17,6 @@

package org.apache.spark.sql

import java.util.{List => JList}

import scala.reflect.ClassTag

import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -485,6 +483,53 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
@Experimental
override def saveAsTable(tableName: String): Unit

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a given data source and
* a set of options. This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
Contributor

it'd be great to add inline code examples.
we can do this later as separate prs.

*/
@Experimental
override def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit

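A minimal usage sketch along these lines (table name, path, and option are hypothetical; assumes a HiveContext-backed DataFrame df and the built-in parquet source):

    df.saveAsTable("events", "org.apache.spark.sql.parquet", "path" -> "/tmp/events")
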
/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a given data source and
* a set of options. This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*/
@Experimental
override def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit

@Experimental
override def save(path: String): Unit

@Experimental
override def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit

@Experimental
override def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit

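A rough sketch of how the save variants might be called (paths are hypothetical; the single-argument form falls back to the data source configured by spark.sql.default.datasource):

    df.save("/tmp/people.parquet")
    df.save("org.apache.spark.sql.json", "path" -> "/tmp/people.json")
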
/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
58 changes: 56 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
import org.apache.spark.sql.types.{NumericType, StructType}
import org.apache.spark.util.Utils

@@ -303,8 +304,61 @@ private[sql] class DataFrameImpl protected[sql](
}

override def saveAsTable(tableName: String): Unit = {
sqlContext.executePlan(
CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd
val dataSourceName = sqlContext.conf.defaultDataSourceName
val cmd =
CreateTableUsingAsLogicalPlan(
tableName,
dataSourceName,
temporary = false,
Map.empty,
allowExisting = false,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = {
val cmd =
CreateTableUsingAsLogicalPlan(
tableName,
dataSourceName,
temporary = false,
(option +: options).toMap,
allowExisting = false,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit = {
val opts = options.toSeq
saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
}

override def save(path: String): Unit = {
val dataSourceName = sqlContext.conf.defaultDataSourceName
save(dataSourceName, ("path" -> path))
}

override def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = {
ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
}

override def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit = {
val opts = options.toSeq
save(dataSourceName, opts.head, opts.tail:_*)
}

override def insertInto(tableName: String, overwrite: Boolean): Unit = {
@@ -152,6 +152,28 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten

override def saveAsTable(tableName: String): Unit = err()

override def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = err()

override def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit = err()

override def save(path: String): Unit = err()

override def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = err()

override def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit = err()

override def insertInto(tableName: String, overwrite: Boolean): Unit = err()

override def toJSON: RDD[String] = err()
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -47,6 +47,9 @@ private[spark] object SQLConf {
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -155,6 +158,9 @@ private[sql] class SQLConf extends Serializable {
private[spark] def broadcastTimeout: Int =
getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt

private[spark] def defaultDataSourceName: String =
getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")

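For illustration, the default source can be switched through the usual conf mechanism (the json source name here is just an example value):

    sqlContext.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
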
/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
26 changes: 25 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,6 +21,7 @@ import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

@@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -335,6 +336,29 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}

@Experimental
def load(path: String): DataFrame = {
val dataSourceName = conf.defaultDataSourceName
load(dataSourceName, ("path", path))
}

@Experimental
def load(
dataSourceName: String,
option: (String, String),
options: (String, String)*): DataFrame = {
val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap)
DataFrame(this, LogicalRelation(resolved.relation))
}

@Experimental
def load(
dataSourceName: String,
options: java.util.Map[String, String]): DataFrame = {
val opts = options.toSeq
load(dataSourceName, opts.head, opts.tail:_*)
}
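
A minimal sketch of calling these overloads (file paths are hypothetical):

    val users = sqlContext.load("/data/users.parquet")   // resolved via the default data source
    val logs = sqlContext.load("org.apache.spark.sql.json", "path" -> "/data/logs.json")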

/**
* :: Experimental ::
* Construct an RDD representing the database table accessible via JDBC URL
27 changes: 27 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -170,6 +170,33 @@ private[sql] trait DataFrameSpecificApi {
@Experimental
def saveAsTable(tableName: String): Unit

@Experimental
def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit

@Experimental
def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit

@Experimental
def save(path: String): Unit

@Experimental
def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit
Contributor

i think this one might be ambiguous with the other save.

u'd want to declare something like

def save(
      dataSourceName: String,
      overwrite: Boolean,
      option: (String, String),
      options: (String, String)*): Unit


@Experimental
def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit

@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.sources._


private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -314,12 +314,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) =>
ExecutedCommand(
CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil

case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
CreateTempTableUsing(
tableName, userSpecifiedSchema, provider, opts)) :: Nil
case c: CreateTableUsing if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case c: CreateTableUsing if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) =>
val logicalPlan = sqlContext.parseSql(query)
val cmd =
CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsSelect if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) =>
val cmd =
CreateTempTableUsingAsSelect(tableName, provider, opts, query)
ExecutedCommand(cmd) :: Nil
case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case _ => Nil
}
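
For reference, a rough sketch of the temporary CTAS DDL these new cases are meant to handle (table, source, and path are hypothetical):

    sqlContext.sql(
      """CREATE TEMPORARY TABLE people_json
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/data/people.json')
        |AS SELECT name, age FROM people""".stripMargin)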