From 62f46f6ab62e04cf2cc5958d67437d8626cef309 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Mon, 16 Jan 2017 18:00:25 +0100 Subject: [PATCH 1/6] Added outer_explode, outer_posexplode, outer_inline --- .../sql/catalyst/analysis/Analyzer.scala | 12 ++-- .../catalyst/analysis/FunctionRegistry.scala | 3 + .../sql/catalyst/expressions/generators.scala | 21 ++++-- .../scala/org/apache/spark/sql/Column.scala | 5 +- .../spark/sql/execution/GenerateExec.scala | 25 ++++--- .../org/apache/spark/sql/functions.scala | 18 +++++ .../spark/sql/GeneratorFunctionSuite.scala | 71 +++++++++++++++++++ .../sql/catalyst/ExpressionToSQLSuite.scala | 3 + 8 files changed, 134 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bd9037ec4333..74a453670554 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1621,9 +1621,11 @@ class Analyzer( /** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */ private object AliasedGenerator { - def unapply(e: Expression): Option[(Generator, Seq[String])] = e match { - case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil)) - case MultiAlias(g: Generator, names) if g.resolved => Some(g, names) + def unapply(e: Expression): Option[(Generator, Seq[String], Boolean)] = e match { + case Alias(GeneratorOuter(g: Generator), name) if g.resolved => Some((g, name :: Nil, true)) + case MultiAlias(GeneratorOuter(g: Generator), names) if g.resolved => Some(g, names, true) + case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil, false)) + case MultiAlias(g: Generator, names) if g.resolved => Some(g, names, false) case _ => None } } @@ -1644,7 +1646,7 @@ class Analyzer( var resolvedGenerator: Generate = null val newProjectList = projectList.flatMap { - case AliasedGenerator(generator, names) if generator.childrenResolved => + case AliasedGenerator(generator, names, outer) if generator.childrenResolved => // It's a sanity check, this should not happen as the previous case will throw // exception earlier. assert(resolvedGenerator == null, "More than one generator found in SELECT.") @@ -1653,7 +1655,7 @@ class Analyzer( Generate( generator, join = projectList.size > 1, // Only join if there are other expressions in SELECT. 
- outer = false, + outer = outer, qualifier = None, generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names), child) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 2b214c3c9d93..b1914e0fc13f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -175,6 +175,9 @@ object FunctionRegistry { expression[NullIf]("nullif"), expression[Nvl]("nvl"), expression[Nvl2]("nvl2"), + expression[OuterExplode]("outer_explode"), + expression[OuterInline]("outer_inline"), + expression[OuterPosExplode]("outer_posexplode"), expression[PosExplode]("posexplode"), expression[Rand]("rand"), expression[Randn]("randn"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 6c38f4998e91..b6ec8f08df9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -204,6 +204,17 @@ case class Stack(children: Seq[Expression]) extends Generator { } } +case class GeneratorOuter(child: Generator) extends UnaryExpression + with Generator { + + final override def eval(input: InternalRow = null): TraversableOnce[InternalRow] = + throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") + + final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") + + override def elementSchema: StructType = child.elementSchema +} /** * A base class for [[Explode]] and [[PosExplode]]. */ @@ -233,11 +244,11 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with if (position) { new StructType() .add("pos", IntegerType, nullable = false) - .add("key", kt, nullable = false) + .add("key", kt, nullable = true) .add("value", vt, valueContainsNull) } else { new StructType() - .add("key", kt, nullable = false) + .add("key", kt, nullable = true) .add("value", vt, valueContainsNull) } } @@ -300,7 +311,7 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with case class Explode(child: Expression) extends ExplodeBase { override val position: Boolean = false } - +class OuterExplode(child: Expression) extends GeneratorOuter(Explode(child)) /** * Given an input array produces a sequence of rows for each position and value in the array. * @@ -323,7 +334,7 @@ case class Explode(child: Expression) extends ExplodeBase { case class PosExplode(child: Expression) extends ExplodeBase { override val position = true } - +class OuterPosExplode(child: Expression) extends GeneratorOuter(PosExplode(child)) /** * Explodes an array of structs into a table. 
*/ @@ -369,3 +380,5 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene child.genCode(ctx) } } + +class OuterInline(child: Expression) extends GeneratorOuter(Inline(child)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index a3f581ff2779..60182befd758 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -166,10 +166,7 @@ class Column(val expr: Expression) extends Logging { // Leave an unaliased generator with an empty list of names since the analyzer will generate // the correct defaults after the nested expression's type has been resolved. - case explode: Explode => MultiAlias(explode, Nil) - case explode: PosExplode => MultiAlias(explode, Nil) - - case jt: JsonTuple => MultiAlias(jt, Nil) + case g: Generator => MultiAlias(g, Nil) case func: UnresolvedFunction => UnresolvedAlias(func, Some(Column.generateAlias)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 04b16af4ea26..aa966e91a082 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -160,9 +160,20 @@ case class GenerateExec( // Generate looping variables. val index = ctx.freshName("index") + val numElements = ctx.freshName("numElements") + + // In case of outer=true we need to make sure the loop is executed at-least once when the + // array/map contains no input. + // generateOuter is an int. it is set to 1 iff outer is true and the input is empty or null. + val generateOuter = ctx.freshName("generateOuter") + val isOuter = if (outer) { + "true" + } else { + "false" + } // Add a check if the generate outer flag is true. - val checks = optionalCode(outer, data.isNull) + val checks = optionalCode(outer, s"($generateOuter == 1)") // Add position val position = if (e.position) { @@ -199,21 +210,13 @@ case class GenerateExec( (initArrayData, "", values) } - // In case of outer=true we need to make sure the loop is executed at-least once when the - // array/map contains no input. We do this by setting the looping index to -1 if there is no - // input, evaluation of the array is prevented by a check in the accessor code. - val numElements = ctx.freshName("numElements") - val init = if (outer) { - s"$numElements == 0 ? -1 : 0" - } else { - "0" - } val numOutput = metricTerm(ctx, "numOutputRows") s""" |${data.code} |$initMapData |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); - |for (int $index = $init; $index < $numElements; $index++) { + |int $generateOuter = ($numElements == 0 && $isOuter) ? 1 : 0; + |for (int $index = 0; $index < $numElements + $generateOuter; $index++) { | $numOutput.add(1); | $updateRowData | ${consume(ctx, input ++ position ++ values)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 9a080fd3c97c..da5d2daab811 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2870,6 +2870,15 @@ object functions { */ def explode(e: Column): Column = withExpr { Explode(e.expr) } + /** + * Creates a new row for each element in the given array or map column. 
+ * Unlike explode, if the array/map is null or empty then null is produced. + * + * @group collection_funcs + * @since 2.2.0 + */ + def outer_explode(e: Column): Column = withExpr { new OuterExplode(e.expr) } + /** * Creates a new row for each element with position in the given array or map column. * @@ -2878,6 +2887,15 @@ object functions { */ def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) } + /** + * Creates a new row for each element with position in the given array or map column. + * Unlike posexplode, if the array/map is null or empty then the row (0, null) is produced. + * + * @group collection_funcs + * @since 2.2.0 + */ + def outer_posexplode(e: Column): Column = withExpr { new OuterPosExplode(e.expr) } + /** * Extracts json object from a json string based on json path specified, and returns json string * of the extracted json object. It will return null if the input json string is invalid. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index f0995ea1d002..bd7b7c6edc4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -86,6 +86,12 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(explode('intList)), Row(1) :: Row(2) :: Row(3) :: Nil) } + test("single outer_explode") { + val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") + checkAnswer( + df.select(outer_explode('intList)), + Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil) + } test("single posexplode") { val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList") @@ -93,6 +99,12 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(posexplode('intList)), Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil) } + test("single outer_posexplode") { + val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") + checkAnswer( + df.select(outer_posexplode('intList)), + Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(0, 0) :: Nil) + } test("explode and other columns") { val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList") @@ -109,6 +121,25 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row(1, Seq(1, 2, 3), 2) :: Row(1, Seq(1, 2, 3), 3) :: Nil) } + test("outer_explode and other columns") { + val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") + + checkAnswer( + df.select($"a", outer_explode('intList)), + Row(1, 1) :: + Row(1, 2) :: + Row(1, 3) :: + Row(2, 0) :: + Nil) + + checkAnswer( + df.select($"*", outer_explode('intList)), + Row(1, Seq(1, 2, 3), 1) :: + Row(1, Seq(1, 2, 3), 2) :: + Row(1, Seq(1, 2, 3), 3) :: + Row(2, Seq(), 0) :: + Nil) + } test("aliased explode") { val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList") @@ -122,6 +153,18 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row(6) :: Nil) } + test("aliased outer_explode") { + val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") + + checkAnswer( + df.select(outer_explode('intList).as('int)).select('int), + Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil) + + checkAnswer( + df.select(explode('intList).as('int)).select(sum('int)), + Row(6) :: Nil) + } + test("explode on map") { val df = Seq((1, Map("a" -> "b"))).toDF("a", "map") @@ -129,6 +172,14 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(explode('map)), Row("a", "b")) } + test("outer_explode on map") { + val 
df = Seq((1, Map("a" -> "b")), (2, Map[String, String]()), + (3, Map("c" -> "d"))).toDF("a", "map") + + checkAnswer( + df.select(outer_explode('map)), + Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil) + } test("explode on map with aliases") { val df = Seq((1, Map("a" -> "b"))).toDF("a", "map") @@ -138,6 +189,14 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row("a", "b")) } + test("outer_explode on map with aliases") { + val df = Seq((3, None), (1, Some(Map("a" -> "b")))).toDF("a", "map") + + checkAnswer( + df.select(outer_explode('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"), + Row("a", "b") :: Row(null, null) :: Nil) + } + test("self join explode") { val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList") val exploded = df.select(explode('intList).as('i)) @@ -206,6 +265,18 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"), Row(1) :: Row(2) :: Nil) } + test("outer_inline") { + val df = Seq((1, "2"), (3, "4"), (5, "6")).toDF("col1", "col2") + val df2 = df.select(when('col1 === 1, null).otherwise(array(struct('col1, 'col2))).as("col1")) + checkAnswer( + df2.selectExpr("inline(col1)"), + Row(3, "4") :: Row(5, "6") :: Nil + ) + checkAnswer( + df2.selectExpr("outer_inline(col1)"), + Row(0, null) :: Row(3, "4") :: Row(5, "6") :: Nil + ) + } test("SPARK-14986: Outer lateral view with empty generate expression") { checkAnswer( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala index 27ea167b9050..c079c96f8d4b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala @@ -102,6 +102,9 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkSqlGeneration("SELECT map(1, 'a', 2, 'b')") checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)") checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2") + checkSqlGeneration("SELECT outer_explode(array())") + checkSqlGeneration("SELECT outer_posexplode(array())") + checkSqlGeneration("SELECT outer_inline(array(struct('a', 1)))") checkSqlGeneration("SELECT rand(1)") checkSqlGeneration("SELECT randn(3)") checkSqlGeneration("SELECT struct(1,2,3)") From 26a1a67751307b6053e4e85262b6b7ca029ff62b Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 17 Jan 2017 00:51:10 +0100 Subject: [PATCH 2/6] fixed some review issues --- .../sql/catalyst/analysis/Analyzer.scala | 8 +++- .../catalyst/analysis/FunctionRegistry.scala | 14 ++++-- .../sql/catalyst/expressions/generators.scala | 10 ++-- .../plans/logical/basicLogicalOperators.scala | 5 +- .../spark/sql/execution/GenerateExec.scala | 31 ++++++------ .../org/apache/spark/sql/functions.scala | 4 +- .../spark/sql/GeneratorFunctionSuite.scala | 47 ++++++++++--------- 7 files changed, 70 insertions(+), 49 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 74a453670554..98851cb8557a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1619,8 +1619,13 @@ class Analyzer( case _ => expr } - /** Extracts a 
[[Generator]] expression and any names assigned by aliases to their output. */ private object AliasedGenerator { + /** + * Extracts a [[Generator]] expression, any names assigned by aliases to the outputs + * and the outer flag. The outer flag is used when joining the generator output. + * @param e the [[Expression]] + * @return (the [[Generator]], seq of output names, outer flag) + */ def unapply(e: Expression): Option[(Generator, Seq[String], Boolean)] = e match { case Alias(GeneratorOuter(g: Generator), name) if g.resolved => Some((g, name :: Nil, true)) case MultiAlias(GeneratorOuter(g: Generator), names) if g.resolved => Some(g, names, true) @@ -1646,6 +1651,7 @@ class Analyzer( var resolvedGenerator: Generate = null val newProjectList = projectList.flatMap { + case AliasedGenerator(generator, names, outer) if generator.childrenResolved => // It's a sanity check, this should not happen as the previous case will throw // exception earlier. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index b1914e0fc13f..4d25707d64d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -163,9 +163,11 @@ object FunctionRegistry { expression[Abs]("abs"), expression[Coalesce]("coalesce"), expression[Explode]("explode"), + expressionGeneratorOuter[Explode]("explode_outer"), expression[Greatest]("greatest"), expression[If]("if"), expression[Inline]("inline"), + expressionGeneratorOuter[Inline]("inline_outer"), expression[IsNaN]("isnan"), expression[IfNull]("ifnull"), expression[IsNull]("isnull"), @@ -175,10 +177,8 @@ object FunctionRegistry { expression[NullIf]("nullif"), expression[Nvl]("nvl"), expression[Nvl2]("nvl2"), - expression[OuterExplode]("outer_explode"), - expression[OuterInline]("outer_inline"), - expression[OuterPosExplode]("outer_posexplode"), expression[PosExplode]("posexplode"), + expressionGeneratorOuter[PosExplode]("posexplode_outer"), expression[Rand]("rand"), expression[Randn]("randn"), expression[Stack]("stack"), @@ -511,4 +511,12 @@ object FunctionRegistry { new ExpressionInfo(clazz.getCanonicalName, name) } } + private def expressionGeneratorOuter[T <: Generator : ClassTag] + (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { + val regularGen = expression[T](name) + val outerBuilder = (args: Seq[Expression]) => { + GeneratorOuter(regularGen._2._2(args).asInstanceOf[Generator]) + } + (name, (expressionInfo[GeneratorOuter](name), outerBuilder)) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index b6ec8f08df9d..65885fffa2b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -244,11 +244,11 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with if (position) { new StructType() .add("pos", IntegerType, nullable = false) - .add("key", kt, nullable = true) + .add("key", kt, nullable = false) .add("value", vt, valueContainsNull) } else { new StructType() - .add("key", kt, nullable = true) + .add("key", kt, nullable = false) .add("value", vt, valueContainsNull) } } @@ 
-311,7 +311,7 @@ abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with case class Explode(child: Expression) extends ExplodeBase { override val position: Boolean = false } -class OuterExplode(child: Expression) extends GeneratorOuter(Explode(child)) + /** * Given an input array produces a sequence of rows for each position and value in the array. * @@ -334,7 +334,7 @@ class OuterExplode(child: Expression) extends GeneratorOuter(Explode(child)) case class PosExplode(child: Expression) extends ExplodeBase { override val position = true } -class OuterPosExplode(child: Expression) extends GeneratorOuter(PosExplode(child)) + /** * Explodes an array of structs into a table. */ @@ -380,5 +380,3 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene child.genCode(ctx) } } - -class OuterInline(child: Expression) extends GeneratorOuter(Inline(child)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 48f68a6415bd..fac21dadfe11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -104,7 +104,10 @@ case class Generate( def qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q => // prepend the new qualifier to the existed one generatorOutput.map(a => a.withQualifier(Some(q))) - }.getOrElse(generatorOutput) + }.getOrElse(generatorOutput).map { + // if outer, make all attributes nullable, otherwise keep existing nullability + a => a.withNullability(outer || a.nullable) + } def output: Seq[Attribute] = { if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index aa966e91a082..b52f5c4d4a90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -160,24 +160,17 @@ case class GenerateExec( // Generate looping variables. val index = ctx.freshName("index") - val numElements = ctx.freshName("numElements") - - // In case of outer=true we need to make sure the loop is executed at-least once when the - // array/map contains no input. - // generateOuter is an int. it is set to 1 iff outer is true and the input is empty or null. - val generateOuter = ctx.freshName("generateOuter") - val isOuter = if (outer) { - "true" - } else { - "false" - } // Add a check if the generate outer flag is true. - val checks = optionalCode(outer, s"($generateOuter == 1)") + val checks = optionalCode(outer, s"($index == -1)") // Add position val position = if (e.position) { - Seq(ExprCode("", "false", index)) + if (outer) { + Seq(ExprCode("", s"$index == -1", index)) + } else { + Seq(ExprCode("", "false", index)) + } } else { Seq.empty } @@ -210,13 +203,21 @@ case class GenerateExec( (initArrayData, "", values) } + // In case of outer=true we need to make sure the loop is executed at-least once when the + // array/map contains no input. We do this by setting the looping index to -1 if there is no + // input, evaluation of the array is prevented by a check in the accessor code. 
+ val numElements = ctx.freshName("numElements") + val init = if (outer) { + s"$numElements == 0 ? -1 : 0" + } else { + "0" + } val numOutput = metricTerm(ctx, "numOutputRows") s""" |${data.code} |$initMapData |int $numElements = ${data.isNull} ? 0 : ${data.value}.numElements(); - |int $generateOuter = ($numElements == 0 && $isOuter) ? 1 : 0; - |for (int $index = 0; $index < $numElements + $generateOuter; $index++) { + |for (int $index = $init; $index < $numElements; $index++) { | $numOutput.add(1); | $updateRowData | ${consume(ctx, input ++ position ++ values)} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index da5d2daab811..65c6f6032e0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2877,7 +2877,7 @@ object functions { * @group collection_funcs * @since 2.2.0 */ - def outer_explode(e: Column): Column = withExpr { new OuterExplode(e.expr) } + def explode_outer(e: Column): Column = withExpr { GeneratorOuter(Explode(e.expr)) } /** * Creates a new row for each element with position in the given array or map column. @@ -2894,7 +2894,7 @@ object functions { * @group collection_funcs * @since 2.2.0 */ - def outer_posexplode(e: Column): Column = withExpr { new OuterPosExplode(e.expr) } + def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) } /** * Extracts json object from a json string based on json path specified, and returns json string diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index bd7b7c6edc4a..b9871afd59e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -86,11 +86,12 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(explode('intList)), Row(1) :: Row(2) :: Row(3) :: Nil) } - test("single outer_explode") { + + test("single explode_outer") { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( - df.select(outer_explode('intList)), - Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil) + df.select(explode_outer('intList)), + Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil) } test("single posexplode") { @@ -99,11 +100,12 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(posexplode('intList)), Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil) } - test("single outer_posexplode") { + + test("single posexplode_outer") { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( - df.select(outer_posexplode('intList)), - Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(0, 0) :: Nil) + df.select(posexplode_outer('intList)), + Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil) } test("explode and other columns") { @@ -121,23 +123,24 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row(1, Seq(1, 2, 3), 2) :: Row(1, Seq(1, 2, 3), 3) :: Nil) } - test("outer_explode and other columns") { + + test("explode_outer and other columns") { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( - df.select($"a", outer_explode('intList)), + df.select($"a", explode_outer('intList)), Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: - Row(2, 0) :: + Row(2, null) :: Nil) checkAnswer( - df.select($"*", 
outer_explode('intList)), + df.select($"*", explode_outer('intList)), Row(1, Seq(1, 2, 3), 1) :: Row(1, Seq(1, 2, 3), 2) :: Row(1, Seq(1, 2, 3), 3) :: - Row(2, Seq(), 0) :: + Row(2, Seq(), null) :: Nil) } @@ -153,12 +156,12 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row(6) :: Nil) } - test("aliased outer_explode") { + test("aliased explode_outer") { val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList") checkAnswer( - df.select(outer_explode('intList).as('int)).select('int), - Row(1) :: Row(2) :: Row(3) :: Row(0) :: Nil) + df.select(explode_outer('intList).as('int)).select('int), + Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil) checkAnswer( df.select(explode('intList).as('int)).select(sum('int)), @@ -172,12 +175,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.select(explode('map)), Row("a", "b")) } - test("outer_explode on map") { + + test("explode_outer on map") { val df = Seq((1, Map("a" -> "b")), (2, Map[String, String]()), (3, Map("c" -> "d"))).toDF("a", "map") checkAnswer( - df.select(outer_explode('map)), + df.select(explode_outer('map)), Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil) } @@ -189,11 +193,11 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row("a", "b")) } - test("outer_explode on map with aliases") { + test("explode_outer on map with aliases") { val df = Seq((3, None), (1, Some(Map("a" -> "b")))).toDF("a", "map") checkAnswer( - df.select(outer_explode('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"), + df.select(explode_outer('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"), Row("a", "b") :: Row(null, null) :: Nil) } @@ -265,7 +269,8 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { df.selectExpr("array(struct(a), named_struct('a', b))").selectExpr("inline(*)"), Row(1) :: Row(2) :: Nil) } - test("outer_inline") { + + test("inline_outer") { val df = Seq((1, "2"), (3, "4"), (5, "6")).toDF("col1", "col2") val df2 = df.select(when('col1 === 1, null).otherwise(array(struct('col1, 'col2))).as("col1")) checkAnswer( @@ -273,8 +278,8 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext { Row(3, "4") :: Row(5, "6") :: Nil ) checkAnswer( - df2.selectExpr("outer_inline(col1)"), - Row(0, null) :: Row(3, "4") :: Row(5, "6") :: Nil + df2.selectExpr("inline_outer(col1)"), + Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil ) } From 36ac05414bdf79782e3e74b0e92fdff4687ffb66 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 17 Jan 2017 01:04:29 +0100 Subject: [PATCH 3/6] renamed outer_* to *_outer in ExpressionToSQLSuite --- .../apache/spark/sql/catalyst/ExpressionToSQLSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala index c079c96f8d4b..df9390aec7f7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionToSQLSuite.scala @@ -93,6 +93,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkSqlGeneration("SELECT array(1,2,3)") checkSqlGeneration("SELECT coalesce(null, 1, 2)") checkSqlGeneration("SELECT explode(array(1,2,3))") + checkSqlGeneration("SELECT explode_outer(array())") checkSqlGeneration("SELECT greatest(1,null,3)") checkSqlGeneration("SELECT if(1==2, 'yes', 'no')") 
checkSqlGeneration("SELECT isnan(15), isnan('invalid')") @@ -102,9 +103,8 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { checkSqlGeneration("SELECT map(1, 'a', 2, 'b')") checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)") checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2") - checkSqlGeneration("SELECT outer_explode(array())") - checkSqlGeneration("SELECT outer_posexplode(array())") - checkSqlGeneration("SELECT outer_inline(array(struct('a', 1)))") + checkSqlGeneration("SELECT posexplode_outer(array())") + checkSqlGeneration("SELECT inline_outer(array(struct('a', 1)))") checkSqlGeneration("SELECT rand(1)") checkSqlGeneration("SELECT randn(3)") checkSqlGeneration("SELECT struct(1,2,3)") From 8a8af5537c050956542a6395c7db79fb28efcd10 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 17 Jan 2017 17:11:52 +0100 Subject: [PATCH 4/6] review issues --- .../sql/catalyst/analysis/FunctionRegistry.scala | 7 ++++--- .../plans/logical/basicLogicalOperators.scala | 16 ++++++++++------ .../scala/org/apache/spark/sql/functions.scala | 2 +- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 4d25707d64d4..b754c8f797ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -511,12 +511,13 @@ object FunctionRegistry { new ExpressionInfo(clazz.getCanonicalName, name) } } + private def expressionGeneratorOuter[T <: Generator : ClassTag] (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { - val regularGen = expression[T](name) + val (_, (info, generatorBuilder)) = expression[T](name) val outerBuilder = (args: Seq[Expression]) => { - GeneratorOuter(regularGen._2._2(args).asInstanceOf[Generator]) + GeneratorOuter(generatorBuilder(args).asInstanceOf[Generator]) } - (name, (expressionInfo[GeneratorOuter](name), outerBuilder)) + (name, (info, outerBuilder)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index fac21dadfe11..3bd314315d27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -101,12 +101,16 @@ case class Generate( override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) - def qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q => - // prepend the new qualifier to the existed one - generatorOutput.map(a => a.withQualifier(Some(q))) - }.getOrElse(generatorOutput).map { - // if outer, make all attributes nullable, otherwise keep existing nullability - a => a.withNullability(outer || a.nullable) + def qualifiedGeneratorOutput: Seq[Attribute] = { + val qualifiedOutput = qualifier.map { q => + // prepend the new qualifier to the existed one + generatorOutput.map(a => a.withQualifier(Some(q))) + }.getOrElse(generatorOutput) + val nullableOutput = qualifiedOutput.map { + // if outer, make all attributes nullable, otherwise keep existing nullability + a => a.withNullability(outer || a.nullable) + } + nullableOutput } def output: 
Seq[Attribute] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 65c6f6032e0a..7c3e10e2de33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2889,7 +2889,7 @@ object functions { /** * Creates a new row for each element with position in the given array or map column. - * Unlike posexplode, if the array/map is null or empty then the row (0, null) is produced. + * Unlike posexplode, if the array/map is null or empty then the row (null, null) is produced. * * @group collection_funcs * @since 2.2.0 From 76f15d1873d86a3b16c139b3ffd93aedb04d0dc2 Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 17 Jan 2017 21:52:24 +0100 Subject: [PATCH 5/6] indentation --- .../apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index b754c8f797ba..eea3740be8a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -512,8 +512,8 @@ object FunctionRegistry { } } - private def expressionGeneratorOuter[T <: Generator : ClassTag] - (name: String): (String, (ExpressionInfo, FunctionBuilder)) = { + private def expressionGeneratorOuter[T <: Generator : ClassTag](name: String) + : (String, (ExpressionInfo, FunctionBuilder)) = { val (_, (info, generatorBuilder)) = expression[T](name) val outerBuilder = (args: Seq[Expression]) => { GeneratorOuter(generatorBuilder(args).asInstanceOf[Generator]) From 99742be9a9a2e5f2256e5ad24f4519f5dd84323c Mon Sep 17 00:00:00 2001 From: Bogdan Raducanu Date: Tue, 17 Jan 2017 21:53:58 +0100 Subject: [PATCH 6/6] indentation --- .../apache/spark/sql/catalyst/expressions/generators.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 65885fffa2b1..1b98c30d3760 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -204,9 +204,7 @@ case class Stack(children: Seq[Expression]) extends Generator { } } -case class GeneratorOuter(child: Generator) extends UnaryExpression - with Generator { - +case class GeneratorOuter(child: Generator) extends UnaryExpression with Generator { final override def eval(input: InternalRow = null): TraversableOnce[InternalRow] = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
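End-of-series usage note (not part of any patch above): a minimal, self-contained Scala sketch of how the outer generators introduced by this series behave from the DataFrame API. The SparkSession setup, object name, and column names are invented for illustration; the expected results follow the tests added in GeneratorFunctionSuite (an empty or null array yields a single all-null generator row instead of no row).

// Sketch only; assumes Spark 2.2+ with this series applied and a local master for demonstration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, explode_outer, posexplode_outer}

object OuterGeneratorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("outer-generators").getOrCreate()
    import spark.implicits._

    // One row with elements, one row with an empty array.
    val df = Seq((1, Seq(1, 2, 3)), (2, Seq.empty[Int])).toDF("id", "values")

    // explode drops id=2 entirely because its array is empty.
    df.select($"id", explode($"values").as("v")).show()

    // explode_outer keeps id=2, producing a null value for it.
    df.select($"id", explode_outer($"values").as("v")).show()

    // posexplode_outer keeps id=2 as well, with both pos and col null.
    df.select($"id", posexplode_outer($"values")).show()

    spark.stop()
  }
}

inline_outer is exposed through SQL/selectExpr in the same way, e.g. selectExpr("inline_outer(arrayOfStructsCol)"), producing an all-null struct row when the array is null or empty.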