From 2f913549174a5f549089a949500f29525e9ccbde Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 28 Jan 2015 16:54:49 -0800 Subject: [PATCH 01/16] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser. --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 10 +++++++--- .../apache/spark/sql/parquet/ParquetQuerySuite.scala | 12 +++++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 24a65f8f4d37..eea4b72497cd 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -146,9 +146,13 @@ class SqlParser extends AbstractSparkSQLParser { } protected lazy val insert: Parser[LogicalPlan] = - INSERT ~> OVERWRITE.? ~ (INTO ~> relation) ~ select ^^ { - case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o.isDefined) - } + ( INSERT ~> OVERWRITE ~> TABLE ~> relation ~ select ^^ { + case r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, true) + } + | INSERT ~> INTO ~> TABLE ~> relation ~ select ^^ { + case r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, false) + } + ) protected lazy val projection: Parser[Expression] = expression ~ (AS.? ~> ident.?) ^^ { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 3d82f4bce777..5ec7a156d935 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -37,11 +37,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { test("appending") { val data = (0 until 10).map(i => (i, i.toString)) withParquetTable(data, "t") { - sql("INSERT INTO t SELECT * FROM t") + sql("INSERT INTO TABLE t SELECT * FROM t") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } } + // This test case will trigger the NPE mentioned in + // https://issues.apache.org/jira/browse/PARQUET-151. + ignore("overwriting") { + val data = (0 until 10).map(i => (i, i.toString)) + withParquetTable(data, "t") { + sql("INSERT OVERWRITE TABLE t SELECT * FROM t") + checkAnswer(table("t"), data.map(Row.fromTuple)) + } + } + test("self-join") { // 4 rows, cells of column 1 of row 2 and row 4 are null val data = (1 to 4).map { i => From cb85b054753ae5f82b94a4656d302166776d5c63 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 30 Jan 2015 12:46:45 -0800 Subject: [PATCH 02/16] Initial write support. 
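This commit adds a DeveloperApi trait, InsertableRelation, to the external data sources API, plans the logical InsertIntoTable node over such a relation into a new runnable command in DataSourceStrategy, and implements it in JSONRelation (INSERT OVERWRITE only for now). The existing scan interfaces also become traits so a relation can mix several of them together.

As a rough sketch only (not part of this patch), an in-memory source could combine the scan and insert traits as below; the name KeyValueRelation, its schema, and the driver-side buffer are illustrative, while the TableScan and InsertableRelation signatures come from interfaces.scala:

  import scala.collection.mutable.ArrayBuffer

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row, SQLContext}
  import org.apache.spark.sql.sources.{InsertableRelation, TableScan}
  import org.apache.spark.sql.types._

  class KeyValueRelation(@transient val sqlContext: SQLContext)
    extends TableScan with InsertableRelation {

    // Illustrative driver-side storage; a real source would write to an
    // external system or filesystem instead.
    private val buffer = ArrayBuffer.empty[Row]

    override val schema = StructType(
      StructField("key", IntegerType, true) ::
      StructField("value", StringType, true) :: Nil)

    override def buildScan(): RDD[Row] =
      sqlContext.sparkContext.parallelize(buffer)

    override def insertInto(data: DataFrame, overwrite: Boolean): Unit = {
      if (overwrite) buffer.clear()
      buffer ++= data.collect()
    }
  }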
--- .../apache/spark/sql/json/JSONRelation.scala | 41 +++++++-- .../sql/sources/DataSourceStrategy.scala | 9 +- .../apache/spark/sql/sources/commands.scala | 35 +++++++ .../apache/spark/sql/sources/interfaces.scala | 15 ++- .../spark/sql/sources/InsertIntoSuite.scala | 91 +++++++++++++++++++ 5 files changed, 177 insertions(+), 14 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 1af96c28d5fd..6de9abc1af34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -17,7 +17,11 @@ package org.apache.spark.sql.json -import org.apache.spark.sql.SQLContext +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -28,10 +32,10 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio, None)(sqlContext) + JSONRelation(path, samplingRatio, None)(sqlContext) } /** Returns a new base relation with the given schema and parameters. */ @@ -39,21 +43,22 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { - val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext) + JSONRelation(path, samplingRatio, Some(schema))(sqlContext) } } private[sql] case class JSONRelation( - fileName: String, + path: String, samplingRatio: Double, userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) - extends TableScan { + extends TableScan with InsertableRelation { - private def baseRDD = sqlContext.sparkContext.textFile(fileName) + // TODO: Support partitioned JSON relation. 
+ private def baseRDD = sqlContext.sparkContext.textFile(path) override val schema = userSpecifiedSchema.getOrElse( JsonRDD.nullTypeToStringType( @@ -64,4 +69,24 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) + + override def insertInto(data: DataFrame, overwrite: Boolean) = { + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + + if (overwrite) { + try { + fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + } + data.toJSON.saveAsTextFile(path) + } else { + // TODO: Support INSERT INTO + sys.error("JSON table only support INSERT OVERWRITE for now.") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index d13f2ce2a5e1..d40d9b693aa3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.{Row, Strategy} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, InsertIntoTable => LogicalInsertIntoTable} import org.apache.spark.sql.execution /** @@ -54,6 +54,13 @@ private[sql] object DataSourceStrategy extends Strategy { case l @ LogicalRelation(t: TableScan) => execution.PhysicalRDD(l.output, t.buildScan()) :: Nil + case i @ LogicalInsertIntoTable( + l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => + if (partition.nonEmpty) { + sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") + } + execution.ExecutedCommand(InsertIntoTable(t, query, overwrite)) :: Nil + case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala new file mode 100644 index 000000000000..51c9d30f799f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.sources + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.RunnableCommand + +private[sql] case class InsertIntoTable( + relation: InsertableRelation, + query: LogicalPlan, + overwrite: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + relation.insertInto(new DataFrame(sqlContext, query), overwrite) + + Seq.empty[Row] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index cd82cc6ecb61..518aa1f69ca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} import org.apache.spark.sql.types.StructType @@ -108,7 +108,7 @@ abstract class BaseRelation { * A BaseRelation that can produce all of its tuples as an RDD of Row objects. */ @DeveloperApi -abstract class TableScan extends BaseRelation { +trait TableScan extends BaseRelation { def buildScan(): RDD[Row] } @@ -118,7 +118,7 @@ abstract class TableScan extends BaseRelation { * containing all of its tuples as Row objects. */ @DeveloperApi -abstract class PrunedScan extends BaseRelation { +trait PrunedScan extends BaseRelation { def buildScan(requiredColumns: Array[String]): RDD[Row] } @@ -132,7 +132,7 @@ abstract class PrunedScan extends BaseRelation { * as filtering partitions based on a bloom filter. */ @DeveloperApi -abstract class PrunedFilteredScan extends BaseRelation { +trait PrunedFilteredScan extends BaseRelation { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] } @@ -145,6 +145,11 @@ abstract class PrunedFilteredScan extends BaseRelation { * for experimentation. */ @Experimental -abstract class CatalystScan extends BaseRelation { +trait CatalystScan extends BaseRelation { def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] } + +@DeveloperApi +trait InsertableRelation extends BaseRelation { + def insertInto(data: DataFrame, overwrite: Boolean): Unit +} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala new file mode 100644 index 000000000000..bd2265f39422 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util +import org.apache.spark.util.Utils + + +class InsertIntoSuite extends DataSourceTest { + + import caseInsensisitiveContext._ + + var path: File = util.getTempFilePath("jsonInsertInto").getCanonicalFile + + before { + jsonRDD(sparkContext.parallelize("""{"a":1, "b": "str"}""" :: Nil)).registerTempTable("jt") + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable (a int, b string) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) + """.stripMargin) + } + + after { + if (path.exists()) Utils.deleteRecursively(path) + } + + test("Simple INSERT OVERWRITE") { + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + Row(1, "str") + ) + } + + test("INSERT OVERWRITE multiple times") { + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + sql( + s""" + |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + Row(1, "str") + ) + } + + test("INSERT INTO not supported for JSONRelation for now") { + intercept[RuntimeException]{ + sql( + s""" + |INSERT INTO TABLE jsonTable SELECT a, b FROM jt + """.stripMargin) + } + } +} From 788089162bbbde27511cf7aced92b0954dca01e4 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 30 Jan 2015 18:30:01 -0800 Subject: [PATCH 03/16] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause. 
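The DDL parser now accepts an optional IF NOT EXISTS clause and an optional AS <select> suffix after the OPTIONS clause. Column definitions are rejected in the CTAS form (a DDLException is thrown), and for a persistent table created through a HiveContext, IF NOT EXISTS makes the statement a no-op when the target table already exists, leaving its schema and data unchanged.

A usage sketch in the style of the new test suites (the path below is illustrative, not taken from the patch):

  sql(
    """
      |CREATE TABLE IF NOT EXISTS ctasJsonTable
      |USING org.apache.spark.sql.json.DefaultSource
      |OPTIONS (
      |  path '/tmp/ctas-example'
      |) AS
      |SELECT * FROM jsonTable
    """.stripMargin)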
--- .../spark/sql/execution/SparkStrategies.scala | 14 +- .../org/apache/spark/sql/sources/ddl.scala | 101 +++++++++++--- .../apache/spark/sql/sources/interfaces.scala | 2 +- .../sources/CreateTableAsSelectSuite.scala | 123 ++++++++++++++++++ .../spark/sql/sources/InsertIntoSuite.scala | 14 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 9 +- .../spark/sql/hive/HiveStrategies.scala | 12 +- .../spark/sql/hive/execution/commands.scala | 39 +++++- .../sql/hive/MetastoreDataSourcesSuite.scala | 94 ++++++++++++- 9 files changed, 373 insertions(+), 35 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0cc9d049c964..31f0134210ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.parquet._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing} +import org.apache.spark.sql.sources.{CreateTempTableUsingAsSelect, CreateTableUsingAsSelect, CreateTempTableUsing, CreateTableUsing} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { @@ -314,11 +314,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, allowExisting) => ExecutedCommand( - CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil + CreateTempTableUsing( + tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil + case CreateTableUsing(_, _, _, false, _, _) => + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => + case CreateTableUsingAsSelect(tableName, provider, true, options, allowExisting, query) => + ExecutedCommand( + CreateTempTableUsingAsSelect(tableName, provider, options, allowExisting, query)) :: Nil + case CreateTableUsingAsSelect(_, _, false, _, _, _) => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case _ => Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index b4af91a768ef..c6a5339baf80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -37,6 +37,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { try { Some(apply(input)) } catch { + case ddlException: DDLException => throw ddlException case _ if !exceptionOnError => None case x: Throwable => throw x } @@ -46,8 +47,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { lexical.initialize(reservedWords) phrase(dataType)(new lexical.Scanner(input)) match { case Success(r, x) => r - case x => - sys.error(s"Unsupported dataType: $x") + case x => throw new DDLException(s"Unsupported dataType: $x") } } @@ -57,8 +57,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val CREATE = Keyword("CREATE") protected val TEMPORARY = Keyword("TEMPORARY") protected val TABLE = Keyword("TABLE") + protected val IF = Keyword("IF") + protected val NOT = Keyword("NOT") + protected val EXISTS = Keyword("EXISTS") protected val USING = Keyword("USING") protected val OPTIONS = Keyword("OPTIONS") + protected val AS = Keyword("AS") // Data types. protected val STRING = Keyword("STRING") @@ -83,22 +87,46 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected def start: Parser[LogicalPlan] = ddl /** - * `CREATE [TEMPORARY] TABLE avroTable + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] * USING org.apache.spark.sql.avro * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` * or - * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) + * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS] * USING org.apache.spark.sql.avro * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * AS SELECT ... */ protected lazy val createTable: Parser[LogicalPlan] = ( - (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident - ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case temp ~ tableName ~ columns ~ provider ~ opts => - val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) - CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts) - } + (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident + ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ~ (AS ~> restInput).? 
^^ { + case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => + if (query.isDefined) { + if (columns.isDefined) { + throw new DDLException( + "a CREATE TABLE AS SELECT statement does not allow column definitions.") + } + CreateTableUsingAsSelect(tableName, + provider, + temp.isDefined, + opts, + allowExisting.isDefined, + query.get) + } else { + val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) + CreateTableUsing( + tableName, + userSpecifiedSchema, + provider, + temp.isDefined, + opts, + allowExisting.isDefined) + } + } ) protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" @@ -215,18 +243,55 @@ private[sql] case class CreateTableUsing( userSpecifiedSchema: Option[StructType], provider: String, temporary: Boolean, - options: Map[String, String]) extends Command + options: Map[String, String], + allowExisting: Boolean) extends Command + +private[sql] case class CreateTableUsingAsSelect( + tableName: String, + provider: String, + temporary: Boolean, + options: Map[String, String], + allowExisting: Boolean, + query: String) extends Command private [sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) extends RunnableCommand { + options: Map[String, String], + allowExisting: Boolean) extends RunnableCommand { def run(sqlContext: SQLContext) = { - val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) - sqlContext.registerRDDAsTable( - new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + if (!sqlContext.catalog.tableExists(Seq(tableName))) { + val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) + sqlContext.registerRDDAsTable( + new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + } + + Seq.empty + } +} + +private [sql] case class CreateTempTableUsingAsSelect( + tableName: String, + provider: String, + options: Map[String, String], + allowExisting: Boolean, + query: String) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + // The semantic of allowExisting for CREATE TEMPORARY TABLE is if allowExisting is true + // and the table already exists, we will do nothing. If allowExisting is false, + // and the table already exists, we will overwrite it. + val alreadyExists = sqlContext.catalog.tableExists(Seq(tableName)) + if (!(alreadyExists && allowExisting)) { + val df = sqlContext.sql(query) + val resolved = ResolvedDataSource(sqlContext, Some(df.schema), provider, options) + sqlContext.registerRDDAsTable( + new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + df.insertInto(tableName, true) + } + Seq.empty } } @@ -248,3 +313,9 @@ protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, override def -(key: String): Map[String, String] = baseMap - key.toLowerCase() } + +/** + * The exception thrown from the DDL parser. 
+ * @param message + */ +protected[sql] class DDLException(message: String) extends Exception(message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 518aa1f69ca3..dfadaec95c4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -152,4 +152,4 @@ trait CatalystScan extends BaseRelation { @DeveloperApi trait InsertableRelation extends BaseRelation { def insertInto(data: DataFrame, overwrite: Boolean): Unit -} \ No newline at end of file +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala new file mode 100644 index 000000000000..6f5338b8bd22 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.apache.spark.sql.catalyst.util +import org.apache.spark.sql.types.{StringType, StructType, StructField} +import org.apache.spark.util.Utils + + +class CreateTableAsSelectSuite extends DataSourceTest { + + import caseInsensisitiveContext._ + + var path: File = util.getTempFilePath("jsonCTAS").getCanonicalFile + + before { + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + } + + after { + dropTempTable("jt") + if (path.exists()) Utils.deleteRecursively(path) + } + + test("CTAS") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable") + } + + test("CTAS with IF NOT EXISTS") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT b FROM jt + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + // jsonTable should only have a single column. 
+ val expectedSchema = StructType(StructField("b", StringType, true) :: Nil) + assert(expectedSchema === table("jsonTable").schema) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT b FROM jt").collect()) + + // This statement does not have IF NOT EXISTS, so we will overwrite the table. + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + assert(table("jt").schema === table("jsonTable").schema) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable") + } + + test("a CTAS statement with column definitions is not allowed") { + intercept[DDLException]{ + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable (a int, b string) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala index bd2265f39422..4617b4301b51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala @@ -29,9 +29,9 @@ class InsertIntoSuite extends DataSourceTest { import caseInsensisitiveContext._ var path: File = util.getTempFilePath("jsonInsertInto").getCanonicalFile - before { - jsonRDD(sparkContext.parallelize("""{"a":1, "b": "str"}""" :: Nil)).registerTempTable("jt") + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") sql( s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) @@ -43,10 +43,12 @@ class InsertIntoSuite extends DataSourceTest { } after { + dropTempTable("jsonTable") + dropTempTable("jt") if (path.exists()) Utils.deleteRecursively(path) } - test("Simple INSERT OVERWRITE") { + test("Simple INSERT OVERWRITE a JSONRelation") { sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt @@ -54,11 +56,11 @@ class InsertIntoSuite extends DataSourceTest { checkAnswer( sql("SELECT a, b FROM jsonTable"), - Row(1, "str") + (1 to 10).map(i => Row(i, s"str$i")) ) } - test("INSERT OVERWRITE multiple times") { + test("INSERT OVERWRITE a JSONRelation multiple times") { sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt @@ -76,7 +78,7 @@ class InsertIntoSuite extends DataSourceTest { checkAnswer( sql("SELECT a, b FROM jsonTable"), - Row(1, "str") + (1 to 10).map(i => Row(i, s"str$i")) ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1a49f09bd998..151ad34b3b73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -103,7 +103,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) = { + options: Map[String, String], + allowExisting: Boolean) = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tbl = new Table(dbName, tblName) @@ -118,7 +119,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // create the 
table synchronized { - client.createTable(tbl, false) + try client.createTable(tbl, allowExisting) catch { + case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException + if allowExisting => // Do nothing + case e: Throwable => throw e + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ace9329cd582..4c911db7379d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.CreateTableUsing +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing} import org.apache.spark.sql.types.StringType @@ -211,9 +211,15 @@ private[hive] trait HiveStrategies { object HiveDDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) => ExecutedCommand( - CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil + CreateMetastoreDataSource( + tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil + + case CreateTableUsingAsSelect(tableName, provider, false, options, allowExisting, query) => + ExecutedCommand( + CreateMetastoreDataSourceAsSelect( + tableName, provider, options, allowExisting, query)) :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 4814cb7ebfe5..88251b1ac870 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -102,11 +102,46 @@ case class CreateMetastoreDataSource( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String]) extends RunnableCommand { + options: Map[String, String], + allowExisting: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options) + hiveContext.catalog.createDataSourceTable( + tableName, + userSpecifiedSchema, + provider, + options, + allowExisting) + + Seq.empty[Row] + } +} + +case class CreateMetastoreDataSourceAsSelect( + tableName: String, + provider: String, + options: Map[String, String], + allowExisting: Boolean, + query: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] + + // The semantic of allowExisting for CREATE TABLE is if allowExisting is true + // and the table already exists, we will do nothing. If allowExisting is false, + // and the table already exists, an exception will be thrown. 
+ val alreadyExists = hiveContext.catalog.tableExists(Seq(tableName)) + if (!(alreadyExists && allowExisting)) { + val df = hiveContext.sql(query) + hiveContext.catalog.createDataSourceTable( + tableName, + Some(df.schema), + provider, + options, + allowExisting) + df.insertInto(tableName, true) + } Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 7408c7ffd69e..d1fb81720eea 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.hive import java.io.File +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException +import org.apache.hadoop.hive.ql.metadata.HiveException import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils +import org.apache.spark.sql.catalyst.util import org.apache.spark.sql._ import org.apache.spark.util.Utils import org.apache.spark.sql.types._ @@ -39,6 +42,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { } val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile test ("persistent JSON table") { sql( @@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructField("", innerStruct, true) :: StructField("b", StringType, true) :: Nil) - assert(expectedSchema == table("jsonTable").schema) + assert(expectedSchema === table("jsonTable").schema) jsonFile(filePath).registerTempTable("expectedJsonTable") @@ -240,7 +244,93 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { invalidateTable("jsonTable") val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) - assert(expectedSchema == table("jsonTable").schema) + assert(expectedSchema === table("jsonTable").schema) + } + + test("CTAS") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) + } + + test("CTAS with IF NOT EXISTS") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + // Create the table again should trigger a AlreadyExistsException. + val exception = intercept[HiveException] { + sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + } + assert(exception.getMessage.contains("AlreadyExistsException"), + "Hive should complain that ctasJsonTable already exists") + + // The following statement should be fine if it has IF NOT EXISTS. + // It tries to create a table ctasJsonTable with a new schema. 
+ // The actual table's schema and data should not be changed. + sql( + s""" + |CREATE TABLE IF NOT EXISTS ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${ctasPath}' + |) AS + |SELECT a FROM jsonTable + """.stripMargin) + + // Discard the cached relation. + invalidateTable("ctasJsonTable") + + // Schema should not be changed. + assert(table("ctasJsonTable").schema === table("jsonTable").schema) + // Table data should not be changed. + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM jsonTable").collect()) } test("SPARK-5286 Fail to drop an invalid table when using the data source API") { From fd6758c52ba5a1f5fe56ea802084699f21164a00 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 31 Jan 2015 12:43:13 -0800 Subject: [PATCH 04/16] Use BeforeAndAfterAll. --- .../sources/CreateTableAsSelectSuite.scala | 49 +++++++++++++++++-- .../spark/sql/sources/InsertIntoSuite.scala | 13 +++-- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 6f5338b8bd22..d3d4dbb1cd24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -19,24 +19,29 @@ package org.apache.spark.sql.sources import java.io.File +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.sql.catalyst.util import org.apache.spark.sql.types.{StringType, StructType, StructField} import org.apache.spark.util.Utils - -class CreateTableAsSelectSuite extends DataSourceTest { +class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { import caseInsensisitiveContext._ - var path: File = util.getTempFilePath("jsonCTAS").getCanonicalFile + var path: File = null - before { + override def beforeAll(): Unit = { + path = util.getTempFilePath("jsonCTAS").getCanonicalFile val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) jsonRDD(rdd).registerTempTable("jt") } - after { + override def afterAll(): Unit = { dropTempTable("jt") + } + + after { if (path.exists()) Utils.deleteRecursively(path) } @@ -107,6 +112,40 @@ class CreateTableAsSelectSuite extends DataSourceTest { dropTempTable("jsonTable") } + test("create a table, drop it and create another one with the same name") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable") + + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a * 4 FROM jt + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT a * 4 FROM jt").collect()) + + dropTempTable("jsonTable") + } + test("a CTAS statement with column definitions is not allowed") { intercept[DDLException]{ sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala index 4617b4301b51..f91cea6a3706 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala @@ -19,17 +19,20 @@ package org.apache.spark.sql.sources import java.io.File +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util import org.apache.spark.util.Utils - -class InsertIntoSuite extends DataSourceTest { +class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll { import caseInsensisitiveContext._ - var path: File = util.getTempFilePath("jsonInsertInto").getCanonicalFile - before { + var path: File = null + + override def beforeAll: Unit = { + path = util.getTempFilePath("jsonCTAS").getCanonicalFile val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) jsonRDD(rdd).registerTempTable("jt") sql( @@ -42,7 +45,7 @@ class InsertIntoSuite extends DataSourceTest { """.stripMargin) } - after { + override def afterAll: Unit = { dropTempTable("jsonTable") dropTempTable("jt") if (path.exists()) Utils.deleteRecursively(path) From d37b19c3d1891d077205772c97534fdff04830a1 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 31 Jan 2015 12:45:32 -0800 Subject: [PATCH 05/16] Cheng's comments. --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 10 +++------- .../apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index eea4b72497cd..234848a59894 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -146,13 +146,9 @@ class SqlParser extends AbstractSparkSQLParser { } protected lazy val insert: Parser[LogicalPlan] = - ( INSERT ~> OVERWRITE ~> TABLE ~> relation ~ select ^^ { - case r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, true) - } - | INSERT ~> INTO ~> TABLE ~> relation ~ select ^^ { - case r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, false) - } - ) + INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ { + case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o) + } protected lazy val projection: Parser[Expression] = expression ~ (AS.? ~> ident.?) ^^ { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 31f0134210ba..2dc69d432401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -318,13 +318,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommand( CreateTempTableUsing( tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil - case CreateTableUsing(_, _, _, false, _, _) => + case c: CreateTableUsing if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") case CreateTableUsingAsSelect(tableName, provider, true, options, allowExisting, query) => ExecutedCommand( CreateTempTableUsingAsSelect(tableName, provider, options, allowExisting, query)) :: Nil - case CreateTableUsingAsSelect(_, _, false, _, _, _) => + case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") case _ => Nil From 95a7c716b8c6864fb4e0c73ec01df854b8475e9f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 31 Jan 2015 13:25:01 -0800 Subject: [PATCH 06/16] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement. --- .../src/main/scala/org/apache/spark/sql/sources/ddl.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index c6a5339baf80..56140a76c273 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -262,7 +262,11 @@ private [sql] case class CreateTempTableUsing( allowExisting: Boolean) extends RunnableCommand { def run(sqlContext: SQLContext) = { - if (!sqlContext.catalog.tableExists(Seq(tableName))) { + // The semantic of allowExisting for CREATE TEMPORARY TABLE is if allowExisting is true + // and the table already exists, we will do nothing. If allowExisting is false, + // and the table already exists, we will overwrite it. + val alreadyExists = sqlContext.catalog.tableExists(Seq(tableName)) + if (!(alreadyExists && allowExisting)) { val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) sqlContext.registerRDDAsTable( new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) From 909924fbe1731785b4eae91d505dc670417cd28d Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 31 Jan 2015 16:19:45 -0800 Subject: [PATCH 07/16] Add saveAsTable for the data source API to DataFrame. --- .../org/apache/spark/sql/DataFrame.scala | 47 +++++++++++++++++++ .../main/scala/org/apache/spark/sql/api.scala | 9 ++++ .../spark/sql/execution/SparkStrategies.scala | 17 +++++-- .../org/apache/spark/sql/sources/ddl.scala | 12 ++++- .../spark/sql/hive/HiveStrategies.scala | 16 +++++-- .../spark/sql/hive/execution/commands.scala | 7 +-- 6 files changed, 94 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 1096e396591d..4ce028247605 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.{List => JList} +import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect} + import scala.language.implicitConversions import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -622,6 +624,51 @@ class DataFrame protected[sql]( CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd } + /** + * :: Experimental :: + * Creates a table from the the contents of this DataFrame based on a given data source and + * a set of options. This will fail if the table already exists. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. 
+ */ + @Experimental + override def saveAsTable( + tableName: String, + dataSourceName: String, + options: Map[String, String]): Unit = { + val cmd = + CreateTableUsingAsLogicalPlan( + tableName, + dataSourceName, + temporary = false, + options, + allowExisting = false, + logicalPlan) + + sqlContext.executePlan(cmd).toRdd + } + + /** + * :: Experimental :: + * Creates a table from the the contents of this DataFrame based on a given data source and + * a set of options. This will fail if the table already exists. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + */ + @Experimental + override def saveAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit = { + saveAsTable(tableName, dataSourceName, options.toMap) + } + /** * :: Experimental :: * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index eb0eb3f32560..93ca21347f6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -170,6 +170,15 @@ private[sql] trait DataFrameSpecificApi { @Experimental def saveAsTable(tableName: String): Unit + @Experimental + def saveAsTable(tableName: String, dataSourceName: String, options: Map[String, String]): Unit + + @Experimental + def saveAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit + @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2dc69d432401..3397bf20055d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.parquet._ import org.apache.spark.sql.types._ -import org.apache.spark.sql.sources.{CreateTempTableUsingAsSelect, CreateTableUsingAsSelect, CreateTempTableUsing, CreateTableUsing} +import org.apache.spark.sql.sources._ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { @@ -321,12 +321,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case c: CreateTableUsing if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") - case CreateTableUsingAsSelect(tableName, provider, true, options, allowExisting, query) => - ExecutedCommand( - CreateTempTableUsingAsSelect(tableName, provider, options, allowExisting, query)) :: Nil + case CreateTableUsingAsSelect(tableName, provider, true, opts, allowExisting, query) => + val logicalPlan = sqlContext.parseSql(query) + val cmd = + CreateTempTableUsingAsSelect(tableName, provider, opts, allowExisting, logicalPlan) + ExecutedCommand(cmd) :: Nil case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, allowExisting, query) => + val cmd = + CreateTempTableUsingAsSelect(tableName, provider, opts, allowExisting, query) + ExecutedCommand(cmd) :: Nil + case c: CreateTableUsingAsLogicalPlan if !c.temporary => + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 56140a76c273..e1924dfe8983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -254,6 +254,14 @@ private[sql] case class CreateTableUsingAsSelect( allowExisting: Boolean, query: String) extends Command +private[sql] case class CreateTableUsingAsLogicalPlan( + tableName: String, + provider: String, + temporary: Boolean, + options: Map[String, String], + allowExisting: Boolean, + query: LogicalPlan) extends Command + private [sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], @@ -281,7 +289,7 @@ private [sql] case class CreateTempTableUsingAsSelect( provider: String, options: Map[String, String], allowExisting: Boolean, - query: String) extends RunnableCommand { + query: LogicalPlan) extends RunnableCommand { def run(sqlContext: SQLContext) = { // The semantic of allowExisting for CREATE TEMPORARY TABLE is if allowExisting is true @@ -289,7 +297,7 @@ private [sql] case class CreateTempTableUsingAsSelect( // and the table already exists, we will overwrite it. 
val alreadyExists = sqlContext.catalog.tableExists(Seq(tableName)) if (!(alreadyExists && allowExisting)) { - val df = sqlContext.sql(query) + val df = new DataFrame(sqlContext, query) val resolved = ResolvedDataSource(sqlContext, Some(df.schema), provider, options) sqlContext.registerRDDAsTable( new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 4c911db7379d..f1224f364099 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing} +import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing} import org.apache.spark.sql.types.StringType @@ -216,10 +216,16 @@ private[hive] trait HiveStrategies { CreateMetastoreDataSource( tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil - case CreateTableUsingAsSelect(tableName, provider, false, options, allowExisting, query) => - ExecutedCommand( - CreateMetastoreDataSourceAsSelect( - tableName, provider, options, allowExisting, query)) :: Nil + case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) => + val logicalPlan = hiveContext.parseSql(query) + val cmd = + CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan) + ExecutedCommand(cmd) :: Nil + + case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) => + val cmd = + CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query) + ExecutedCommand(cmd) :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 88251b1ac870..9850a8e9f883 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.StructType @@ -123,7 +124,7 @@ case class CreateMetastoreDataSourceAsSelect( provider: String, options: Map[String, String], allowExisting: Boolean, - query: String) extends RunnableCommand { + query: LogicalPlan) extends RunnableCommand { override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] @@ -133,7 +134,7 @@ case class CreateMetastoreDataSourceAsSelect( // and the table already exists, an exception will be thrown. 
val alreadyExists = hiveContext.catalog.tableExists(Seq(tableName)) if (!(alreadyExists && allowExisting)) { - val df = hiveContext.sql(query) + val df = new DataFrame(hiveContext, query) hiveContext.catalog.createDataSourceTable( tableName, Some(df.schema), From 1a719a54edbf891f80ea2283ac17a8dd5482e81e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 31 Jan 2015 17:25:08 -0800 Subject: [PATCH 08/16] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now. --- .../spark/sql/execution/SparkStrategies.scala | 12 ++-- .../org/apache/spark/sql/sources/ddl.scala | 38 ++++------- .../sources/CreateTableAsSelectSuite.scala | 64 +++++-------------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 30 ++++++--- .../spark/sql/hive/execution/commands.scala | 12 +--- .../sql/hive/MetastoreDataSourcesSuite.scala | 2 +- 6 files changed, 62 insertions(+), 96 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3397bf20055d..054564c50722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -314,24 +314,24 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, allowExisting) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) => ExecutedCommand( CreateTempTableUsing( - tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil + tableName, userSpecifiedSchema, provider, opts)) :: Nil case c: CreateTableUsing if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") - case CreateTableUsingAsSelect(tableName, provider, true, opts, allowExisting, query) => + case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) => val logicalPlan = sqlContext.parseSql(query) val cmd = - CreateTempTableUsingAsSelect(tableName, provider, opts, allowExisting, logicalPlan) + CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan) ExecutedCommand(cmd) :: Nil case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") - case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, allowExisting, query) => + case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) => val cmd = - CreateTempTableUsingAsSelect(tableName, provider, opts, allowExisting, query) + CreateTempTableUsingAsSelect(tableName, provider, opts, query) ExecutedCommand(cmd) :: Nil case c: CreateTableUsingAsLogicalPlan if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index e1924dfe8983..ffd7921a29a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -105,6 +105,11 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ (tableCols).? 
~ (USING ~> className) ~ (OPTIONS ~> options) ~ (AS ~> restInput).? ^^ { case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => + if (temp.isDefined && allowExisting.isDefined) { + throw new DDLException( + "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") + } + if (query.isDefined) { if (columns.isDefined) { throw new DDLException( @@ -266,20 +271,12 @@ private [sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - options: Map[String, String], - allowExisting: Boolean) extends RunnableCommand { + options: Map[String, String]) extends RunnableCommand { def run(sqlContext: SQLContext) = { - // The semantic of allowExisting for CREATE TEMPORARY TABLE is if allowExisting is true - // and the table already exists, we will do nothing. If allowExisting is false, - // and the table already exists, we will overwrite it. - val alreadyExists = sqlContext.catalog.tableExists(Seq(tableName)) - if (!(alreadyExists && allowExisting)) { - val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) - sqlContext.registerRDDAsTable( - new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) - } - + val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) + sqlContext.registerRDDAsTable( + new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) Seq.empty } } @@ -288,21 +285,14 @@ private [sql] case class CreateTempTableUsingAsSelect( tableName: String, provider: String, options: Map[String, String], - allowExisting: Boolean, query: LogicalPlan) extends RunnableCommand { def run(sqlContext: SQLContext) = { - // The semantic of allowExisting for CREATE TEMPORARY TABLE is if allowExisting is true - // and the table already exists, we will do nothing. If allowExisting is false, - // and the table already exists, we will overwrite it. 
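// [Editor's note -- illustrative sketch, not part of the patch] With the parser change above,
// combining TEMPORARY with IF NOT EXISTS is rejected up front with a DDLException, while a plain
// CREATE TEMPORARY TABLE ... AS SELECT keeps working. Names and the path are invented;
// `sqlContext` is assumed to have a table `jt` registered.
//
// Rejected after this change:
//   CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
//   USING org.apache.spark.sql.json.DefaultSource
//   OPTIONS (path '/tmp/json_tmp') AS SELECT b FROM jt
//
// Still allowed:
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (path '/tmp/json_tmp')
    |AS SELECT a, b FROM jt
  """.stripMargin)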
- val alreadyExists = sqlContext.catalog.tableExists(Seq(tableName)) - if (!(alreadyExists && allowExisting)) { - val df = new DataFrame(sqlContext, query) - val resolved = ResolvedDataSource(sqlContext, Some(df.schema), provider, options) - sqlContext.registerRDDAsTable( - new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) - df.insertInto(tableName, true) - } + val df = new DataFrame(sqlContext, query) + val resolved = ResolvedDataSource(sqlContext, Some(df.schema), provider, options) + sqlContext.registerRDDAsTable( + new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + df.insertInto(tableName, true) Seq.empty } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index d3d4dbb1cd24..56ac8a8320bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -45,7 +45,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { if (path.exists()) Utils.deleteRecursively(path) } - test("CTAS") { + test("CREATE TEMPORARY TABLE AS SELECT") { sql( s""" |CREATE TEMPORARY TABLE jsonTable @@ -63,7 +63,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { dropTempTable("jsonTable") } - test("CTAS with IF NOT EXISTS") { + test("create a table, drop it and create another one with the same name") { sql( s""" |CREATE TEMPORARY TABLE jsonTable @@ -71,48 +71,15 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { |OPTIONS ( | path '${path.toString}' |) AS - |SELECT b FROM jt - """.stripMargin) - - sql( - s""" - |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${path.toString}' - |) AS |SELECT a, b FROM jt """.stripMargin) - // jsonTable should only have a single column. - val expectedSchema = StructType(StructField("b", StringType, true) :: Nil) - assert(expectedSchema === table("jsonTable").schema) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - sql("SELECT b FROM jt").collect()) - - // This statement does not have IF NOT EXISTS, so we will overwrite the table. 
- sql( - s""" - |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource - |OPTIONS ( - | path '${path.toString}' - |) AS - |SELECT a, b FROM jt - """.stripMargin) - - assert(table("jt").schema === table("jsonTable").schema) - checkAnswer( sql("SELECT a, b FROM jsonTable"), sql("SELECT a, b FROM jt").collect()) dropTempTable("jsonTable") - } - test("create a table, drop it and create another one with the same name") { sql( s""" |CREATE TEMPORARY TABLE jsonTable @@ -120,30 +87,31 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { |OPTIONS ( | path '${path.toString}' |) AS - |SELECT a, b FROM jt + |SELECT a * 4 FROM jt """.stripMargin) checkAnswer( - sql("SELECT a, b FROM jsonTable"), - sql("SELECT a, b FROM jt").collect()) + sql("SELECT * FROM jsonTable"), + sql("SELECT a * 4 FROM jt").collect()) dropTempTable("jsonTable") + } - sql( - s""" - |CREATE TEMPORARY TABLE jsonTable + test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") { + val message = intercept[DDLException]{ + sql( + s""" + |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toString}' |) AS - |SELECT a * 4 FROM jt + |SELECT b FROM jt """.stripMargin) - - checkAnswer( - sql("SELECT * FROM jsonTable"), - sql("SELECT a * 4 FROM jt").collect()) - - dropTempTable("jsonTable") + }.getMessage + assert( + message.contains("a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."), + "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.") } test("a CTAS statement with column definitions is not allowed") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 151ad34b3b73..7b8e369517b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -24,9 +24,8 @@ import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder} import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema} -import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} -import org.apache.hadoop.hive.ql.metadata.InvalidTableException +import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema} +import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} @@ -99,12 +98,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false + /** * + * Creates a data source table (a table created with USING clause) in Hive's metastore. + * Returns true when the table has been created. Otherwise, false. 
+ * @param tableName + * @param userSpecifiedSchema + * @param provider + * @param options + * @param allowExisting + * @return + */ def createDataSourceTable( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String], - allowExisting: Boolean) = { + allowExisting: Boolean): Boolean = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tbl = new Table(dbName, tblName) @@ -119,12 +128,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // create the table synchronized { - try client.createTable(tbl, allowExisting) catch { - case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException - if allowExisting => // Do nothing - case e: Throwable => throw e + // Always set ifNotExists to false for createTable since we need to + // know if the table has actually been created in Hive's metastore. + try { + client.createTable(tbl, false) + } catch { + case e: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] && allowExisting => + return false } } + + return true } def tableExists(tableIdentifier: Seq[String]): Boolean = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 9850a8e9f883..2011513515fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -128,21 +128,15 @@ case class CreateMetastoreDataSourceAsSelect( override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - - // The semantic of allowExisting for CREATE TABLE is if allowExisting is true - // and the table already exists, we will do nothing. If allowExisting is false, - // and the table already exists, an exception will be thrown. - val alreadyExists = hiveContext.catalog.tableExists(Seq(tableName)) - if (!(alreadyExists && allowExisting)) { - val df = new DataFrame(hiveContext, query) + val df = new DataFrame(hiveContext, query) + val hasCreated = hiveContext.catalog.createDataSourceTable( tableName, Some(df.schema), provider, options, allowExisting) - df.insertInto(tableName, true) - } + if (hasCreated) df.insertInto(tableName, true) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d1fb81720eea..18bf56d2579e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -306,7 +306,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |SELECT * FROM jsonTable """.stripMargin) } - assert(exception.getMessage.contains("AlreadyExistsException"), + assert(exception.getCause.isInstanceOf[AlreadyExistsException], "Hive should complain that ctasJsonTable already exists") // The following statement should be fine if it has IF NOT EXISTS. From e5d29f217a4e9e4c33c822b4436adcfc2365346c Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 1 Feb 2015 20:32:35 -0800 Subject: [PATCH 09/16] Programmatic APIs. 
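A rough usage sketch of the API surface added in this commit (signatures as they stand
here; later commits in this series rename and reshape some of them, and the data source,
paths, and table names below are placeholders; `sqlContext` and `hiveContext` are assumed
to exist already):

    // Load a relation through a data source and get a DataFrame back.
    val people = sqlContext.load(
      "org.apache.spark.sql.json",
      Map("path" -> "/tmp/people.json"))

    // Write a DataFrame out through a data source that implements InsertableRelation.
    people.save(
      "org.apache.spark.sql.json",
      Map("path" -> "/tmp/people_copy.json"),
      overwrite = true)

    // Persist a data source table in the Hive metastore (HiveContext only).
    hiveContext.loadAsTable(
      "people_table",
      "org.apache.spark.sql.json",
      Map("path" -> "/tmp/people.json"),
      allowExisting = false)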
--- .../org/apache/spark/sql/DataFrame.scala | 23 ++++++- .../org/apache/spark/sql/SQLContext.scala | 68 ++++++++++++++++++- .../main/scala/org/apache/spark/sql/api.scala | 12 ++++ .../spark/sql/execution/SparkStrategies.scala | 6 ++ .../sql/sources/DataSourceStrategy.scala | 2 +- .../apache/spark/sql/sources/commands.scala | 2 +- .../apache/spark/sql/hive/HiveContext.scala | 56 ++++++++++++++- 7 files changed, 163 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 4ce028247605..608128b3fd78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql import java.util.{List => JList} -import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect} - import scala.language.implicitConversions import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD +import org.apache.spark.sql.sources.{InsertableRelation, ResolvedDataSource, CreateTableUsingAsLogicalPlan} import org.apache.spark.sql.types.{NumericType, StructType} import org.apache.spark.util.Utils @@ -669,6 +668,26 @@ class DataFrame protected[sql]( saveAsTable(tableName, dataSourceName, options.toMap) } + @Experimental + override def save( + dataSourceName: String, + options: Map[String, String], + overwrite: Boolean): Unit = { + val resolved = ResolvedDataSource(sqlContext, Some(schema), dataSourceName, options) + resolved.relation match { + case i: InsertableRelation => i.insertInto(new DataFrame(sqlContext, logicalPlan), overwrite) + case o => sys.error(s"Data source $dataSourceName does not support save.") + } + } + + @Experimental + def save( + dataSourceName: String, + options: java.util.Map[String, String], + overwrite: Boolean): Unit = { + save(dataSourceName, options.toMap, overwrite) + } + /** * :: Experimental :: * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f87fde4ed816..caf0b162c39b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,6 +21,7 @@ import java.beans.Introspector import java.util.Properties import scala.collection.immutable +import scala.collection.JavaConversions._ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag @@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.json._ -import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy} +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -334,6 +335,71 @@ class SQLContext(@transient val sparkContext: SparkContext) applySchema(rowRDD, appliedSchema) } + @Experimental + def load( + dataSourceName: String, + options: Map[String, String]): DataFrame = { + val resolved = ResolvedDataSource(this, None, dataSourceName, options) + new DataFrame(this, LogicalRelation(resolved.relation)) + } + + @Experimental + def load( + dataSourceName: String, + options: java.util.Map[String, String]): DataFrame = { + load(dataSourceName, options.toMap) + } + + @Experimental + def loadAsTempTable( + tableName: String, + dataSourceName: String, + options: Map[String, String]): Unit = { + val cmd = + CreateTableUsing( + tableName, + None, + dataSourceName, + temporary = true, + options, + allowExisting = false) + executePlan(cmd).toRdd + } + + @Experimental + def loadAsTempTable( + tableName: String, + dataSourceName: String, + options: Map[String, String], + schema: StructType): Unit = { + val cmd = + CreateTableUsing( + tableName, + Some(schema), + dataSourceName, + temporary = true, + options, + allowExisting = false) + executePlan(cmd).toRdd + } + + @Experimental + def loadAsTempTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit = { + loadAsTempTable(tableName, dataSourceName, options.toMap) + } + + @Experimental + def loadAsTempTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String], + schema: StructType): Unit = { + loadAsTempTable(tableName, dataSourceName, options.toMap, schema) + } + /** * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only * during the lifetime of this instance of SQLContext. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 93ca21347f6f..19dd4f7d45ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -179,6 +179,18 @@ private[sql] trait DataFrameSpecificApi { dataSourceName: String, options: java.util.Map[String, String]): Unit + @Experimental + def save( + dataSourceName: String, + options: Map[String, String], + overwrite: Boolean): Unit + + @Experimental + def save( + dataSourceName: String, + options: java.util.Map[String, String], + overwrite: Boolean): Unit + @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 054564c50722..ff0609d4b3b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -320,6 +320,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { tableName, userSpecifiedSchema, provider, opts)) :: Nil case c: CreateTableUsing if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case c: CreateTableUsing if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) => val logicalPlan = sqlContext.parseSql(query) @@ -328,6 +330,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommand(cmd) :: Nil case c: CreateTableUsingAsSelect if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) => val cmd = @@ -335,6 +339,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommand(cmd) :: Nil case c: CreateTableUsingAsLogicalPlan if !c.temporary => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") + case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting => + sys.error("allowExisting should be set to false when creating a temporary table.") case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index d40d9b693aa3..386ff2452f1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy { if (partition.nonEmpty) { sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") } - execution.ExecutedCommand(InsertIntoTable(t, query, overwrite)) :: Nil + execution.ExecutedCommand(InsertIntoRelation(t, query, overwrite)) :: Nil case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 51c9d30f799f..55e68069009a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.RunnableCommand -private[sql] case class InsertIntoTable( +private[sql] case class InsertIntoRelation( relation: InsertableRelation, query: LogicalPlan, overwrite: Boolean) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b746942cb106..153e82fea50b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} -import org.apache.spark.sql.sources.DataSourceStrategy +import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy} import org.apache.spark.sql.types._ /** @@ -105,6 +105,60 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.invalidateTable("default", tableName) } + @Experimental + def loadAsTable( + tableName: String, + dataSourceName: String, + options: Map[String, String], + allowExisting: Boolean): Unit = { + val cmd = + CreateTableUsing( + tableName, + None, + dataSourceName, + temporary = false, + options, + allowExisting) + executePlan(cmd).toRdd + } + + @Experimental + def loadAsTable( + tableName: String, + dataSourceName: String, + options: Map[String, String], + schema: StructType, + allowExisting: Boolean): Unit = { + val cmd = + CreateTableUsing( + tableName, + Some(schema), + dataSourceName, + temporary = false, + options, + allowExisting) + executePlan(cmd).toRdd + } + + @Experimental + def loadAsTable( + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String], + allowExisting: Boolean): Unit = { + loadAsTable(tableName, dataSourceName, options.toMap, allowExisting) + } + + @Experimental + def loadAsTable( + tableName: String, + 
dataSourceName: String, + options: java.util.Map[String, String], + schema: StructType, + allowExisting: Boolean): Unit = { + loadAsTable(tableName, dataSourceName, options.toMap, schema, allowExisting) + } + /** * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. From 66ebd744310fd9259977d9a11eef6fd94ff7ad85 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 09:06:42 -0800 Subject: [PATCH 10/16] Formatting. --- .../org/apache/spark/sql/DataFrame.scala | 24 ++++---- .../org/apache/spark/sql/SQLContext.scala | 58 ++----------------- .../apache/spark/sql/hive/HiveContext.scala | 40 ++++++------- 3 files changed, 36 insertions(+), 86 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 608128b3fd78..b40ccf99be89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -635,9 +635,9 @@ class DataFrame protected[sql]( */ @Experimental override def saveAsTable( - tableName: String, - dataSourceName: String, - options: Map[String, String]): Unit = { + tableName: String, + dataSourceName: String, + options: Map[String, String]): Unit = { val cmd = CreateTableUsingAsLogicalPlan( tableName, @@ -662,17 +662,17 @@ class DataFrame protected[sql]( */ @Experimental override def saveAsTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String]): Unit = { + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String]): Unit = { saveAsTable(tableName, dataSourceName, options.toMap) } @Experimental override def save( - dataSourceName: String, - options: Map[String, String], - overwrite: Boolean): Unit = { + dataSourceName: String, + options: Map[String, String], + overwrite: Boolean): Unit = { val resolved = ResolvedDataSource(sqlContext, Some(schema), dataSourceName, options) resolved.relation match { case i: InsertableRelation => i.insertInto(new DataFrame(sqlContext, logicalPlan), overwrite) @@ -682,9 +682,9 @@ class DataFrame protected[sql]( @Experimental def save( - dataSourceName: String, - options: java.util.Map[String, String], - overwrite: Boolean): Unit = { + dataSourceName: String, + options: java.util.Map[String, String], + overwrite: Boolean): Unit = { save(dataSourceName, options.toMap, overwrite) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6bffd2966ce9..1e39a36c1cc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -337,69 +337,19 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def load( - dataSourceName: String, - options: Map[String, String]): DataFrame = { + dataSourceName: String, + options: Map[String, String]): DataFrame = { val resolved = ResolvedDataSource(this, None, dataSourceName, options) new DataFrame(this, LogicalRelation(resolved.relation)) } @Experimental def load( - dataSourceName: String, - options: java.util.Map[String, String]): DataFrame = { + dataSourceName: String, + options: java.util.Map[String, String]): DataFrame = { load(dataSourceName, options.toMap) } - @Experimental - def loadAsTempTable( - tableName: String, - dataSourceName: String, - options: Map[String, String]): Unit = { - val cmd = - CreateTableUsing( - 
tableName, - None, - dataSourceName, - temporary = true, - options, - allowExisting = false) - executePlan(cmd).toRdd - } - - @Experimental - def loadAsTempTable( - tableName: String, - dataSourceName: String, - options: Map[String, String], - schema: StructType): Unit = { - val cmd = - CreateTableUsing( - tableName, - Some(schema), - dataSourceName, - temporary = true, - options, - allowExisting = false) - executePlan(cmd).toRdd - } - - @Experimental - def loadAsTempTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String]): Unit = { - loadAsTempTable(tableName, dataSourceName, options.toMap) - } - - @Experimental - def loadAsTempTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String], - schema: StructType): Unit = { - loadAsTempTable(tableName, dataSourceName, options.toMap, schema) - } - /** * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only * during the lifetime of this instance of SQLContext. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 153e82fea50b..f18f043f71b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -107,14 +107,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @Experimental def loadAsTable( - tableName: String, - dataSourceName: String, - options: Map[String, String], - allowExisting: Boolean): Unit = { + tableName: String, + dataSourceName: String, + options: Map[String, String], + allowExisting: Boolean): Unit = { val cmd = CreateTableUsing( tableName, - None, + userSpecifiedSchema = None, dataSourceName, temporary = false, options, @@ -124,15 +124,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @Experimental def loadAsTable( - tableName: String, - dataSourceName: String, - options: Map[String, String], - schema: StructType, - allowExisting: Boolean): Unit = { + tableName: String, + dataSourceName: String, + options: Map[String, String], + schema: StructType, + allowExisting: Boolean): Unit = { val cmd = CreateTableUsing( tableName, - Some(schema), + userSpecifiedSchema = Some(schema), dataSourceName, temporary = false, options, @@ -142,20 +142,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @Experimental def loadAsTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String], - allowExisting: Boolean): Unit = { + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String], + allowExisting: Boolean): Unit = { loadAsTable(tableName, dataSourceName, options.toMap, allowExisting) } @Experimental def loadAsTable( - tableName: String, - dataSourceName: String, - options: java.util.Map[String, String], - schema: StructType, - allowExisting: Boolean): Unit = { + tableName: String, + dataSourceName: String, + options: java.util.Map[String, String], + schema: StructType, + allowExisting: Boolean): Unit = { loadAsTable(tableName, dataSourceName, options.toMap, schema, allowExisting) } From 0128065d8a629180b68f0f275de6fb5797ba0289 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 10:58:13 -0800 Subject: [PATCH 11/16] Short hand versions of save and load. 
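A quick sketch of the short-hand forms added here (paths are placeholders and `df` is an
existing DataFrame). They rely on the new spark.sql.default.datasource setting, which
falls back to org.apache.spark.sql.parquet when unset:

    // Optionally pick a different default data source first.
    sqlContext.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")

    // Save using only a path; the default data source supplies the format.
    df.save("/tmp/people_out", overwrite = true)

    // Load it back the same way.
    val reloaded = sqlContext.load("/tmp/people_out")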
--- .../org/apache/spark/sql/DataFrame.scala | 17 ++++++++++++-- .../scala/org/apache/spark/sql/SQLConf.scala | 6 +++++ .../org/apache/spark/sql/SQLContext.scala | 10 +++++++++ .../main/scala/org/apache/spark/sql/api.scala | 6 +++++ .../apache/spark/sql/hive/HiveContext.scala | 22 ++++++++++++++----- .../spark/sql/hive/HiveMetastoreCatalog.scala | 8 ++++++- 6 files changed, 61 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index b40ccf99be89..b1cba1a4c3e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -619,8 +619,9 @@ class DataFrame protected[sql]( */ @Experimental override def saveAsTable(tableName: String): Unit = { - sqlContext.executePlan( - CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd + val dataSourceName = sqlContext.conf.defaultDataSourceName + val options = Map("path" -> sqlContext.defaultTableFilePath(tableName)) + saveAsTable(tableName, dataSourceName, options) } /** @@ -668,6 +669,18 @@ class DataFrame protected[sql]( saveAsTable(tableName, dataSourceName, options.toMap) } + @Experimental + override def save(path: String): Unit = { + save(path, false) + } + + @Experimental + override def save(path: String, overwrite: Boolean): Unit = { + val dataSourceName = sqlContext.conf.defaultDataSourceName + val options = Map("path" -> path) + save(dataSourceName, options, overwrite) + } + @Experimental override def save( dataSourceName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 243dc997078f..561a91d2d60e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -47,6 +47,9 @@ private[spark] object SQLConf { // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" + // This is used to set the default data source + val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource" + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -155,6 +158,9 @@ private[sql] class SQLConf extends Serializable { private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt + private[spark] def defaultDataSourceName: String = + getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet") + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1e39a36c1cc8..6b15a4700b3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -162,6 +162,9 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) + /** Returns the default file path for a table. */ + protected[sql] def defaultTableFilePath(tableName: String): String = ??? + /** * Creates a DataFrame from an RDD of case classes. 
* @@ -335,6 +338,13 @@ class SQLContext(@transient val sparkContext: SparkContext) applySchema(rowRDD, appliedSchema) } + @Experimental + def load(path: String): DataFrame = { + val dataSourceName = conf.defaultDataSourceName + val options = Map("path" -> path) + load(dataSourceName, options) + } + @Experimental def load( dataSourceName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 19dd4f7d45ad..4b5c13a6322b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -179,6 +179,12 @@ private[sql] trait DataFrameSpecificApi { dataSourceName: String, options: java.util.Map[String, String]): Unit + @Experimental + def save(path: String): Unit + + @Experimental + def save(path: String, overwrite: Boolean): Unit + @Experimental def save( dataSourceName: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index f18f043f71b5..459fe7992775 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -55,6 +55,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") } + protected[sql] override def defaultTableFilePath(tableName: String): String = { + catalog.hiveDefaultTableFilePath(tableName) + } + /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive @@ -85,6 +89,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * @param allowExisting When false, an exception will be thrown if the table already exists. * @tparam A A case class that is used to describe the schema of the table to be created. 
*/ + @Deprecated def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } @@ -106,7 +111,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @Experimental - def loadAsTable( + def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = { + val dataSourceName = conf.defaultDataSourceName + val options = Map("path" -> path) + createTable(tableName, dataSourceName, options, allowExisting) + } + + @Experimental + def createTable( tableName: String, dataSourceName: String, options: Map[String, String], @@ -123,7 +135,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @Experimental - def loadAsTable( + def createTable( tableName: String, dataSourceName: String, options: Map[String, String], @@ -141,12 +153,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } @Experimental - def loadAsTable( + def createTable( tableName: String, dataSourceName: String, options: java.util.Map[String, String], allowExisting: Boolean): Unit = { - loadAsTable(tableName, dataSourceName, options.toMap, allowExisting) + createTable(tableName, dataSourceName, options.toMap, allowExisting) } @Experimental @@ -156,7 +168,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { options: java.util.Map[String, String], schema: StructType, allowExisting: Boolean): Unit = { - loadAsTable(tableName, dataSourceName, options.toMap, schema, allowExisting) + createTable(tableName, dataSourceName, options.toMap, schema, allowExisting) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 7b8e369517b5..555734d0ac5a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,7 +23,7 @@ import java.util.{List => JList} import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder} import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.hive.metastore.TableType +import org.apache.hadoop.hive.metastore.{Warehouse, TableType} import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema} import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.CreateTableDesc @@ -51,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with /** Connection to hive metastore. Usages should lock on `this`. */ protected[hive] val client = Hive.get(hive.hiveconf) + protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) + // TODO: Use this everywhere instead of tuples or databaseName, tableName,. 
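// [Editor's note -- illustrative sketch, not part of the patch] The createTable overloads added
// above persist a data source table in the metastore without going through SQL. Shapes are as of
// this commit (the next commit in the series switches the Scala overloads to tuple-style
// options); the table names and paths are invented.
hiveContext.createTable(
  "people_json",
  "org.apache.spark.sql.json",
  Map("path" -> "/tmp/people.json"),
  allowExisting = false)

// Shorthand: only a path, with the format taken from spark.sql.default.datasource.
hiveContext.createTable("people_default", "/tmp/people_default", allowExisting = false)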
/** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) { @@ -141,6 +143,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with return true } + def hiveDefaultTableFilePath(tableName: String): String = { + hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString + } + def tableExists(tableIdentifier: Seq[String]): Boolean = { val tableIdent = processTableIdentifier(tableIdentifier) val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( From e789d64d226f1123956880079593b32e64550a0f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 13:42:49 -0800 Subject: [PATCH 12/16] For the Scala API, let users to use tuples to provide options. Also, add unit tests. --- .../org/apache/spark/sql/DataFrame.scala | 24 +++-- .../org/apache/spark/sql/SQLContext.scala | 11 ++- .../main/scala/org/apache/spark/sql/api.scala | 10 +- .../sources/CreateTableAsSelectSuite.scala | 1 - .../spark/sql/sources/SaveLoadSuite.scala | 93 +++++++++++++++++++ .../apache/spark/sql/hive/HiveContext.scala | 33 ++++--- .../sql/hive/MetastoreDataSourcesSuite.scala | 35 +++++++ 7 files changed, 168 insertions(+), 39 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index b1cba1a4c3e8..2f641e9f026f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -620,8 +620,7 @@ class DataFrame protected[sql]( @Experimental override def saveAsTable(tableName: String): Unit = { val dataSourceName = sqlContext.conf.defaultDataSourceName - val options = Map("path" -> sqlContext.defaultTableFilePath(tableName)) - saveAsTable(tableName, dataSourceName, options) + saveAsTable(tableName, dataSourceName, ("path", sqlContext.defaultTableFilePath(tableName))) } /** @@ -638,13 +637,13 @@ class DataFrame protected[sql]( override def saveAsTable( tableName: String, dataSourceName: String, - options: Map[String, String]): Unit = { + options: (String, String)*): Unit = { val cmd = CreateTableUsingAsLogicalPlan( tableName, dataSourceName, temporary = false, - options, + options.toMap, allowExisting = false, logicalPlan) @@ -666,7 +665,7 @@ class DataFrame protected[sql]( tableName: String, dataSourceName: String, options: java.util.Map[String, String]): Unit = { - saveAsTable(tableName, dataSourceName, options.toMap) + saveAsTable(tableName, dataSourceName, options.toSeq:_*) } @Experimental @@ -677,16 +676,15 @@ class DataFrame protected[sql]( @Experimental override def save(path: String, overwrite: Boolean): Unit = { val dataSourceName = sqlContext.conf.defaultDataSourceName - val options = Map("path" -> path) - save(dataSourceName, options, overwrite) + save(dataSourceName, overwrite, ("path" -> path)) } @Experimental override def save( dataSourceName: String, - options: Map[String, String], - overwrite: Boolean): Unit = { - val resolved = ResolvedDataSource(sqlContext, Some(schema), dataSourceName, options) + overwrite: Boolean, + options: (String, String)*): Unit = { + val resolved = ResolvedDataSource(sqlContext, Some(schema), dataSourceName, options.toMap) resolved.relation match { case i: InsertableRelation => i.insertInto(new DataFrame(sqlContext, logicalPlan), overwrite) case o => sys.error(s"Data source $dataSourceName 
does not support save.") @@ -696,9 +694,9 @@ class DataFrame protected[sql]( @Experimental def save( dataSourceName: String, - options: java.util.Map[String, String], - overwrite: Boolean): Unit = { - save(dataSourceName, options.toMap, overwrite) + overwrite: Boolean, + options: java.util.Map[String, String]): Unit = { + save(dataSourceName, overwrite, options.toSeq:_*) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6b15a4700b3a..90108359a11b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -341,15 +341,15 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def load(path: String): DataFrame = { val dataSourceName = conf.defaultDataSourceName - val options = Map("path" -> path) - load(dataSourceName, options) + load(dataSourceName, ("path", path)) } @Experimental def load( dataSourceName: String, - options: Map[String, String]): DataFrame = { - val resolved = ResolvedDataSource(this, None, dataSourceName, options) + option: (String, String), + options: (String, String)*): DataFrame = { + val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap) new DataFrame(this, LogicalRelation(resolved.relation)) } @@ -357,7 +357,8 @@ class SQLContext(@transient val sparkContext: SparkContext) def load( dataSourceName: String, options: java.util.Map[String, String]): DataFrame = { - load(dataSourceName, options.toMap) + val opts = options.toSeq + load(dataSourceName, opts.head, opts.tail:_*) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 4b5c13a6322b..3eaa1e7b1b81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -171,7 +171,7 @@ private[sql] trait DataFrameSpecificApi { def saveAsTable(tableName: String): Unit @Experimental - def saveAsTable(tableName: String, dataSourceName: String, options: Map[String, String]): Unit + def saveAsTable(tableName: String, dataSourceName: String, options: (String, String)*): Unit @Experimental def saveAsTable( @@ -188,14 +188,14 @@ private[sql] trait DataFrameSpecificApi { @Experimental def save( dataSourceName: String, - options: Map[String, String], - overwrite: Boolean): Unit + overwrite: Boolean, + options: (String, String)*): Unit @Experimental def save( dataSourceName: String, - options: java.util.Map[String, String], - overwrite: Boolean): Unit + overwrite: Boolean, + options: java.util.Map[String, String]): Unit @Experimental def insertInto(tableName: String, overwrite: Boolean): Unit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 56ac8a8320bb..ab53f41d3ef9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -22,7 +22,6 @@ import java.io.File import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.types.{StringType, StructType, StructField} import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala new file mode 100644 index 000000000000..ea97f387b292 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.io.File + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.DataFrame +import org.apache.spark.util.Utils + +import org.apache.spark.sql.catalyst.util + +class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll { + + import caseInsensisitiveContext._ + + var originalDefaultSource: String = null + + var path: File = null + + var df: DataFrame = null + + override def beforeAll(): Unit = { + originalDefaultSource = conf.defaultDataSourceName + conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json") + + path = util.getTempFilePath("datasource").getCanonicalFile + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + df = jsonRDD(rdd) + } + + override def afterAll(): Unit = { + conf.setConf("spark.sql.default.datasource", originalDefaultSource) + } + + after { + if (path.exists()) Utils.deleteRecursively(path) + } + + def checkLoad(): Unit = { + checkAnswer(load(path.toString), df.collect()) + checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect()) + } + + test("save with overwrite and load") { + df.save(path.toString, true) + + checkLoad + } + + test("save with data source and options, and load") { + df.save("org.apache.spark.sql.json", true, ("path", path.toString)) + + checkLoad + } + + test("save and save again without overwrite") { + df.save(path.toString, true) + + val exception = intercept[RuntimeException] { + df.save(path.toString) + } + + assert( + exception.getMessage.contains("JSON table only support INSERT OVERWRITE for now."), + "JSON table should only support INSERT OVERWRITE for now.") + } + + test("save and save again with overwrite") { + df.save(path.toString, true) + df.save(path.toString, true) + + checkLoad + } + +} \ No newline at end of file diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 459fe7992775..4f59a2ccecb8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -113,23 +113,23 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @Experimental def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = { val dataSourceName = conf.defaultDataSourceName - 
val options = Map("path" -> path) - createTable(tableName, dataSourceName, options, allowExisting) + createTable(tableName, dataSourceName, allowExisting, ("path", path)) } @Experimental def createTable( tableName: String, dataSourceName: String, - options: Map[String, String], - allowExisting: Boolean): Unit = { + allowExisting: Boolean, + option: (String, String), + options: (String, String)*): Unit = { val cmd = CreateTableUsing( tableName, userSpecifiedSchema = None, dataSourceName, temporary = false, - options, + (option +: options).toMap, allowExisting) executePlan(cmd).toRdd } @@ -138,16 +138,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { def createTable( tableName: String, dataSourceName: String, - options: Map[String, String], schema: StructType, - allowExisting: Boolean): Unit = { + allowExisting: Boolean, + option: (String, String), + options: (String, String)*): Unit = { val cmd = CreateTableUsing( tableName, userSpecifiedSchema = Some(schema), dataSourceName, temporary = false, - options, + (option +: options).toMap, allowExisting) executePlan(cmd).toRdd } @@ -156,19 +157,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { def createTable( tableName: String, dataSourceName: String, - options: java.util.Map[String, String], - allowExisting: Boolean): Unit = { - createTable(tableName, dataSourceName, options.toMap, allowExisting) + allowExisting: Boolean, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*) } @Experimental - def loadAsTable( + def createTable( tableName: String, dataSourceName: String, - options: java.util.Map[String, String], schema: StructType, - allowExisting: Boolean): Unit = { - createTable(tableName, dataSourceName, options.toMap, schema, allowExisting) + allowExisting: Boolean, + options: java.util.Map[String, String]): Unit = { + val opts = options.toSeq + createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 18bf56d2579e..225f2931978e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -345,4 +345,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("DROP TABLE jsonTable").collect().foreach(println) } + + test("save and load table") { + val originalDefaultSource = conf.defaultDataSourceName + conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json") + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val df = jsonRDD(rdd) + + df.saveAsTable("savedJsonTable") + + checkAnswer( + sql("SELECT * FROM savedJsonTable"), + df.collect()) + + createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false) + assert(table("createdJsonTable").schema === df.schema) + checkAnswer( + sql("SELECT * FROM createdJsonTable"), + df.collect()) + + val exception = intercept[HiveException] { + createTable("createdJsonTable", filePath.toString, false) + } + assert(exception.getCause.isInstanceOf[AlreadyExistsException], + "Hive should complain that ctasJsonTable already exists") + + createTable("createdJsonTable", filePath.toString, true) + // createdJsonTable should be 
not changed. + assert(table("createdJsonTable").schema === df.schema) + checkAnswer( + sql("SELECT * FROM createdJsonTable"), + df.collect()) + + conf.setConf("spark.sql.default.datasource", originalDefaultSource) + } } From 1682ca68738d75b01ad01a617e293366f76010ec Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 16:49:48 -0800 Subject: [PATCH 13/16] Better support for CTAS statements. --- .../org/apache/spark/sql/DataFrame.scala | 28 +++++-- .../org/apache/spark/sql/SQLContext.scala | 3 - .../main/scala/org/apache/spark/sql/api.scala | 7 +- .../apache/spark/sql/json/JSONRelation.scala | 23 +++++- .../sql/sources/DataSourceStrategy.scala | 2 +- .../apache/spark/sql/sources/commands.scala | 4 +- .../org/apache/spark/sql/sources/ddl.scala | 35 ++++++++- .../apache/spark/sql/sources/interfaces.scala | 12 ++- .../sources/CreateTableAsSelectSuite.scala | 18 +++++ .../apache/spark/sql/hive/HiveContext.scala | 4 - .../spark/sql/hive/HiveMetastoreCatalog.scala | 24 +++--- .../spark/sql/hive/execution/commands.scala | 62 ++++++++++++--- .../sql/hive/MetastoreDataSourcesSuite.scala | 76 ++++++++++++++++--- 13 files changed, 235 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 2f641e9f026f..1730c99f719b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD -import org.apache.spark.sql.sources.{InsertableRelation, ResolvedDataSource, CreateTableUsingAsLogicalPlan} +import org.apache.spark.sql.sources.{WritableRelation, ResolvedDataSource, CreateTableUsingAsLogicalPlan} import org.apache.spark.sql.types.{NumericType, StructType} import org.apache.spark.util.Utils @@ -620,7 +620,16 @@ class DataFrame protected[sql]( @Experimental override def saveAsTable(tableName: String): Unit = { val dataSourceName = sqlContext.conf.defaultDataSourceName - saveAsTable(tableName, dataSourceName, ("path", sqlContext.defaultTableFilePath(tableName))) + val cmd = + CreateTableUsingAsLogicalPlan( + tableName, + dataSourceName, + temporary = false, + Map.empty, + allowExisting = false, + logicalPlan) + + sqlContext.executePlan(cmd).toRdd } /** @@ -637,13 +646,14 @@ class DataFrame protected[sql]( override def saveAsTable( tableName: String, dataSourceName: String, + option: (String, String), options: (String, String)*): Unit = { val cmd = CreateTableUsingAsLogicalPlan( tableName, dataSourceName, temporary = false, - options.toMap, + (option +: options).toMap, allowExisting = false, logicalPlan) @@ -665,7 +675,8 @@ class DataFrame protected[sql]( tableName: String, dataSourceName: String, options: java.util.Map[String, String]): Unit = { - saveAsTable(tableName, dataSourceName, options.toSeq:_*) + val opts = options.toSeq + saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*) } @Experimental @@ -683,10 +694,12 @@ class DataFrame protected[sql]( override def save( dataSourceName: String, overwrite: Boolean, + option: (String, String), options: (String, String)*): Unit = { - val resolved = ResolvedDataSource(sqlContext, Some(schema), dataSourceName, options.toMap) + val resolved = + ResolvedDataSource(sqlContext, Some(schema), dataSourceName, (option +: 
options).toMap) resolved.relation match { - case i: InsertableRelation => i.insertInto(new DataFrame(sqlContext, logicalPlan), overwrite) + case i: WritableRelation => i.write(new DataFrame(sqlContext, logicalPlan), overwrite) case o => sys.error(s"Data source $dataSourceName does not support save.") } } @@ -696,7 +709,8 @@ class DataFrame protected[sql]( dataSourceName: String, overwrite: Boolean, options: java.util.Map[String, String]): Unit = { - save(dataSourceName, overwrite, options.toSeq:_*) + val opts = options.toSeq + save(dataSourceName, overwrite, opts.head, opts.tail:_*) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 90108359a11b..ac52d76b2671 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -162,9 +162,6 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName) - /** Returns the default file path for a table. */ - protected[sql] def defaultTableFilePath(tableName: String): String = ??? - /** * Creates a DataFrame from an RDD of case classes. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 3eaa1e7b1b81..6d5cd9b379f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -171,7 +171,11 @@ private[sql] trait DataFrameSpecificApi { def saveAsTable(tableName: String): Unit @Experimental - def saveAsTable(tableName: String, dataSourceName: String, options: (String, String)*): Unit + def saveAsTable( + tableName: String, + dataSourceName: String, + option: (String, String), + options: (String, String)*): Unit @Experimental def saveAsTable( @@ -189,6 +193,7 @@ private[sql] trait DataFrameSpecificApi { def save( dataSourceName: String, overwrite: Boolean, + option: (String, String), options: (String, String)*): Unit @Experimental diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 6de9abc1af34..782e4c532afb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider { +private[sql] class DefaultSource + extends RelationProvider with SchemaRelationProvider with CreateableRelation { /** Returns a new base relation with the parameters. 
*/ override def createRelation( @@ -48,6 +49,22 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro JSONRelation(path, samplingRatio, Some(schema))(sqlContext) } + + override def createRelation( + sqlContext: SQLContext, + name: String, + options: Map[String, String], + data: DataFrame): Map[String, String] = { + val path = options.getOrElse("path", sys.error("Option 'path' not specified")) + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + if (fs.exists(filesystemPath)) { + sys.error(s"path $path already exists.") + } + data.toJSON.saveAsTextFile(path) + + options + } } private[sql] case class JSONRelation( @@ -55,7 +72,7 @@ private[sql] case class JSONRelation( samplingRatio: Double, userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) - extends TableScan with InsertableRelation { + extends TableScan with WritableRelation { // TODO: Support partitioned JSON relation. private def baseRDD = sqlContext.sparkContext.textFile(path) @@ -70,7 +87,7 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) - override def insertInto(data: DataFrame, overwrite: Boolean) = { + override def write(data: DataFrame, overwrite: Boolean) = { val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 386ff2452f1a..e9cec423e53f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -55,7 +55,7 @@ private[sql] object DataSourceStrategy extends Strategy { execution.PhysicalRDD(l.output, t.buildScan()) :: Nil case i @ LogicalInsertIntoTable( - l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => + l @ LogicalRelation(t: WritableRelation), partition, query, overwrite) => if (partition.nonEmpty) { sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 55e68069009a..0e980cdfa9f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.RunnableCommand private[sql] case class InsertIntoRelation( - relation: InsertableRelation, + relation: WritableRelation, query: LogicalPlan, overwrite: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext) = { - relation.insertInto(new DataFrame(sqlContext, query), overwrite) + relation.write(new DataFrame(sqlContext, query), overwrite) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 1590006a23af..efb1a22e032a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -226,7 +226,7 @@ object ResolvedDataSource { dataSource 
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case _ => + case dataSource: org.apache.spark.sql.sources.RelationProvider => sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.") } } @@ -236,7 +236,7 @@ object ResolvedDataSource { dataSource .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options)) - case _ => + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.") } } @@ -248,6 +248,33 @@ object ResolvedDataSource { private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) +object ResolvedCreatableDataSource { + def apply( + sqlContext: SQLContext, + provider: String, + name: String, + options: Map[String, String], + data: DataFrame): Map[String, String] = { + val loader = Utils.getContextOrSparkClassLoader + val clazz: Class[_] = try loader.loadClass(provider) catch { + case cnf: java.lang.ClassNotFoundException => + try loader.loadClass(provider + ".DefaultSource") catch { + case cnf: java.lang.ClassNotFoundException => + sys.error(s"Failed to load class for data source: $provider") + } + } + + clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.CreateableRelation => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.CreateableRelation] + .createRelation(sqlContext, name, options, data) + case _ => + sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") + } + } +} + private[sql] case class CreateTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], @@ -294,10 +321,10 @@ private [sql] case class CreateTempTableUsingAsSelect( def run(sqlContext: SQLContext) = { val df = new DataFrame(sqlContext, query) - val resolved = ResolvedDataSource(sqlContext, Some(df.schema), provider, options) + val augmentedOptions = ResolvedCreatableDataSource(sqlContext, provider, tableName, options, df) + val resolved = ResolvedDataSource(sqlContext, None, provider, augmentedOptions) sqlContext.registerRDDAsTable( new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) - df.insertInto(tableName, true) Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index dfadaec95c4c..2da56f8be35e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -77,6 +77,14 @@ trait SchemaRelationProvider { schema: StructType): BaseRelation } +trait CreateableRelation { + def createRelation( + sqlContext: SQLContext, + name: String, + options: Map[String, String], + data: DataFrame): Map[String, String] +} + /** * ::DeveloperApi:: * Represents a collection of tuples with a known schema. 
Classes that extend BaseRelation must @@ -150,6 +158,6 @@ trait CatalystScan extends BaseRelation { } @DeveloperApi -trait InsertableRelation extends BaseRelation { - def insertInto(data: DataFrame, overwrite: Boolean): Unit +trait WritableRelation extends BaseRelation { + def write(data: DataFrame, overwrite: Boolean): Unit } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index ab53f41d3ef9..b02389978b62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -79,6 +79,24 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { dropTempTable("jsonTable") + val message = intercept[RuntimeException]{ + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a * 4 FROM jt + """.stripMargin) + }.getMessage + assert( + message.contains(s"path ${path.toString} already exists."), + "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.") + + // Explicitly delete it. + if (path.exists()) Utils.deleteRecursively(path) + sql( s""" |CREATE TEMPORARY TABLE jsonTable diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4f59a2ccecb8..6503b756360f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -55,10 +55,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") } - protected[sql] override def defaultTableFilePath(tableName: String): String = { - catalog.hiveDefaultTableFilePath(tableName) - } - /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 555734d0ac5a..828227b29686 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -107,7 +107,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with * @param userSpecifiedSchema * @param provider * @param options - * @param allowExisting + * @param isExternal * @return */ def createDataSourceTable( @@ -115,7 +115,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String], - allowExisting: Boolean): Boolean = { + isExternal: Boolean): Unit = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tbl = new Table(dbName, tblName) @@ -125,22 +125,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } - tbl.setProperty("EXTERNAL", "TRUE") - tbl.setTableType(TableType.EXTERNAL_TABLE) + if (isExternal) { + tbl.setProperty("EXTERNAL", "TRUE") + tbl.setTableType(TableType.EXTERNAL_TABLE) + } else { 
+ tbl.setProperty("EXTERNAL", "FALSE") + tbl.setTableType(TableType.MANAGED_TABLE) + } // create the table synchronized { - // Always set ifNotExists to false for createTable since we need to - // know if the table has actually been created in Hive's metastore. - try { - client.createTable(tbl, false) - } catch { - case e: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] && allowExisting => - return false - } + client.createTable(tbl, false) } - - return true } def hiveDefaultTableFilePath(tableName: String): String = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 2011513515fc..6c5f20a8a38e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.sources.ResolvedCreatableDataSource import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -106,14 +107,32 @@ case class CreateMetastoreDataSource( options: Map[String, String], allowExisting: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] + + if (hiveContext.catalog.tableExists(tableName :: Nil)) { + if (allowExisting) { + return Seq.empty[Row] + } else { + sys.error(s"Table $tableName already exists.") + } + } + + var isExternal = true + val optionsWithPath = + if (!options.contains("path")) { + isExternal = false + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName)) + } else { + options + } + hiveContext.catalog.createDataSourceTable( tableName, userSpecifiedSchema, provider, - options, - allowExisting) + optionsWithPath, + isExternal) Seq.empty[Row] } @@ -126,17 +145,36 @@ case class CreateMetastoreDataSourceAsSelect( allowExisting: Boolean, query: LogicalPlan) extends RunnableCommand { - override def run(sqlContext: SQLContext) = { + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] + + if (hiveContext.catalog.tableExists(tableName :: Nil)) { + if (allowExisting) { + return Seq.empty[Row] + } else { + sys.error(s"Table $tableName already exists.") + } + } + val df = new DataFrame(hiveContext, query) - val hasCreated = - hiveContext.catalog.createDataSourceTable( - tableName, - Some(df.schema), - provider, - options, - allowExisting) - if (hasCreated) df.insertInto(tableName, true) + var isExternal = true + val optionsWithPath = + if (!options.contains("path")) { + isExternal = false + options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName)) + } else { + options + } + + val augmentedOptions = + ResolvedCreatableDataSource(sqlContext, provider, tableName, optionsWithPath, df) + + hiveContext.catalog.createDataSourceTable( + tableName, + None, + provider, + augmentedOptions, + isExternal) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 225f2931978e..85795acb658e 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -19,11 +19,10 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException -import org.apache.hadoop.hive.ql.metadata.HiveException import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.util import org.apache.spark.sql._ @@ -39,6 +38,7 @@ import org.apache.spark.sql.hive.test.TestHive._ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { override def afterEach(): Unit = { reset() + if (ctasPath.exists()) Utils.deleteRecursively(ctasPath) } val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile @@ -141,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { intercept[Exception] { sql("SELECT * FROM jsonTable").collect() } + + assert( + (new File(filePath)).exists(), + "The table with specified path is considered as an external table, " + + "its data should not be deleted after DROP TABLE.") } test("check change without refresh") { @@ -295,7 +300,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { """.stripMargin) // Create the table again should trigger a AlreadyExistsException. - val exception = intercept[HiveException] { + val message = intercept[RuntimeException] { sql( s""" |CREATE TABLE ctasJsonTable @@ -305,9 +310,9 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |) AS |SELECT * FROM jsonTable """.stripMargin) - } - assert(exception.getCause.isInstanceOf[AlreadyExistsException], - "Hive should complain that ctasJsonTable already exists") + }.getMessage + assert(message.contains("Table ctasJsonTable already exists."), + "We should complain that ctasJsonTable already exists") // The following statement should be fine if it has IF NOT EXISTS. // It tries to create a table ctasJsonTable with a new schema. @@ -333,6 +338,54 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM jsonTable").collect()) } + test("CTAS a managed table") { + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${filePath}' + |) + """.stripMargin) + + val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") + val filesystemPath = new Path(expectedPath) + val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) + + // It is a managed table when we do not specify the location. 
+ sql( + s""" + |CREATE TABLE ctasJsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | + |) AS + |SELECT * FROM jsonTable + """.stripMargin) + + assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.") + + sql( + s""" + |CREATE TABLE loadedTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${expectedPath}' + |) + """.stripMargin) + + assert(table("ctasJsonTable").schema === table("loadedTable").schema) + + checkAnswer( + sql("SELECT * FROM ctasJsonTable"), + sql("SELECT * FROM loadedTable").collect() + ) + + sql("DROP TABLE ctasJsonTable") + assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.") + } + test("SPARK-5286 Fail to drop an invalid table when using the data source API") { sql( s""" @@ -365,11 +421,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM createdJsonTable"), df.collect()) - val exception = intercept[HiveException] { + val message = intercept[RuntimeException] { createTable("createdJsonTable", filePath.toString, false) - } - assert(exception.getCause.isInstanceOf[AlreadyExistsException], - "Hive should complain that ctasJsonTable already exists") + }.getMessage + assert(message.contains("Table createdJsonTable already exists."), + "We should complain that ctasJsonTable already exists") createTable("createdJsonTable", filePath.toString, true) // createdJsonTable should be not changed. From 34e1bfb3c5f58cc58f115ebd80036ee2f885dee7 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 20:51:05 -0800 Subject: [PATCH 14/16] Address comments. --- .../org/apache/spark/sql/DataFrame.scala | 9 ++------ .../apache/spark/sql/json/JSONRelation.scala | 15 ++++++------- .../sql/sources/DataSourceStrategy.scala | 2 +- .../apache/spark/sql/sources/commands.scala | 4 ++-- .../org/apache/spark/sql/sources/ddl.scala | 22 +++++++++---------- .../apache/spark/sql/sources/interfaces.scala | 12 +++++----- .../spark/sql/hive/execution/commands.scala | 8 +++---- 7 files changed, 32 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 1730c99f719b..12655179e10e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD -import org.apache.spark.sql.sources.{WritableRelation, ResolvedDataSource, CreateTableUsingAsLogicalPlan} +import org.apache.spark.sql.sources.{InsertableRelation, ResolvedDataSource, CreateTableUsingAsLogicalPlan} import org.apache.spark.sql.types.{NumericType, StructType} import org.apache.spark.util.Utils @@ -696,12 +696,7 @@ class DataFrame protected[sql]( overwrite: Boolean, option: (String, String), options: (String, String)*): Unit = { - val resolved = - ResolvedDataSource(sqlContext, Some(schema), dataSourceName, (option +: options).toMap) - resolved.relation match { - case i: WritableRelation => i.write(new DataFrame(sqlContext, logicalPlan), overwrite) - case o => sys.error(s"Data source $dataSourceName does not support save.") - } + ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this) } @Experimental diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 782e4c532afb..8372decbf8aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType private[sql] class DefaultSource - extends RelationProvider with SchemaRelationProvider with CreateableRelation { + extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider { /** Returns a new base relation with the parameters. */ override def createRelation( @@ -52,10 +52,9 @@ private[sql] class DefaultSource override def createRelation( sqlContext: SQLContext, - name: String, - options: Map[String, String], - data: DataFrame): Map[String, String] = { - val path = options.getOrElse("path", sys.error("Option 'path' not specified")) + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) { @@ -63,7 +62,7 @@ private[sql] class DefaultSource } data.toJSON.saveAsTextFile(path) - options + createRelation(sqlContext, parameters, data.schema) } } @@ -72,7 +71,7 @@ private[sql] case class JSONRelation( samplingRatio: Double, userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) - extends TableScan with WritableRelation { + extends TableScan with InsertableRelation { // TODO: Support partitioned JSON relation. private def baseRDD = sqlContext.sparkContext.textFile(path) @@ -87,7 +86,7 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) - override def write(data: DataFrame, overwrite: Boolean) = { + override def insert(data: DataFrame, overwrite: Boolean) = { val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index e9cec423e53f..386ff2452f1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -55,7 +55,7 @@ private[sql] object DataSourceStrategy extends Strategy { execution.PhysicalRDD(l.output, t.buildScan()) :: Nil case i @ LogicalInsertIntoTable( - l @ LogicalRelation(t: WritableRelation), partition, query, overwrite) => + l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) => if (partition.nonEmpty) { sys.error(s"Insert into a partition is not allowed because $l is not partitioned.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 0e980cdfa9f5..a1a6a020ee9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.RunnableCommand private[sql] case class InsertIntoRelation( - relation: WritableRelation, + 
relation: InsertableRelation, query: LogicalPlan, overwrite: Boolean) extends RunnableCommand { override def run(sqlContext: SQLContext) = { - relation.write(new DataFrame(sqlContext, query), overwrite) + relation.insert(new DataFrame(sqlContext, query), overwrite) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index efb1a22e032a..94adfea04bcd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -244,17 +244,12 @@ object ResolvedDataSource { new ResolvedDataSource(clazz, relation) } -} - -private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) -object ResolvedCreatableDataSource { def apply( sqlContext: SQLContext, provider: String, - name: String, options: Map[String, String], - data: DataFrame): Map[String, String] = { + data: DataFrame): ResolvedDataSource = { val loader = Utils.getContextOrSparkClassLoader val clazz: Class[_] = try loader.loadClass(provider) catch { case cnf: java.lang.ClassNotFoundException => @@ -264,17 +259,21 @@ object ResolvedCreatableDataSource { } } - clazz.newInstance match { - case dataSource: org.apache.spark.sql.sources.CreateableRelation => + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.CreateableRelationProvider => dataSource - .asInstanceOf[org.apache.spark.sql.sources.CreateableRelation] - .createRelation(sqlContext, name, options, data) + .asInstanceOf[org.apache.spark.sql.sources.CreateableRelationProvider] + .createRelation(sqlContext, options, data) case _ => sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") } + + new ResolvedDataSource(clazz, relation) } } +private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) + private[sql] case class CreateTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], @@ -321,8 +320,7 @@ private [sql] case class CreateTempTableUsingAsSelect( def run(sqlContext: SQLContext) = { val df = new DataFrame(sqlContext, query) - val augmentedOptions = ResolvedCreatableDataSource(sqlContext, provider, tableName, options, df) - val resolved = ResolvedDataSource(sqlContext, None, provider, augmentedOptions) + val resolved = ResolvedDataSource(sqlContext, provider, options, df) sqlContext.registerRDDAsTable( new DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 2da56f8be35e..ad0a35b91ebc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -77,12 +77,12 @@ trait SchemaRelationProvider { schema: StructType): BaseRelation } -trait CreateableRelation { +@DeveloperApi +trait CreateableRelationProvider { def createRelation( sqlContext: SQLContext, - name: String, - options: Map[String, String], - data: DataFrame): Map[String, String] + parameters: Map[String, String], + data: DataFrame): BaseRelation } /** @@ -158,6 +158,6 @@ trait CatalystScan extends BaseRelation { } @DeveloperApi -trait WritableRelation extends BaseRelation { - def write(data: DataFrame, overwrite: Boolean): Unit +trait InsertableRelation extends BaseRelation { + def insert(data: DataFrame, overwrite: Boolean): Unit } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 6c5f20a8a38e..689bd0fce18a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.sources.ResolvedCreatableDataSource +import org.apache.spark.sql.sources.ResolvedDataSource import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -166,14 +166,14 @@ case class CreateMetastoreDataSourceAsSelect( options } - val augmentedOptions = - ResolvedCreatableDataSource(sqlContext, provider, tableName, optionsWithPath, df) + // Create the relation based on the data of df. + ResolvedDataSource(sqlContext, provider, optionsWithPath, df) hiveContext.catalog.createDataSourceTable( tableName, None, provider, - augmentedOptions, + optionsWithPath, isExternal) Seq.empty[Row] From 1c9888168a4716c040f9ff560207f55ec91311a0 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Feb 2015 21:21:21 -0800 Subject: [PATCH 15/16] Fix test. --- .../src/main/scala/org/apache/spark/sql/DataFrameImpl.scala | 4 +--- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index c39cb3838259..020b41f835ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -19,9 +19,6 @@ package org.apache.spark.sql import java.util.{List => JList} -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan} - import scala.language.implicitConversions import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -39,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD +import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan} import org.apache.spark.sql.types.{NumericType, StructType} import org.apache.spark.util.Utils diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index eb7a7750af02..4efe0c5e0cd4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest { sql("CREATE TABLE test2 (key INT, value STRING)") testData.insertInto("test2") testData.insertInto("test2") - sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test") + sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key") checkAnswer( table("test"), sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq) From 3db1539b1b3766b0bb5c40ab65542106160e2dba Mon Sep 17 00:00:00 2001 
From: Yin Huai Date: Mon, 2 Feb 2015 21:41:21 -0800 Subject: [PATCH 16/16] save does not take overwrite. --- .../org/apache/spark/sql/DataFrame.scala | 5 ---- .../org/apache/spark/sql/DataFrameImpl.scala | 10 ++------ .../apache/spark/sql/IncomputableColumn.scala | 4 --- .../main/scala/org/apache/spark/sql/api.scala | 5 ---- .../spark/sql/sources/SaveLoadSuite.scala | 25 ++++++++----------- 5 files changed, 12 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6eaaf8d40dbd..4cbfb6af5de9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -519,20 +519,15 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] { @Experimental override def save(path: String): Unit - @Experimental - override def save(path: String, overwrite: Boolean): Unit - @Experimental override def save( dataSourceName: String, - overwrite: Boolean, option: (String, String), options: (String, String)*): Unit @Experimental override def save( dataSourceName: String, - overwrite: Boolean, options: java.util.Map[String, String]): Unit /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 020b41f835ca..f84dbf32fa5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -343,17 +343,12 @@ private[sql] class DataFrameImpl protected[sql]( } override def save(path: String): Unit = { - save(path, false) - } - - override def save(path: String, overwrite: Boolean): Unit = { val dataSourceName = sqlContext.conf.defaultDataSourceName - save(dataSourceName, overwrite, ("path" -> path)) + save(dataSourceName, ("path" -> path)) } override def save( dataSourceName: String, - overwrite: Boolean, option: (String, String), options: (String, String)*): Unit = { ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this) @@ -361,10 +356,9 @@ private[sql] class DataFrameImpl protected[sql]( override def save( dataSourceName: String, - overwrite: Boolean, options: java.util.Map[String, String]): Unit = { val opts = options.toSeq - save(dataSourceName, overwrite, opts.head, opts.tail:_*) + save(dataSourceName, opts.head, opts.tail:_*) } override def insertInto(tableName: String, overwrite: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 3d9c31c85671..9b051de68feb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -165,17 +165,13 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten override def save(path: String): Unit = err() - override def save(path: String, overwrite: Boolean): Unit = err() - override def save( dataSourceName: String, - overwrite: Boolean, option: (String, String), options: (String, String)*): Unit = err() override def save( dataSourceName: String, - overwrite: Boolean, options: java.util.Map[String, String]): Unit = err() override def insertInto(tableName: String, overwrite: Boolean): Unit = err() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala index 
6d5cd9b379f2..c4a00cdb2040 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala @@ -186,20 +186,15 @@ private[sql] trait DataFrameSpecificApi { @Experimental def save(path: String): Unit - @Experimental - def save(path: String, overwrite: Boolean): Unit - @Experimental def save( dataSourceName: String, - overwrite: Boolean, option: (String, String), options: (String, String)*): Unit @Experimental def save( dataSourceName: String, - overwrite: Boolean, options: java.util.Map[String, String]): Unit @Experimental diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index ea97f387b292..fe2f76cc397f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -60,34 +60,29 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll { } test("save with overwrite and load") { - df.save(path.toString, true) - + df.save(path.toString) checkLoad } test("save with data source and options, and load") { - df.save("org.apache.spark.sql.json", true, ("path", path.toString)) - + df.save("org.apache.spark.sql.json", ("path", path.toString)) checkLoad } - test("save and save again without overwrite") { - df.save(path.toString, true) + test("save and save again") { + df.save(path.toString) - val exception = intercept[RuntimeException] { + val message = intercept[RuntimeException] { df.save(path.toString) - } + }.getMessage assert( - exception.getMessage.contains("JSON table only support INSERT OVERWRITE for now."), - "JSON table should only support INSERT OVERWRITE for now.") - } + message.contains("already exists"), + "We should complain that the path already exists.") - test("save and save again with overwrite") { - df.save(path.toString, true) - df.save(path.toString, true) + if (path.exists()) Utils.deleteRecursively(path) + df.save(path.toString) checkLoad } - } \ No newline at end of file
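
The sketch below is not part of the patch series; it is only an illustration of how a third-party data source might plug into the interfaces as they stand after PATCH 14-16: RelationProvider for reads, CreateableRelationProvider for CREATE TABLE ... AS SELECT and DataFrame.save, and InsertableRelation for INSERT OVERWRITE. The package name example.datasource, the class TextLinesRelation, and the single-column schema are hypothetical; the trait signatures are copied from interfaces.scala above, and the filesystem handling mirrors the JSON source in this series.

    package example.datasource

    import org.apache.hadoop.fs.Path

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Resolved via "USING example.datasource" (ResolvedDataSource falls back to
    // loading provider + ".DefaultSource").
    class DefaultSource extends RelationProvider with CreateableRelationProvider {

      // Read path: wrap an existing directory of JSON-encoded lines.
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
        TextLinesRelation(path)(sqlContext)
      }

      // CTAS / save path: refuse to clobber an existing location, write the data,
      // then hand back a relation over what was just written.
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation = {
        val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
        val fsPath = new Path(path)
        val fs = fsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
        if (fs.exists(fsPath)) {
          sys.error(s"path $path already exists.")
        }
        data.toJSON.saveAsTextFile(path)
        createRelation(sqlContext, parameters)
      }
    }

    case class TextLinesRelation(path: String)(
        @transient val sqlContext: SQLContext)
      extends TableScan with InsertableRelation {

      // One string column holding the raw text of each stored line.
      override val schema: StructType =
        StructType(StructField("line", StringType, nullable = true) :: Nil)

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.textFile(path).map(line => Row(line))

      // Only INSERT OVERWRITE is supported here, mirroring the JSON relation above.
      override def insert(data: DataFrame, overwrite: Boolean): Unit = {
        val fsPath = new Path(path)
        val fs = fsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
        if (!overwrite) {
          sys.error("This example relation only supports INSERT OVERWRITE.")
        }
        if (fs.exists(fsPath)) {
          fs.delete(fsPath, true)
        }
        data.toJSON.saveAsTextFile(path)
      }
    }

With such a source on the classpath, df.save("example.datasource", "path" -> "/tmp/out") and CREATE TABLE t USING example.datasource OPTIONS (path '/tmp/out') AS SELECT ... would both route through createRelation(sqlContext, parameters, data), while an INSERT OVERWRITE TABLE t SELECT ... against a table backed by this relation is planned as InsertIntoRelation and ends up calling insert(data, overwrite = true).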