Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,26 @@ def test_udf_in_generate(self):
row = df.select(explode(f(*df))).groupBy().sum().first()
self.assertEqual(row[0], 10)

df = self.spark.range(3)
res = df.select("id", explode(f(df.id))).collect()
self.assertEqual(res[0][0], 1)
self.assertEqual(res[0][1], 0)
self.assertEqual(res[1][0], 2)
self.assertEqual(res[1][1], 0)
self.assertEqual(res[2][0], 2)
self.assertEqual(res[2][1], 1)

range_udf = udf(lambda value: list(range(value - 1, value + 1)), ArrayType(IntegerType()))
res = df.select("id", explode(range_udf(df.id))).collect()
self.assertEqual(res[0][0], 0)
self.assertEqual(res[0][1], -1)
self.assertEqual(res[1][0], 0)
self.assertEqual(res[1][1], 0)
self.assertEqual(res[2][0], 1)
self.assertEqual(res[2][1], 0)
self.assertEqual(res[3][0], 1)
self.assertEqual(res[3][1], 1)

def test_udf_with_order_by_and_limit(self):
from pyspark.sql.functions import udf
my_copy = udf(lambda x: x, IntegerType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ case class Generate(

override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

def output: Seq[Attribute] = {
val qualified = qualifier.map(q =>
// prepend the new qualifier to the existed one
generatorOutput.map(a => a.withQualifier(Some(q)))
).getOrElse(generatorOutput)
val qualifiedGeneratorOutput: Seq[Attribute] = qualifier.map { q =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we qualify all the output attributes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm.... this is actually better.

Copy link
Member Author

@viirya viirya Dec 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, actually this is what Generate did to prepare its output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I did a scan in codes and I can't find anyplace to assign the qualifier parameter for Generate. I only see using None for it.

@hvanhovell Do you know any place we have specified qualifier for a Generate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did the same thing when I was looking at this. There is one place: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala#L522

I think the approach you took in this PR is actually the correct one, and that the current one had a latent bug which would be triggered by both the child and the generated producing an attribute with the same name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall we make it a method or lazy val?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan making it a method seems better. But it is merged. Should I submit a tiny follow-up?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I think the current one should be ok too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea it's fine to leave it.

// prepend the new qualifier to the existed one
generatorOutput.map(a => a.withQualifier(Some(q)))
}.getOrElse(generatorOutput)

if (join) child.output ++ qualified else qualified
def output: Seq[Attribute] = {
if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,26 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
* it.
* @param outer when true, each input row will be output at least once, even if the output of the
* given `generator` is empty. `outer` has no effect when `join` is false.
* @param output the output attributes of this node, which constructed in analysis phase,
* and we can not change it, as the parent node bound with it already.
* @param generatorOutput the qualified output attributes of the generator of this node, which
* constructed in analysis phase, and we can not change it, as the
* parent node bound with it already.
*/
case class GenerateExec(
generator: Generator,
join: Boolean,
outer: Boolean,
output: Seq[Attribute],
generatorOutput: Seq[Attribute],
child: SparkPlan)
extends UnaryExecNode with CodegenSupport {

override def output: Seq[Attribute] = {
if (join) {
child.output ++ generatorOutput
} else {
generatorOutput
}
}

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.UnionExec(unionChildren.map(planLater)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
execution.GenerateExec(
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
generator, join = join, outer = outer, g.qualifiedGeneratorOutput,
planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r: logical.Range =>
Expand Down