Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1619,11 +1619,18 @@ class Analyzer(
case _ => expr
}

/** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */
private object AliasedGenerator {
def unapply(e: Expression): Option[(Generator, Seq[String])] = e match {
case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil))
case MultiAlias(g: Generator, names) if g.resolved => Some(g, names)
/**
* Extracts a [[Generator]] expression, any names assigned by aliases to the outputs
* and the outer flag. The outer flag is used when joining the generator output.
* @param e the [[Expression]]
* @return (the [[Generator]], seq of output names, outer flag)
*/
def unapply(e: Expression): Option[(Generator, Seq[String], Boolean)] = e match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document what the return value means (especially the Boolean value, but also the pre-existing Seq[String]).

case Alias(GeneratorOuter(g: Generator), name) if g.resolved => Some((g, name :: Nil, true))
case MultiAlias(GeneratorOuter(g: Generator), names) if g.resolved => Some(g, names, true)
case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil, false))
case MultiAlias(g: Generator, names) if g.resolved => Some(g, names, false)
case _ => None
}
}
Expand All @@ -1644,7 +1651,8 @@ class Analyzer(
var resolvedGenerator: Generate = null

val newProjectList = projectList.flatMap {
case AliasedGenerator(generator, names) if generator.childrenResolved =>

case AliasedGenerator(generator, names, outer) if generator.childrenResolved =>
// It's a sanity check, this should not happen as the previous case will throw
// exception earlier.
assert(resolvedGenerator == null, "More than one generator found in SELECT.")
Expand All @@ -1653,7 +1661,7 @@ class Analyzer(
Generate(
generator,
join = projectList.size > 1, // Only join if there are other expressions in SELECT.
outer = false,
outer = outer,
qualifier = None,
generatorOutput = ResolveGenerate.makeGeneratorOutput(generator, names),
child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ object FunctionRegistry {
expression[Abs]("abs"),
expression[Coalesce]("coalesce"),
expression[Explode]("explode"),
expressionGeneratorOuter[Explode]("explode_outer"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need an update on ExpressionDescription for these three new expressions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@gatorsmile gatorsmile Feb 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark-sql> desc function extended explode_outer;

Function: explode_outer
Class: org.apache.spark.sql.catalyst.expressions.Explode
Usage: explode_outer(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns.
Extended Usage:
    Examples:
      > SELECT explode_outer(array(10, 20));
       10
       20

@hvanhovell explode_outer is sharing the same expression description. Do we need an update on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we need an update? What is the extra information you want to convey? Do you want to add a generic line saying that an outer generator might produce nulls instead of filtering out the row?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Update the description and provide an example to users? Maybe hardcode the function name instead of using _FUNC_?

@ExpressionDescription(
  usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows, or the elements of map `expr` into multiple rows and columns.",
  extended = """
    Examples:
      > SELECT _FUNC_(array(10, 20));
       10
       20
  """)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super enthusiastic about this. We have three options here:

  1. Leave as it is.
  2. Remove the ..._outer generators, and have users write LATERAL VIEW OUTER ... instead.
  3. Create separate OuterGenerator classes for each one, and provide proper documentation.

I am fine with any.

expression[Greatest]("greatest"),
expression[If]("if"),
expression[Inline]("inline"),
expressionGeneratorOuter[Inline]("inline_outer"),
expression[IsNaN]("isnan"),
expression[IfNull]("ifnull"),
expression[IsNull]("isnull"),
Expand All @@ -176,6 +178,7 @@ object FunctionRegistry {
expression[Nvl]("nvl"),
expression[Nvl2]("nvl2"),
expression[PosExplode]("posexplode"),
expressionGeneratorOuter[PosExplode]("posexplode_outer"),
expression[Rand]("rand"),
expression[Randn]("randn"),
expression[Stack]("stack"),
Expand Down Expand Up @@ -508,4 +511,13 @@ object FunctionRegistry {
new ExpressionInfo(clazz.getCanonicalName, name)
}
}

/**
 * Builds a registry entry for the "outer" flavour of generator `T`.
 *
 * The [[ExpressionInfo]] and the underlying builder are taken from the regular
 * `expression[T](name)` entry; the returned builder wraps whatever that builder
 * produces in a [[GeneratorOuter]].
 *
 * @param name the function name the entry is registered under
 * @return (name, (expression info, builder producing a [[GeneratorOuter]]))
 */
private def expressionGeneratorOuter[T <: Generator : ClassTag](name: String)
  : (String, (ExpressionInfo, FunctionBuilder)) = {
  val (_, (exprInfo, innerBuilder)) = expression[T](name)
  // The inner builder is typed as returning an Expression, hence the cast back to Generator.
  val wrapInOuter: FunctionBuilder = { arguments =>
    val generator = innerBuilder(arguments).asInstanceOf[Generator]
    GeneratorOuter(generator)
  }
  (name, (exprInfo, wrapInOuter))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ case class Stack(children: Seq[Expression]) extends Generator {
}
}

/**
 * Marker wrapper around another [[Generator]] that requests "outer" behaviour.
 *
 * The wrapper itself cannot be evaluated: both the interpreted and the code-generated
 * paths throw [[UnsupportedOperationException]]. Its element schema is that of the
 * wrapped generator.
 *
 * @param child the generator being wrapped
 */
case class GeneratorOuter(child: Generator) extends UnaryExpression with Generator {
  final override def eval(input: InternalRow = null): TraversableOnce[InternalRow] = {
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
  }

  final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
  }

  // Delegate the schema to the wrapped generator.
  override def elementSchema: StructType = {
    child.elementSchema
  }
}
/**
* A base class for [[Explode]] and [[PosExplode]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,17 @@ case class Generate(

override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

/**
 * The generator output attributes, each re-qualified with `qualifier` when one is set;
 * otherwise the generator output unchanged.
 */
def qualifiedGeneratorOutput: Seq[Attribute] = qualifier match {
  case Some(q) =>
    // Set the qualifier on every generator output attribute.
    generatorOutput.map(_.withQualifier(Some(q)))
  case None =>
    generatorOutput
}
/**
 * The generator output attributes after applying the optional `qualifier` and the
 * nullability implied by `outer`.
 */
def qualifiedGeneratorOutput: Seq[Attribute] = {
  // Step 1: set the qualifier on every attribute, if a qualifier is present.
  val withQualifier = qualifier match {
    case Some(q) => generatorOutput.map(_.withQualifier(Some(q)))
    case None => generatorOutput
  }
  // Step 2: when outer is set every attribute becomes nullable; otherwise each
  // attribute keeps its existing nullability.
  withQualifier.map { attr =>
    attr.withNullability(outer || attr.nullable)
  }
}

def output: Seq[Attribute] = {
if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput
Expand Down
5 changes: 1 addition & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ class Column(val expr: Expression) extends Logging {

// Leave an unaliased generator with an empty list of names since the analyzer will generate
// the correct defaults after the nested expression's type has been resolved.
case explode: Explode => MultiAlias(explode, Nil)
case explode: PosExplode => MultiAlias(explode, Nil)

case jt: JsonTuple => MultiAlias(jt, Nil)
case g: Generator => MultiAlias(g, Nil)

case func: UnresolvedFunction => UnresolvedAlias(func, Some(Column.generateAlias))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,15 @@ case class GenerateExec(
val index = ctx.freshName("index")

// Add a check if the generate outer flag is true.
val checks = optionalCode(outer, data.isNull)
val checks = optionalCode(outer, s"($index == -1)")

// Add position
val position = if (e.position) {
Seq(ExprCode("", "false", index))
if (outer) {
Seq(ExprCode("", s"$index == -1", index))
} else {
Seq(ExprCode("", "false", index))
}
} else {
Seq.empty
}
Expand Down
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2870,6 +2870,15 @@ object functions {
*/
def explode(e: Column): Column = withExpr { Explode(e.expr) }

/**
 * Creates a new row for each element in the given array or map column.
 * In contrast to explode, a null or empty array/map still yields a row, holding null.
 *
 * @group collection_funcs
 * @since 2.2.0
 */
def explode_outer(e: Column): Column = {
  val outerExplode = GeneratorOuter(Explode(e.expr))
  withExpr(outerExplode)
}

/**
* Creates a new row for each element with position in the given array or map column.
*
Expand All @@ -2878,6 +2887,15 @@ object functions {
*/
def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) }

/**
 * Creates a new row for each element with position in the given array or map column.
 * In contrast to posexplode, a null or empty array/map still yields the row (null, null).
 *
 * @group collection_funcs
 * @since 2.2.0
 */
def posexplode_outer(e: Column): Column = {
  val outerPosExplode = GeneratorOuter(PosExplode(e.expr))
  withExpr(outerPosExplode)
}

/**
* Extracts json object from a json string based on json path specified, and returns json string
* of the extracted json object. It will return null if the input json string is invalid.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,27 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(1) :: Row(2) :: Row(3) :: Nil)
}

test("single explode_outer") {
  // The second input row carries an empty list, which is expected to surface as one null row.
  val input = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
  val expected = Seq(Row(1), Row(2), Row(3), Row(null))
  checkAnswer(input.select(explode_outer('intList)), expected)
}

test("single posexplode") {
  // Each element is expected alongside its zero-based position.
  val input = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
  val expected = Seq(Row(0, 1), Row(1, 2), Row(2, 3))
  checkAnswer(input.select(posexplode('intList)), expected)
}

test("single posexplode_outer") {
  // The empty list in the second row is expected to produce a (null, null) row.
  val input = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
  val expected = Seq(Row(0, 1), Row(1, 2), Row(2, 3), Row(null, null))
  checkAnswer(input.select(posexplode_outer('intList)), expected)
}

test("explode and other columns") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")

Expand All @@ -110,6 +124,26 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(1, Seq(1, 2, 3), 3) :: Nil)
}

test("explode_outer and other columns") {
  val input = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")

  // Selecting a single extra column next to the generator.
  val expectedWithA = Seq(
    Row(1, 1),
    Row(1, 2),
    Row(1, 3),
    Row(2, null))
  checkAnswer(input.select($"a", explode_outer('intList)), expectedWithA)

  // Selecting every input column next to the generator.
  val expectedWithStar = Seq(
    Row(1, Seq(1, 2, 3), 1),
    Row(1, Seq(1, 2, 3), 2),
    Row(1, Seq(1, 2, 3), 3),
    Row(2, Seq(), null))
  checkAnswer(input.select($"*", explode_outer('intList)), expectedWithStar)
}

test("aliased explode") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")

Expand All @@ -122,6 +156,18 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(6) :: Nil)
}

test("aliased explode_outer") {
  // The second input row has an empty list, so explode_outer emits one null row for it.
  val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")

  checkAnswer(
    df.select(explode_outer('intList).as('int)).select('int),
    Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)

  // Aggregate over the aliased outer generator (previously this re-tested plain explode).
  // sum skips the null produced for the empty list, so the total is still 1 + 2 + 3.
  checkAnswer(
    df.select(explode_outer('intList).as('int)).select(sum('int)),
    Row(6) :: Nil)
}

test("explode on map") {
val df = Seq((1, Map("a" -> "b"))).toDF("a", "map")

Expand All @@ -130,6 +176,15 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row("a", "b"))
}

test("explode_outer on map") {
  // The middle row holds an empty map and is expected to yield a (null, null) row.
  val input = Seq(
    (1, Map("a" -> "b")),
    (2, Map[String, String]()),
    (3, Map("c" -> "d"))).toDF("a", "map")
  val expected = Seq(Row("a", "b"), Row(null, null), Row("c", "d"))

  checkAnswer(input.select(explode_outer('map)), expected)
}

test("explode on map with aliases") {
val df = Seq((1, Map("a" -> "b"))).toDF("a", "map")

Expand All @@ -138,6 +193,14 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row("a", "b"))
}

test("explode_outer on map with aliases") {
  // The first row has no map at all (None); it is expected to yield a (null, null) row.
  val input = Seq((3, None), (1, Some(Map("a" -> "b")))).toDF("a", "map")
  val aliasNames = "key1" :: "value1" :: Nil
  val expected = Seq(Row("a", "b"), Row(null, null))

  checkAnswer(
    input.select(explode_outer('map).as(aliasNames)).select("key1", "value1"),
    expected)
}

test("self join explode") {
val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
val exploded = df.select(explode('intList).as('i))
Expand Down Expand Up @@ -207,6 +270,19 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
Row(1) :: Row(2) :: Nil)
}

test("inline_outer") {
  val base = Seq((1, "2"), (3, "4"), (5, "6")).toDF("col1", "col2")
  // Replace the array with null for the row where col1 is 1; other rows get a one-struct array.
  val structsOrNull = when('col1 === 1, null).otherwise(array(struct('col1, 'col2))).as("col1")
  val df2 = base.select(structsOrNull)

  // Plain inline: the row with the null array is not expected in the output.
  val expectedInner = Seq(Row(3, "4"), Row(5, "6"))
  checkAnswer(df2.selectExpr("inline(col1)"), expectedInner)

  // Outer inline: the row with the null array is expected as (null, null).
  val expectedOuter = Seq(Row(null, null), Row(3, "4"), Row(5, "6"))
  checkAnswer(df2.selectExpr("inline_outer(col1)"), expectedOuter)
}

test("SPARK-14986: Outer lateral view with empty generate expression") {
checkAnswer(
sql("select nil from values 1 lateral view outer explode(array()) n as nil"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkSqlGeneration("SELECT array(1,2,3)")
checkSqlGeneration("SELECT coalesce(null, 1, 2)")
checkSqlGeneration("SELECT explode(array(1,2,3))")
checkSqlGeneration("SELECT explode_outer(array())")
checkSqlGeneration("SELECT greatest(1,null,3)")
checkSqlGeneration("SELECT if(1==2, 'yes', 'no')")
checkSqlGeneration("SELECT isnan(15), isnan('invalid')")
Expand All @@ -102,6 +103,8 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkSqlGeneration("SELECT map(1, 'a', 2, 'b')")
checkSqlGeneration("SELECT named_struct('c1',1,'c2',2,'c3',3)")
checkSqlGeneration("SELECT nanvl(a, 5), nanvl(b, 10), nanvl(d, c) from t2")
checkSqlGeneration("SELECT posexplode_outer(array())")
checkSqlGeneration("SELECT inline_outer(array(struct('a', 1)))")
checkSqlGeneration("SELECT rand(1)")
checkSqlGeneration("SELECT randn(3)")
checkSqlGeneration("SELECT struct(1,2,3)")
Expand Down