diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 7ccbb2dac90c8..5b60cd4f50b8a 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -339,6 +339,7 @@ querySpecification
        fromClause?
        (WHERE where=booleanExpression)?)
     | ((kind=SELECT setQuantifier? namedExpressionSeq fromClause?
+       | kind=SELECT setQuantifier? namedExpressionSeq intoClause? fromClause?
        | fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
        lateralView*
        (WHERE where=booleanExpression)?
@@ -347,6 +348,10 @@ querySpecification
        windows?)
     ;
 
+intoClause
+    : INTO tableIdentifier
+    ;
+
 fromClause
     : FROM relation (',' relation)* lateralView*
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f2cc8d362478a..9f92e0e49b3b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -132,6 +132,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
 
     // Build the insert clauses.
     val inserts = ctx.multiInsertQueryBody.asScala.map { body =>
+      assert(body.querySpecification.intoClause == null,
+        "Multi-Insert queries cannot have an INTO clause in their individual SELECT statements",
+        body)
+
       assert(body.querySpecification.fromClause == null,
         "Multi-Insert queries cannot have a FROM clause in their individual SELECT statements",
         body)
@@ -155,11 +159,26 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitSingleInsertQuery(
       ctx: SingleInsertQueryContext): LogicalPlan = withOrigin(ctx) {
+    // Extract the INTO clause from the SELECT statement, if present.
+    val intoClause = ctx.queryTerm match {
+      case queryTermDefault: QueryTermDefaultContext =>
+        queryTermDefault.queryPrimary match {
+          case queryPrimaryDefault: QueryPrimaryDefaultContext =>
+            queryPrimaryDefault.querySpecification.intoClause
+          case _ => null
+        }
+      case _ => null
+    }
+    if (ctx.insertInto != null && intoClause != null) {
+      operationNotAllowed("INSERT INTO ... SELECT INTO", ctx)
+    }
     plan(ctx.queryTerm).
       // Add organization statements.
       optionalMap(ctx.queryOrganization)(withQueryResultClauses).
       // Add insert.
-      optionalMap(ctx.insertInto())(withInsertInto)
+      optionalMap(ctx.insertInto())(withInsertInto).
+      // Convert the INTO clause, if present.
+      optionalMap(intoClause)(withSelectInto)
   }
 
   /**
@@ -394,6 +413,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     throw new ParseException("Script Transform is not supported", ctx)
   }
 
+  /**
+   * Convert a SELECT INTO statement into a Hive CTAS statement. Overridden in SparkSqlAstBuilder.
+   */
+  protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    throw new ParseException("SELECT INTO is not supported", ctx)
+  }
+
   /**
    * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
    * separated) relations here, these get converted into a single plan by condition-less inner join.
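Note on the catalyst half of the change: the grammar now recognizes an INTO clause, but the shared
AstBuilder deliberately rejects it, so only the SparkSqlAstBuilder override below (in sql/core)
produces a plan. A minimal sketch of the parser-level behavior, assuming the patch above is
applied (the query text is illustrative; CatalystSqlParser is the stock catalyst parser entry point):

    import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

    // The base AstBuilder falls through to the rejecting withSelectInto above,
    // so parsing SELECT INTO at the catalyst layer throws a ParseException.
    try {
      CatalystSqlParser.parsePlan("SELECT key, value INTO si1 FROM src")
    } catch {
      case e: ParseException => println(e.getMessage) // "SELECT INTO is not supported"
    }

    // Mixing the two insertion forms is rejected up front in visitSingleInsertQuery,
    // e.g. "INSERT INTO t SELECT key INTO si1 FROM src" yields an
    // "Operation not allowed: INSERT INTO ... SELECT INTO" parse error.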
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c5f4d58da43ac..a2f04c21f431d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1376,4 +1376,62 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       reader, writer,
       schemaLess)
   }
+
+  /**
+   * Reuse the CTAS code path: convert a SELECT INTO statement into a CTAS plan,
+   * returning a [[CreateHiveTableAsSelectLogicalPlan]].
+   * The SELECT INTO statement selects data from one table
+   * and inserts it into a new table. It is commonly used to
+   * create a backup copy of a table or of selected records.
+   *
+   * Expected format:
+   * {{{
+   *   SELECT column_name(s)
+   *   INTO new_table
+   *   FROM old_table
+   *   ...
+   * }}}
+   */
+  override protected def withSelectInto(
+      ctx: IntoClauseContext,
+      query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    // Default storage format of the new table.
+    val defaultStorage: CatalogStorageFormat = {
+      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+      CatalogStorageFormat(
+        locationUri = None,
+        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+        // Note: Keep this unspecified because we use the presence of the serde to decide
+        // whether to convert a table created by CTAS to a datasource table.
+        serde = None,
+        compressed = false,
+        serdeProperties = Map())
+    }
+    // TODO support the sql text - have a proper location for this!
+    val tableDesc = CatalogTable(
+      identifier = visitTableIdentifier(ctx.tableIdentifier),
+      tableType = CatalogTableType.MANAGED,
+      storage = defaultStorage,
+      schema = Nil
+    )
+
+    // The target table must not already exist.
+    if (conf.convertCTAS) {
+      CreateTableUsingAsSelect(
+        tableIdent = tableDesc.identifier,
+        provider = conf.defaultDataSourceName,
+        partitionColumns = Array(),
+        bucketSpec = None,
+        mode = SaveMode.ErrorIfExists,
+        options = Map.empty[String, String],
+        query
+      )
+    } else {
+      CreateHiveTableAsSelectLogicalPlan(tableDesc, query, false)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 961d95c268b2c..53f60a0d62548 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1755,4 +1755,77 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("select into (check relation)") {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTable("si1", "si2") {
+        val defaultDataSource = sessionState.conf.defaultDataSourceName
+        sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+        val message = intercept[AnalysisException] {
+          sql("SELECT key, value INTO si1 FROM src ORDER BY key, value")
+        }.getMessage
+        assert(message.contains("already exists"))
+        checkRelation("si1", true, defaultDataSource)
+
+        // Specifying a database name for the target table should still be
+        // converted to the data source write path.
+ sql("SELECT key, value INTO default.si2 FROM src ORDER BY key, value") + checkRelation("si2", true, defaultDataSource) + } + } + } + + test("select into(check answer)") { + withTable("si1", "si2", "si3", "si4") { + sql("SELECT key, value INTO si1 FROM src") + checkAnswer( + sql("SELECT key, value FROM si1 ORDER BY key"), + sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) + + sql("SELECT key k, value INTO si2 FROM src ORDER BY k,value").collect() + checkAnswer( + sql("SELECT k, value FROM si2 ORDER BY k, value"), + sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) + + sql("SELECT 1 AS key,value INTO si3 FROM src LIMIT 1").collect() + checkAnswer( + sql("SELECT key, value FROM si3 ORDER BY key, value"), + sql("SELECT key, value FROM si3 LIMIT 1").collect().toSeq) + + sql("SELECT 1 INTO si4").collect() + checkAnswer( + sql("SELECT 1 FROM si4"), + sql("SELECT 1").collect().toSeq) + } + } + + test("select into(specifying the column list)") { + withTable("mytable1", "si1") { + Seq((1, "111111"), (2, "222222")).toDF("key", "value").createOrReplaceTempView("mytable1") + + sql("SELECT key as a,value as b INTO si1 FROM mytable1") + checkAnswer( + sql("SELECT a, b from si1"), + sql("select key, value from mytable1").collect()) + } + } + + test("select into(double nested data)") { + withTable("nested", "si1") { + sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil) + .toDF().createOrReplaceTempView("nested") + checkAnswer( + sql("SELECT f1.f2.f3 FROM nested"), + Row(1)) + + sql("SELECT * INTO si1 FROM nested") + checkAnswer( + sql("SELECT * FROM si1"), + sql("SELECT * FROM nested").collect().toSeq) + + intercept[AnalysisException] { + sql("SELECT * INTO si1 FROM notexists").collect() + } + } + } }