
Conversation

@maropu
Member

@maropu maropu commented Jun 21, 2016

What changes were proposed in this pull request?

Spark currently cannot use HashAggregateExec for non-partial aggregates because Collect (CollectSet/CollectList) uses a single buffer shared across groups internally. Since SortAggregateExec is expensive in some cases, we had better fix this.

This PR changes the plan from

SortAggregate(key=[key#3077], functions=[collect_set(value#3078, 0, 0)], output=[key#3077,collect_set(value)#3088])
+- *Sort [key#3077 ASC], false, 0
   +- Exchange hashpartitioning(key#3077, 5)
      +- Scan ExistingRDD[key#3077,value#3078]

into

HashAggregate(keys=[key#3077], functions=[collect_set(value#3078, 0, 0)], output=[key#3077, collect_set(value)#3088])
+- Exchange hashpartitioning(key#3077, 5)
   +- Scan ExistingRDD[key#3077,value#3078]
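To illustrate why the hash-based plan was previously unavailable, here is a minimal, hypothetical Java sketch (these are not Spark's actual Scala classes): it contrasts an aggregate whose state lives in one buffer shared across all groups with one whose state lives in a per-group buffer. Hash aggregation feeds rows from many groups interleaved, so shared state mixes groups together:

```java
import java.util.*;

public class SharedBufferDemo {
    // Broken under hash aggregation: a single buffer inside the aggregate object,
    // analogous to the old Collect implementation. The group key is ignored, so
    // interleaved rows from different groups land in the same buffer.
    public static List<Integer> sharedCollect(int[][] rows) {
        List<Integer> buffer = new ArrayList<>();      // one shared buffer
        for (int[] r : rows) buffer.add(r[1]);         // r[0] (the group key) is ignored
        return buffer;
    }

    // Works under hash aggregation: state lives in a buffer looked up per group.
    public static Map<Integer, List<Integer>> perGroupCollect(int[][] rows) {
        Map<Integer, List<Integer>> buffers = new HashMap<>();
        for (int[] r : rows)
            buffers.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        return buffers;
    }

    public static void main(String[] args) {
        int[][] rows = {{0, 1}, {1, 10}, {0, 2}, {1, 20}};  // interleaved groups
        System.out.println(sharedCollect(rows));    // [1, 10, 2, 20] -- groups mixed
        System.out.println(perGroupCollect(rows));  // {0=[1, 2], 1=[10, 20]}
    }
}
```

Sort-based aggregation avoids the problem because it groups rows first: each group's rows arrive consecutively, so a single buffer can be flushed and reset between groups.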

How was this patch tested?

Checked that the non-partial aggregates (collect_set and collect_list) work with HashAggregateExec in DataFrameSuite.

@SparkQA

SparkQA commented Jun 21, 2016

Test build #60919 has finished for PR 13802 at commit 517d7ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Seq(Row(Seq(1, 2, 3), Seq(Map(3 -> 0), Map(3 -> 0), Map(4 -> 1))))
)
// TODO: We need to implement `UnsafeMapData#hashCode` and `UnsafeMapData#equals` for getting
// a set of input data.
Member Author


Do we need to implement them?

@SparkQA

SparkQA commented Jun 21, 2016

Test build #60928 has finished for PR 13802 at commit 0506453.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2016

Test build #60933 has finished for PR 13802 at commit 88ba697.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

@maropu this won't work for other Hive UDAFs, since these also maintain internal state and currently require per-group processing. It also has a greater potential for out-of-memory errors than the sort-based approach.

I do think there is merit in the general idea, but I think we should focus on creating a growable bytes-to-bytes map and byte-backed mutable ArrayData and MapData implementations.
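As a rough sketch of the "growable, byte-backed" idea (the class name and layout here are hypothetical, not Spark's UnsafeArrayData): an array of ints whose elements live in a flat byte[] that doubles when full, so it stays mutable while remaining copyable and spillable as raw bytes:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical byte-backed growable int array. Spark's real ArrayData/MapData
// implementations have a richer layout (null bits, variable-length elements);
// this only illustrates the mutable, byte-backed, growable mechanism.
public class ByteBackedIntArray {
    private byte[] data = new byte[16];   // starts with room for 4 ints
    private int numElements = 0;

    public void append(int value) {
        if ((numElements + 1) * 4 > data.length)
            data = Arrays.copyOf(data, data.length * 2);   // grow by doubling
        ByteBuffer.wrap(data).putInt(numElements * 4, value);
        numElements++;
    }

    public int get(int i) {
        return ByteBuffer.wrap(data).getInt(i * 4);
    }

    public int size() {
        return numElements;
    }
}
```

Because all element data sits in one contiguous byte[], such a structure could back an off-heap or spillable map without per-element object overhead.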

@maropu
Member Author

maropu commented Jun 22, 2016

@hvanhovell Oh, I see. Okay, I'll check whether we can implement mutable ArrayData and MapData.
By the way, I have some questions:

  1. Is there any reason to use SortAggregateExec for all the non-partial aggregates? It seems okay to use HashAggregateExec for non-partial ones other than collect_xxx and Hive UDAFs.
  2. Why do we have no hashCode and equals in UnsafeMapData? ArrayBasedMapData already overrides these methods.

@hvanhovell
Contributor

@maropu all aggregates that currently set supportsPartial = false cannot be partially aggregated and require that the entire group is processed in one step. So the name is a bit misleading; I suppose we could rename it.

UnsafeMapData is typically part of an UnsafeRow, which already implements equals() and hashCode() without requiring its elements to implement these methods (it uses the backing byte array). I suppose we can add these methods.
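The byte-array-based equality described above can be sketched as follows (a hypothetical simplification in Java; UnsafeRow itself compares memory regions via Platform operations and hashes with Murmur3): equals() and hashCode() operate on the backing bytes of a region, so the contained elements never need their own implementations:

```java
import java.util.Arrays;

// Hypothetical sketch of UnsafeRow-style equality: a value is just a region of a
// backing byte array, and equality/hashing read those bytes directly rather than
// deserializing and comparing element objects.
public class ByteRegion {
    public final byte[] base;
    public final int offset;
    public final int length;

    public ByteRegion(byte[] base, int offset, int length) {
        this.base = base;
        this.offset = offset;
        this.length = length;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ByteRegion)) return false;
        ByteRegion other = (ByteRegion) o;
        return Arrays.equals(
            Arrays.copyOfRange(base, offset, offset + length),
            Arrays.copyOfRange(other.base, other.offset, other.offset + other.length));
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(Arrays.copyOfRange(base, offset, offset + length));
    }
}
```

This is why deduplicating UnsafeMapData values (e.g. for collect_set) only needs the backing bytes to be in a canonical layout, not per-element equals()/hashCode().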

@maropu
Member Author

maropu commented Jun 22, 2016

@hvanhovell As for UnsafeMapData, could you check #13847?

@maropu
Member Author

maropu commented Jun 22, 2016

As for supportsPartial, I understand that collect and Hive UDAFs have this limitation, but what about AggregateWindowFunction? It seems functions like RowNumber and Rank work well even with HashAggregateExec and don't have the limitation. BTW, we at least need to fix the comment for supportsPartial: "Currently Hive UDAF is the only one that doesn't support partial aggregation." is now incorrect; https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L178

@hvanhovell
Contributor

I think we should rename and document supportsPartial to reflect what it actually does.

Rank and RowNumber are window functions. They both rely on ordered evaluation, and they should never be evaluated in any *AggregateExec operator; we have a rule in the analyzer to prevent this.

@maropu
Member Author

maropu commented Jun 22, 2016

Thanks for your explanation!

@maropu
Member Author

maropu commented Jun 22, 2016

Is it okay to make a new pr to fix these?

@hvanhovell
Contributor

What do you want to fix? WindowAggregateFunctions?

@maropu
Member Author

maropu commented Jun 22, 2016

No, I'd just like to fix the incorrect comments. Or could you?

@hvanhovell
Contributor

Could you have a go? Would be great!

@maropu
Member Author

maropu commented Jun 22, 2016

okay

@hvanhovell
Contributor

@maropu could you close this one? It is not that relevant anymore. Thanks for working on it though!

@maropu
Member Author

maropu commented Aug 31, 2016

yea, thanks!

@maropu maropu closed this Aug 31, 2016
@maropu maropu deleted the SPARK-16094 branch July 5, 2017 11:47