diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 01afa01ae95c5..5d4e5a528faf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -253,6 +253,10 @@ case class MultiAlias(child: Expression, names: Seq[String]) override def toString: String = s"$child AS $names" + override def sql: String = { + val aliasNames = names.map(quoteIdentifier(_)).mkString(",") + s"${child.sql} AS ($aliasNames)" + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala index d2318417e3e68..e3f6ff94fab10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala @@ -999,10 +999,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } val attributes = clauses.collect { - case Token(a, Nil) => UnresolvedAttribute(a.toLowerCase) + case Token(a, Nil) => UnresolvedAttribute(cleanIdentifier(a.toLowerCase)) } - Generate(generator, join = true, outer = outer, Some(alias.toLowerCase), attributes, child) + Generate( + generator, + join = true, + outer = outer, + Some(cleanIdentifier(alias.toLowerCase)), + attributes, + child) } protected def nodeToGenerator(node: ASTNode): Generator = noParseRule("Generator", node) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 683f738054c5a..3bae7b41eaba1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala 
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.MultiAlias import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.plans.logical._ @@ -42,7 +43,7 @@ case class SubqueryHolder(query: String) extends LeafExpression with Unevaluable } /** - * A builder class used to convert a resolved logical plan into a SQL query string. Note that this + * A builder class used to convert a resolved logical plan into a SQL query string. Note that not * all resolved logical plan are convertible. They either don't have corresponding SQL * representations (e.g. logical plans that operate on local Scala collections), or are simply not * supported by this builder (yet). @@ -94,6 +95,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case Distinct(p: Project) => projectToSQL(p, isDistinct = true) + case g : Generate => + generateToSQL(g) + case p: Project => projectToSQL(p, isDistinct = false) @@ -205,15 +209,36 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi segments.map(_.trim).filter(_.nonEmpty).mkString(" ") private def projectToSQL(plan: Project, isDistinct: Boolean): String = { + val (projectListExprs, planToProcess) = getProjectListExprs(plan) build( "SELECT", if (isDistinct) "DISTINCT" else "", - plan.projectList.map(_.sql).mkString(", "), - if (plan.child == OneRowRelation) "" else "FROM", - toSQL(plan.child) + projectListExprs.map(_.sql).mkString(", "), + if (planToProcess == OneRowRelation) "" else "FROM", + toSQL(planToProcess) ) } + private def getProjectListExprs(plan: Project): (Seq[NamedExpression], LogicalPlan) = { + plan match { + case p @ Project(_, g: Generate) if g.qualifier.isEmpty => + // Only keep the first generated 
column in the list so that we can + // transform it to a Generator expression in the following step. + val projList = p.projectList.filter { + case e: Expression if g.generatorOutput.tail.exists(_.semanticEquals(e)) => false + case _ => true + } + val exprs = projList.map { + case e: Expression if g.generatorOutput.exists(_.semanticEquals(e)) => + val names = g.generatorOutput.map(_.name) + MultiAlias(g.generator, names) + case other => other + } + (exprs, g.child) + case _ => (plan.projectList, plan.child) + } + } + private def aggregateToSQL(plan: Aggregate): String = { val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ") build( @@ -297,6 +322,31 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + /* This function handles the SQL generation when generators are specified in the + * LATERAL VIEW clause. SQL generation of generators specified in projection lists + * is handled in projectToSQL. + * sample plan : + * +- Project [mycol2#192] + * +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192] + * +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191] + * +- MetastoreRelation default, src, None + * + */ + private def generateToSQL(plan: Generate): String = { + val columnAliases = plan.generatorOutput.map(a => quoteIdentifier(a.name)).mkString(",") + val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get + val outerClause = if (plan.outer) "OUTER" else "" + build( + toSQL(plan.child), + "LATERAL VIEW", + outerClause, + plan.generator.sql, + quoteIdentifier(generatorAlias), + "AS", + columnAliases + ) + } + object Canonicalizer extends RuleExecutor[LogicalPlan] { override protected def batches: Seq[Batch] = Seq( Batch("Canonicalizer", FixedPoint(100), @@ -329,6 +379,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case plan @ Project(_, _: SubqueryAlias + | _: Generate | _: Filter | _: Join | _: 
MetastoreRelation diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index efd33f59416a8..50d574bc31827 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -208,4 +208,13 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { |USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t """.stripMargin) } + + test("use backticks in output of Generator") { + val plan = parser.parsePlan( + """SELECT `gentab2`.`gencol2` + |FROM `default`.`src` + |LATERAL VIEW explode(array(array(1, 2, 3))) `gentab1` AS `gencol1` + |LATERAL VIEW explode(`gentab1`.`gencol1`) `gentab2` AS `gencol2` + """.stripMargin) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index ed85856f017df..975ad2fb51355 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -29,7 +29,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") sql("DROP TABLE IF EXISTS t0") + sql("DROP TABLE IF EXISTS t1") + + val tuples: Seq[(String, String)] = + ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") :: + ("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") :: + ("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") :: + ("4", null) :: + ("5", """{"f1": "", "f5": null}""") :: + ("6", "[invalid JSON string]") :: + Nil sqlContext.range(10).write.saveAsTable("parquet_t0") sql("CREATE TABLE t0 AS SELECT 
* FROM parquet_t0") @@ -45,13 +56,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd) .write .saveAsTable("parquet_t2") + + tuples.toDF("key", "jstring").write.saveAsTable("parquet_t3") + sql("CREATE TABLE t1 as select key, array(value) as value from parquet_t1 limit 20") } override protected def afterAll(): Unit = { sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") sql("DROP TABLE IF EXISTS t0") + sql("DROP TABLE IF EXISTS t1") } private def checkHiveQl(hiveQl: String): Unit = { @@ -445,4 +461,91 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { "f1", "b[0].f1", "f1", "c[foo]", "d[0]" ) } + + test("SQL generator for explode in projection list") { + // Basic Explode + checkHiveQl("SELECT explode(array(1,2,3)) FROM src") + + // Explode with Alias + checkHiveQl("SELECT explode(array(1,2,3)) as value FROM src") + + // Explode without FROM + checkHiveQl("select explode(array(1,2,3)) AS gencol") + + // non-generated columns in projection list + checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1") + } + + test("SQL generation for json_tuple as generator") { + checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3") + } + + test("SQL generation for lateral views") { + // Filter and OUTER clause + checkHiveQl( + """SELECT key, value + |FROM t1 LATERAL VIEW OUTER explode(value) gentab as gencol + |WHERE key = 1 + """.stripMargin + ) + + // single lateral view + checkHiveQl( + """SELECT * + |FROM t1 LATERAL VIEW explode(array(1,2,3)) gentab AS gencol + |SORT BY key ASC, gencol ASC LIMIT 1 + """.stripMargin + ) + + // multiple lateral views + checkHiveQl( + """SELECT gentab2.* + |FROM t1 + |LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1 + |LATERAL VIEW explode(gentab1.gencol1) gentab2 AS 
gencol2 LIMIT 3 + """.stripMargin + ) + + // No generated column aliases + checkHiveQl( + """SELECT gentab.* + |FROM + |t1 LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2 + """.stripMargin + ) + } + + test("SQL generation for lateral views in subquery") { + // Subqueries in FROM clause using Generate + checkHiveQl( + """SELECT subq.gencol + |FROM + |(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq + """.stripMargin) + + checkHiveQl( + """SELECT subq.key + |FROM + |(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq + """.stripMargin + ) + } + + test("SQL generation for UDTF") { + sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") + + // The function source code can be found at: + // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF + sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) + + checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol") + + checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1") + + sql("DROP TEMPORARY FUNCTION udtf_count2") + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 2f8c2beb17f4b..dd0997fa07eab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -264,6 +264,14 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer( sql("SELECT ints FROM nestedArray LATERAL VIEW explode(a.b) a AS ints"), Row(1) :: Row(2) :: Row(3) :: Nil) + + checkAnswer( + sql("SELECT `ints` FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints`"), + Row(1) :: Row(2) :: Row(3) :: Nil) + + checkAnswer( + sql("SELECT 
`a`.`ints` FROM nestedArray LATERAL VIEW explode(a.b) `a` AS `ints`"), + Row(1) :: Row(2) :: Row(3) :: Nil) } test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {