@@ -114,11 +114,12 @@ case class InsertIntoTable(
   }
 }

-case class InsertIntoCreatedTable(
+case class CreateTableAsSelect(
     databaseName: Option[String],
     tableName: String,
     child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+  override lazy val resolved = (databaseName != None && childrenResolved)
 }

 case class WriteToFile(
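
The new `resolved` override is what lets the analyzer keep working on a CTAS node until a database name is known. A minimal standalone sketch of that contract (`Ctas` is a stand-in, not PR code):

object ResolvedSketch extends App {
  // Stand-in for the logical CreateTableAsSelect node above.
  case class Ctas(databaseName: Option[String], childrenResolved: Boolean) {
    // Mirrors the new override: unresolved until a database name is filled in.
    lazy val resolved: Boolean = databaseName.isDefined && childrenResolved
  }

  assert(!Ctas(None, childrenResolved = true).resolved)             // analyzer keeps firing rules
  assert(Ctas(Some("default"), childrenResolved = true).resolved)   // analysis can finish
}
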
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToFile =>
       queryExecution.toRdd
       SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd

   /** Returns the schema as a string in the tree format.
    *
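
For context, the eager-execution branch above means a `saveAsTable` call now runs the whole CTAS at the call site rather than waiting for an action. A hypothetical usage sketch, assuming a table `src` already registered in the metastore (as in Spark's Hive tests):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SaveAsTableSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("ctas-sketch"))
  val hiveContext = new HiveContext(sc)

  // Goes through CreateTableAsSelect(None, "src_copy", plan) and, because the plan
  // matches the Command/CTAS branch above, executes eagerly at this call.
  val result = hiveContext.sql("SELECT key, value FROM src")
  result.saveAsTable("src_copy")
}
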
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       db: Option[String],
       tableName: String,
       alias: Option[String]): LogicalPlan = synchronized {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = processDatabaseAndTableName(
+      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p

-      case InsertIntoCreatedTable(db, tableName, child) =>
+      case CreateTableAsSelect(db, tableName, child) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)

-        createTable(databaseName, tblName, child.output)
-
-        InsertIntoTable(
-          lookupRelation(Some(databaseName), tblName, None),
-          Map.empty,
-          child,
-          overwrite = false)
+        CreateTableAsSelect(Some(databaseName), tableName, child)
     }
   }

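The rule is now much smaller: it only qualifies the database name, which flips `resolved` to true, while table creation and the insert move into the physical operator added below. A standalone sketch of the rewrite, with `Ctas` again as a stand-in for the logical node:

object CreateTablesSketch extends App {
  case class Ctas(databaseName: Option[String], tableName: String)

  // Stands in for hive.sessionState.getCurrentDatabase in the rule above.
  val currentDatabase = "default"

  def createTables(plan: Ctas): Ctas = plan match {
    case Ctas(None, table) => Ctas(Some(currentDatabase), table)
    case alreadyQualified  => alreadyQualified
  }

  assert(createTables(Ctas(None, "t")) == Ctas(Some("default"), "t"))
}
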
@@ -489,7 +489,7 @@ private[hive] object HiveQl {

         val (db, tableName) = extractDbNameTableName(tableNameParts)

-        InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
+        CreateTableAsSelect(db, tableName, nodeToPlan(query))

       // If it's not a "CREATE TABLE AS" like the above, just pass it back to Hive as a native command.
       case Token("TOK_CREATETABLE", _) => NativePlaceholder
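
To make the two parser branches concrete, here is a hypothetical pair of statements, reusing the `hiveContext` from the `saveAsTable` sketch above:

// The first statement parses into a CreateTableAsSelect logical plan; the second
// has no AS SELECT clause, so it falls through to NativePlaceholder and is handed
// to Hive as a native command.
hiveContext.sql("CREATE TABLE t1 AS SELECT key, value FROM src")  // CreateTableAsSelect
hiveContext.sql("CREATE TABLE t2 (key INT, value STRING)")        // NativePlaceholder
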
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
           InMemoryRelation(_, _, _,
             HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.CreateTableAsSelect(database, tableName, child) =>
+        val query = planLater(child)
+        CreateTableAsSelect(
+          database.get,
+          tableName,
+          query,
+          InsertIntoHiveTable(_: MetastoreRelation,
+            Map(),
+            query,
+            true)(hiveContext)) :: Nil
      case _ => Nil
    }
  }
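
The `InsertIntoHiveTable(_: MetastoreRelation, Map(), query, true)(hiveContext)` argument is a partial application: every constructor argument except the relation is fixed, yielding the `MetastoreRelation => InsertIntoHiveTable` function the physical operator expects. A standalone sketch of the same trick on a simplified case class (not PR code):

object PartialApplicationSketch extends App {
  // Simplified stand-in for InsertIntoHiveTable.
  case class Insert(table: String, partition: Map[String, String], overwrite: Boolean)

  // Fix everything except the table, producing a String => Insert function,
  // just like the function passed to CreateTableAsSelect above.
  val insertInto: String => Insert = Insert(_: String, Map(), true)

  assert(insertInto("t") == Insert("t", Map(), true))
}
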
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
import org.apache.spark.sql.execution.{Command, LeafNode, SparkPlan}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}

/**
 * :: Experimental ::
 * Creates a table and inserts the query result into it.
 * @param database the database name of the new relation
 * @param tableName the table name of the new relation
 * @param query the physical plan whose result is inserted into the new table
 * @param insertIntoRelation a function that, given the `MetastoreRelation` of the newly
 *                           created table, builds the `InsertIntoHiveTable` operator that
 *                           performs the actual insert
 * TODO Add more table-creation properties, e.g. SerDe, StorageHandler, in-memory cache, etc.
 */
@Experimental
case class CreateTableAsSelect(
    database: String,
    tableName: String,
    query: SparkPlan,
    insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
  extends LeafNode with Command {

  def output = Seq.empty

  // Computed lazily, so the table is created only when the command actually runs.
  private[this] lazy val metastoreRelation: MetastoreRelation = {
    // Create the table.
    val sc = sqlContext.asInstanceOf[HiveContext]
    sc.catalog.createTable(database, tableName, query.output, false)
    // Look the new table up again to obtain its MetastoreRelation.
    sc.catalog.lookupRelation(Some(database), tableName, None) match {
      case LowerCaseSchema(r: MetastoreRelation) => r
      case o: MetastoreRelation => o
    }
  }

  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
    insertIntoRelation(metastoreRelation).execute()
    Seq.empty[Row]
  }

  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }

  override def argString: String = {
    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
  }
}
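
A note on the design: because both `metastoreRelation` and `sideEffectResult` are lazy vals, the table creation and the insert run at most once, on first access, no matter how many times `execute()` is called. A standalone sketch of that evaluate-once behavior (not PR code):

object LazySideEffectSketch extends App {
  var tablesCreated = 0
  // Like metastoreRelation above: the side effect runs on first access only.
  lazy val relation: String = { tablesCreated += 1; "default.t" }

  val first = relation   // triggers the side effect
  val second = relation  // cached; no second side effect
  assert(tablesCreated == 1)
}
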
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
     (@transient sc: HiveContext)
   extends UnaryNode {

-  val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private val hiveContext = new Context(sc.hiveconf)
-  @transient private val db = Hive.get(sc.hiveconf)
+  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private lazy val hiveContext = new Context(sc.hiveconf)
+  @transient private lazy val db = Hive.get(sc.hiveconf)

   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
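
For background on the `@transient lazy val` pattern adopted above: a transient field is dropped during Java serialization, and making it lazy lets it be recomputed on first access after deserialization, while also deferring creation of the Hive `Context`/`Hive` handles until they are first needed rather than at construction time. A standalone sketch of the pattern using plain JDK serialization (not PR code):

import java.io._

object TransientLazySketch extends App {
  class Holder extends Serializable {
    // Dropped on serialization, rebuilt on first access afterwards.
    @transient lazy val heavy: String = "rebuilt"
  }

  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(new Holder)
  oos.close()

  val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[Holder]
  assert(copy.heavy == "rebuilt")  // recomputed, not restored from the stream
}
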
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hive.execution

 import org.apache.spark.sql.QueryTest
+
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._

 case class Nested1(f1: Nested2)
@@ -54,4 +56,11 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
   }
+
+  test("test CTAS") {
+    checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
+      sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+  }
 }