
@cloud-fan
Contributor

What changes were proposed in this pull request?

ExpressionEncoder is just a container for serialization and deserialization expressions, so we can use these expressions to build TypedAggregateExpression directly and make it fit into DeclarativeAggregate, which is more efficient.

One trick: each buffer serializer expression references the result object of the reduce function call. To avoid re-evaluating this result object, we serialize the buffer object into a single struct field, so that a special Expression can evaluate the result object only once.
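As a rough, hedged illustration of that trick (CreateNamedStruct and Literal are real Catalyst expressions, but this wiring is only a sketch, not the PR's actual code):

import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression, Literal}

// Collapse the per-field buffer serializer expressions into one
// struct-valued expression, so the aggregation buffer has a single
// struct field and only one expression tree references the
// reduce-result object.
def serializeToSingleStruct(fields: Seq[(String, Expression)]): Expression =
  CreateNamedStruct(fields.flatMap { case (name, value) =>
    Seq(Literal(name), value)
  })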

How was this patch tested?

existing tests

@cloud-fan
Contributor Author

cc @yhuai

Contributor Author

Here we have a problem: bufferSerializer is a list of expressions, and the reduced expression will appear many times. However, it should be executed only once, and it may not be good to depend on subexpression elimination to optimize this case.

cc @marmbrus

Contributor

Is this fundamentally the same problem we are hitting when trying to codegen Map using Project, or am I not understanding the issue?

I wonder if we could have a special expression that takes an object and turns it into a struct in one shot, instead of exploding it into multiple columns? Just a random thought.

Contributor Author

Yes, it's the same problem.

Turning an object into a struct in one shot doesn't solve the problem, as we need to flatten it into Seq[Expression] here, which will be a list of GetStructField, and the "one shot" expression still appears many times.

Actually I think we don't need to do anything here. Although the reduced expression appears many times, the occurrences are the same reference, so subexpression elimination should always work. Or we can introduce a Holder and Reference pair to make it clearer.
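To make the Holder/Reference idea concrete, here is a toy, self-contained sketch (illustrative names only, not Catalyst's actual API; the PR eventually landed a real version as ReferenceToExpressions, per the test-bot output below):

sealed trait Expr { def eval(slots: Array[Any]): Any }

// Reads the value the enclosing Holder cached for it.
case class Slot(ordinal: Int) extends Expr {
  def eval(slots: Array[Any]): Any = slots(ordinal)
}

// Evaluates the shared (expensive) subtree exactly once, then lets every
// field expression reference the cached object through a Slot.
case class Holder(shared: () => Any, fields: Seq[Expr]) extends Expr {
  def eval(slots: Array[Any]): Any = {
    val once = shared()               // e.g. the reduce-result object
    fields.map(_.eval(Array(once)))   // all fields see the same instance
  }
}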

@SparkQA

SparkQA commented Mar 30, 2016

Test build #54533 has finished for PR 12067 at commit 68ede96.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TypedAggregateExpression2(

@SparkQA

SparkQA commented Mar 30, 2016

Test build #54537 has finished for PR 12067 at commit 3933562.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

This is a great idea if we can get it to work.

@cloud-fan
Contributor Author

This reimplementation works, but it is inefficient, as subexpression elimination is not supported in TungstenAggregate's whole-stage codegen. cc @davies, how hard is it to support subexpression elimination there? I took a quick look but need more time to get familiar with the whole-stage codegen framework first...

@cloud-fan changed the title [SPARK-14275][SQL][WIP] Reimplement TypedAggregateExpression to DeclarativeAggregate → [SPARK-14275][SQL] Reimplement TypedAggregateExpression to DeclarativeAggregate Mar 31, 2016
@SparkQA

SparkQA commented Mar 31, 2016

Test build #54621 has finished for PR 12067 at commit 25ee508.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s\"Unable to generate an encoder for inner class$`
    • case class UnresolvedDeserializer(deserializer: Expression, inputAttributes: Seq[Attribute])

@rxin
Contributor

rxin commented Apr 2, 2016

cc @davies again - can you take a look at Wenchen's question?

Contributor

maybe IN, BUF, and OUT

We have mostly used uppercase type parameters in Spark.

@marmbrus
Contributor

marmbrus commented Apr 4, 2016

It would be awesome to run Spark SQL perf and see what the speedup is here after the elimination is fixed. You might even be able to do it directly from the Spark repo.

@davies
Contributor

davies commented Apr 4, 2016

@cloud-fan @marmbrus I think we could do the same trick as MapElements in TungstenAggregate: evaluate the functions first, then replace them with the generated variables in the update/merge expressions.
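A toy sketch of that substitution trick (all names here are illustrative; this is not Spark's actual codegen):

sealed trait E
case class Call(name: String) extends E   // the expensive function call
case class Add(l: E, r: E) extends E
case class Ref(name: String) extends E    // a generated local variable

// Rewrite every occurrence of `target` into a reference to the variable
// that already holds its evaluated result.
def substitute(e: E, target: E, ref: Ref): E = e match {
  case `target`  => ref
  case Add(l, r) => Add(substitute(l, target, ref), substitute(r, target, ref))
  case other     => other
}

val shared    = Call("reduce")
val update    = Add(shared, shared)   // the shared call appears twice
val rewritten = substitute(update, shared, Ref("agg_value"))
// rewritten == Add(Ref("agg_value"), Ref("agg_value")): evaluated once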

asfgit pushed a commit that referenced this pull request Apr 5, 2016
…bjectOperator

## What changes were proposed in this pull request?

This PR decouples deserializer expression resolution from `ObjectOperator`, so that we can use deserializer expressions in normal operators. This is needed by #12061 and #12067; I abstracted the logic out and put it in this PR to reduce future code change.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <[email protected]>

Closes #12131 from cloud-fan/separate.
@cloud-fan
Contributor Author

generated code snippet in whole-stage codegen for
val ds = Seq(("a", 10)).toDS().groupByKey(_._1).agg(typed.sum(_._2)):

/* 095 */       // evaluate aggregate function
/* 096 */       /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@25ffdd4a.reduce */
/* 097 */       /* org.apache.spark.sql.execution.aggregate.TypedSumDouble@25ffdd4a */
/* 098 */       /* expression: org.apache.spark.sql.execution.aggregate.TypedSumDouble@25ffdd4a */
/* 099 */       Object agg_obj = ((Expression) references[1]).eval(null);
/* 100 */       org.apache.spark.sql.expressions.Aggregator agg_value4 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
/* 101 */       /* input[0, double] */
/* 102 */       double agg_value5 = agg_aggBuffer.getDouble(0);
/* 103 */       /* newInstance(class scala.Tuple2) */
/* 104 */       /* input[1, string].toString */
/* 105 */       java.lang.String agg_value7 = inputadapter_isNull ? null : (java.lang.String) inputadapter_value.toString();
/* 106 */       boolean agg_isNull7 = agg_value7 == null;
/* 107 */
/* 108 */       final scala.Tuple2 agg_value6 = new scala.Tuple2(agg_value7, inputadapter_value1);
/* 109 */       final boolean agg_isNull6 = false;
/* 110 */       double agg_value3 = false ? -1.0 : (double) ((java.lang.Double)agg_value4.reduce(agg_value5, agg_value6)).doubleValue();
/* 111 */       // update aggregate buffer
/* 112 */       if (!false) {
/* 113 */         agg_aggBuffer.setDouble(0, agg_value3);
/* 114 */       } else {
/* 115 */         agg_aggBuffer.setNullAt(0);
/* 116 */       }

@cloud-fan
Contributor Author

generated code snippet in mutable-projection codegen for a UDAF with a complex buffer type

object ComplexResultAgg extends Aggregator[(String, Int), (Long, Long), (Long, Long)] {

  override def zero: (Long, Long) = (0, 0)

  override def reduce(countAndSum: (Long, Long), input: (String, Int)): (Long, Long) = {
    (countAndSum._1 + 1, countAndSum._2 + input._2)
  }

  override def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = {
    (b1._1 + b2._1, b1._2 + b2._2)
  }

  override def finish(reduction: (Long, Long)): (Long, Long) = reduction
}

val ds = Seq("a" -> 1, "a" -> 3, "b" -> 3).toDS()
ds.groupByKey(_._1).agg(ComplexResultAgg.toColumn)

which generates:

/* 033 */     /* bufferexpression(input[0, scala.Tuple2]._1 AS _1#29L, input[0, scala.Tuple2]._2 AS _2#30L, org.apache.spark.sql.ComplexResultAgg... */
/* 034 */     /* org.apache.spark.sql.ComplexResultAgg$@3f33d0f3.merge */
/* 035 */     /* org.apache.spark.sql.ComplexResultAgg$@3f33d0f3 */
/* 036 */     /* expression: org.apache.spark.sql.ComplexResultAgg$@3f33d0f3 */
/* 037 */     Object obj = ((Expression) references[0]).eval(null);
/* 038 */     org.apache.spark.sql.expressions.Aggregator value2 = (org.apache.spark.sql.expressions.Aggregator) obj;
/* 039 */     /* newInstance(class scala.Tuple2) */
/* 040 */     /* input[0, struct<_1:bigint,_2:bigint>]._1 */
/* 041 */     /* input[0, struct<_1:bigint,_2:bigint>] */
/* 042 */     InternalRow value5 = i.getStruct(0, 2);
/* 043 */     long value4 = -1L;
/* 044 */
/* 045 */     value4 = value5.getLong(0);
/* 046 */     /* input[0, struct<_1:bigint,_2:bigint>]._2 */
/* 047 */     /* input[0, struct<_1:bigint,_2:bigint>] */
/* 048 */     InternalRow value7 = i.getStruct(0, 2);
/* 049 */     long value6 = -1L;
/* 050 */
/* 051 */     value6 = value7.getLong(1);
/* 052 */
/* 053 */
/* 054 */
/* 055 */     final scala.Tuple2 value3 = new scala.Tuple2(value4, value6);
/* 056 */     final boolean isNull3 = false;
/* 057 */     /* newInstance(class scala.Tuple2) */
/* 058 */     /* input[2, struct<_1:bigint,_2:bigint>]._1 */
/* 059 */     /* input[2, struct<_1:bigint,_2:bigint>] */
/* 060 */     InternalRow value10 = i.getStruct(2, 2);
/* 061 */     long value9 = -1L;
/* 062 */
/* 063 */     value9 = value10.getLong(0);
/* 064 */     /* input[2, struct<_1:bigint,_2:bigint>]._2 */
/* 065 */     /* input[2, struct<_1:bigint,_2:bigint>] */
/* 066 */     InternalRow value12 = i.getStruct(2, 2);
/* 067 */     long value11 = -1L;
/* 068 */
/* 069 */     value11 = value12.getLong(1);
/* 070 */
/* 071 */
/* 072 */
/* 073 */     final scala.Tuple2 value8 = new scala.Tuple2(value9, value11);
/* 074 */     final boolean isNull8 = false;
/* 075 */     scala.Tuple2 value1 = false ? null : (scala.Tuple2) value2.merge(value3, value8);
/* 076 */     boolean isNull1 = value1 == null;
/* 077 */     /* struct(lambdavariable(value1, isNull1, ObjectType(class scala.Tuple2))._1 AS _1#29L, lambdavariable(value1, isNull1, ObjectType(... */
/* 078 */     boolean isNull13 = false;
/* 079 */     final Object[] values = new Object[2];
/* 080 */     /* lambdavariable(value1, isNull1, ObjectType(class scala.Tuple2))._1 */
/* 081 */     long value14 = isNull1 ? -1L : (long) ((java.lang.Long)value1._1()).longValue();
/* 082 */     if (isNull1) {
/* 083 */       values[0] = null;
/* 084 */     } else {
/* 085 */       values[0] = value14;
/* 086 */     }
/* 087 */
/* 088 */     /* lambdavariable(value1, isNull1, ObjectType(class scala.Tuple2))._2 */
/* 089 */     long value15 = isNull1 ? -1L : (long) ((java.lang.Long)value1._2()).longValue();
/* 090 */     if (isNull1) {
/* 091 */       values[1] = null;
/* 092 */     } else {
/* 093 */       values[1] = value15;
/* 094 */     }
/* 095 */     final InternalRow value13 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(values);
/* 096 */     this.value_0 = value13;

@SparkQA

SparkQA commented Apr 7, 2016

Test build #55209 has finished for PR 12067 at commit 905234e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class EvaluateOnce(serializer: Seq[Expression], obj: Expression, dataType: DataType)

@cloud-fan
Contributor Author

the benchmark result on the master branch is extremely slow:

aggregate:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
DataFrame sum                             117 /  262         85.7          11.7       1.0X
Dataset sum using Aggregator         177126 / 178618          0.1       17712.6       0.0X
Dataset complex Aggregator           359263 / 360736          0.0       35926.3       0.0X

not sure why...

@cloud-fan
Contributor Author

retest this please.

@cloud-fan
Contributor Author

cc @davies

Contributor

@marmbrus Yay!

@yhuai
Contributor

yhuai commented Apr 8, 2016

@davies Can you review this?

@rxin
Contributor

rxin commented Apr 8, 2016

If we can reuse a single object and mutate it in place, would it be the same speed?

@rxin
Contributor

rxin commented Apr 8, 2016

The part I don't get is that even in the RDD case, we'd need to create an object per row. This is equivalent to the "deserialization" in aggregator, since they both just call the ctor of the class. Why is RDD faster?

@cloud-fan
Contributor Author

In the benchmark, for RDD, we first apply a function to turn a long into a Data, then do the aggregation. For Dataset, we first turn a long into a UTF8String, then turn the UTF8String into a String and create a Data, then do the aggregation. This may be one reason why Dataset is slower than RDD.

@cloud-fan
Contributor Author

And I think "reuse a single object" should help, as then we only need to create one object per partition. But it feels like cheating: RDD doesn't reuse the object, so it's unfair to compare Dataset with RDD when we internally reuse the same object.

@rxin
Contributor

rxin commented Apr 8, 2016

Well it's not cheating if the user doesn't need to explicitly reuse.
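A minimal, self-contained sketch of the reuse idea under discussion (illustrative only; not what this PR implements):

// One mutable buffer per partition, mutated in place: a single
// allocation instead of a fresh tuple per input row.
final class SumBuffer(var sum: Long) {
  def add(v: Int): this.type = { sum += v; this }
}

val buffer = new SumBuffer(0L)
Seq(1, 3, 3).foreach(buffer.add)   // buffer.sum == 7, no per-row allocation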

@SparkQA

SparkQA commented Apr 8, 2016

Test build #55298 has finished for PR 12067 at commit 7a136c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 11, 2016

Test build #55499 has finished for PR 12067 at commit 045a9be.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 11, 2016

Test build #55506 has finished for PR 12067 at commit 9e9be45.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReferenceToExpressions(result: Expression, children: Seq[Expression])

@SparkQA

SparkQA commented Apr 11, 2016

Test build #55508 has finished for PR 12067 at commit 050e942.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReferenceToExpressions(result: Expression, children: Seq[Expression])

@SparkQA

SparkQA commented Apr 11, 2016

Test build #55509 has finished for PR 12067 at commit 4ee5ac1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReferenceToExpressions(result: Expression, children: Seq[Expression])

@yhuai
Contributor

yhuai commented Apr 11, 2016

@davies Can you review?

RDD                                       216 /  237         46.3          21.6       4.2X
RDD                                      1935 / 2105         51.7          19.3       1.0X
DataFrame                                 756 /  799        132.3           7.6       2.6X
Dataset                                  7359 / 7506         13.6          73.6       0.3X

Contributor

We did not see much performance improvement from this PR, right?

Contributor Author

see #12067 (comment)
It's hundreds of times faster...

Contributor

What's the old value of this? Was it wrong?

Contributor Author

Ah sorry, I updated the benchmark code before; let me run it again on master.

Contributor Author

Oh wait, this line is for back-to-back map, but this PR aims to improve the aggregator case.

Contributor Author

I updated these old benchmark cases according to rxin's comment: #12067 (comment)

override def toString: String = {
s"""${aggregator.getClass.getSimpleName}(${children.mkString(",")})"""
val input = inputDeserializer match {
case Some(UnresolvedDeserializer(deserializer, _)) => deserializer.dataType.simpleString

Contributor

Is deserializer always resolved?

Contributor Author

Oh sorry, I missed this one; this case should be removed.

Contributor

Could you remove it when merging?

Contributor Author

Just realized this line is needed. The input deserializer is set by TypedColumn.withInputType and is unresolved in the first place.

@davies
Contributor

davies commented Apr 14, 2016

LGTM, will merge this once it passes the tests.

@SparkQA

SparkQA commented Apr 14, 2016

Test build #2788 has finished for PR 12067 at commit 4ee5ac1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReferenceToExpressions(result: Expression, children: Seq[Expression])

@cloud-fan
Contributor Author

@davies thanks for your review! merging to master!

@asfgit closed this in 297ba3f Apr 15, 2016
asfgit pushed a commit that referenced this pull request Apr 19, 2016
… type

## What changes were proposed in this pull request?

After #12067, we now use expressions to do the aggregation in `TypedAggregateExpression`. To implement buffer merge, we produce a new buffer deserializer expression by replacing `AttributeReference` with the right-side buffer attribute, like other `DeclarativeAggregate`s do, and finally combine the left and right buffer deserializers with `Invoke`.

However, after #12338, we add the loop variable to the class members when codegen'ing `MapObjects`. If the `Aggregator` buffer type is `Seq`, which is implemented by the `MapObjects` expression, we will add the same loop variable to the class members twice (once from each of the left and right buffer deserializers), which causes the `ClassFormatError`.

This PR fixes this issue by calling `distinct` before declaring the class members.

## How was this patch tested?

new regression test in `DatasetAggregatorSuite`

Author: Wenchen Fan <[email protected]>

Closes #12468 from cloud-fan/bug.
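A minimal sketch of the `distinct` fix described in the commit above (names are illustrative, not the actual codegen internals):

// Both buffer deserializers register the same MapObjects loop variable,
// so the declaration list contains duplicates; deduplicating before
// emitting the member declarations avoids the duplicate-field error.
val declaredStates = Seq(
  "boolean loopIsNull1;", "long loopValue1;",   // from the left deserializer
  "boolean loopIsNull1;", "long loopValue1;"    // same names, right deserializer
)
val memberDeclarations = declaredStates.distinct.mkString("\n")
// each declaration now appears once, so the generated class is valid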