
Conversation

@clockfly
Contributor

What changes were proposed in this pull request?

This PR allows users to define an AggregateFunction which can store arbitrary Java objects
in the aggregation buffer and use those objects to perform aggregation. Before this PR, users were only allowed to store a limited set of object types in the aggregation buffer. Please see example usage in
class org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
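
For readers without the test suite handy, here is a minimal, Spark-free sketch of the idea; all names here (ObjectBuffer, MaxWithObjectBuffer, serializeInPlace) are illustrative stand-ins, not the actual API added by this PR. The buffer holds an arbitrary Java object while a group's rows are processed, and is only turned into a serializable form once the group is finished:

```scala
import java.nio.ByteBuffer

// Stand-in for one slot of the aggregation buffer (the real API works on a MutableRow).
final class ObjectBuffer(var value: AnyRef)

// A max aggregate that keeps a java.lang.Long object in the buffer.
class MaxWithObjectBuffer {
  def initialize(buf: ObjectBuffer): Unit =
    buf.value = java.lang.Long.valueOf(Long.MinValue)

  def update(buf: ObjectBuffer, input: Long): Unit = {
    val current = buf.value.asInstanceOf[java.lang.Long].longValue
    if (input > current) buf.value = java.lang.Long.valueOf(input)
  }

  // Called once per group, after all rows have been seen: replaces the Java
  // object with a serializable binary form, in place.
  def serializeInPlace(buf: ObjectBuffer): Unit = {
    val max = buf.value.asInstanceOf[java.lang.Long].longValue
    buf.value = ByteBuffer.allocate(8).putLong(max).array()
  }
}
```

The real change expresses the same idea through ImperativeAggregate and a MutableRow-backed aggregation buffer, as the review comments below show.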

How was this patch tested?

Unit tests.

Revert: object aggregation buffer
@SparkQA

SparkQA commented Aug 19, 2016

Test build #64103 has finished for PR 14723 at commit 520a17f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait WithObjectAggregateBuffer

* 1. Spark framework moves on to next group, until all groups have been processed.
*/
trait WithObjectAggregateBuffer {
this: ImperativeAggregate =>
Contributor


Seems we do not really need this line.

Contributor


I guess having this line will make this trait hard to use from Java.

Contributor


Oh, it seems this trait will still be a Java interface. But in general, I think we do not really need to have this line.
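
To make the trade-off discussed in this thread concrete, a minimal sketch of the two shapes of the trait (the names are illustrative, not the code in this PR):

```scala
// Stand-in for the real ImperativeAggregate base class (illustrative only).
abstract class ImperativeAggregateLike

// Option 1: self-type annotation, as in this PR. In Scala, only subclasses of
// the base class may mix it in; the concern above is how this reads from Java.
trait WithObjectAggregateBufferSelfTyped { this: ImperativeAggregateLike => }

// Option 2: a plain marker trait. It compiles to an ordinary Java interface
// with no extra constraint, so a Java class can simply `implements` it.
trait WithObjectAggregateBufferPlain
```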

@yhuai
Contributor

yhuai commented Aug 21, 2016

Can you create a JIRA?

* object A with current inputRow. After updating, object A is stored back to mutableAggBuffer.
* 1. After processing all rows of current group, the framework will call method
* `serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to serialize object A
* to a serializable format in place.
Contributor


to a Spark SQL internal format (mostly BinaryType) in place
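
As a rough illustration of what serializing "in place" could look like, here is a hedged sketch with illustrative names and a plain array standing in for the MutableRow-backed buffer; it assumes the stored object is java.io.Serializable and simply replaces it with raw bytes, which is what a BinaryType column carries:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ObjectBufferSerialization {
  // Replace the Java object in the given buffer slot with its byte-array form.
  def serializeObjectInPlace(buffer: Array[AnyRef], ordinal: Int): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(buffer(ordinal)) // requires a java.io.Serializable object
    finally oos.close()
    buffer(ordinal) = bos.toByteArray    // now raw bytes the framework can ship
  }
}
```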

@clockfly force-pushed the object_aggregation_buffer_part1 branch from 9b16f89 to 9ae648c on August 22, 2016 at 06:18
@SparkQA

SparkQA commented Aug 22, 2016

Test build #64179 has finished for PR 14723 at commit 9b16f89.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2016

Test build #64180 has finished for PR 14723 at commit 9ae648c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* This traits allows an AggregateFunction to store **arbitrary** Java objects in internal
Contributor


Nit: traits => trait

@clockfly
Contributor Author

@liancheng @cloud-fan
@yhuai @hvanhovell @gatorsmile
This PR is superseded by #14753, please review the new PR instead.

The motivation behind the change is that the aggregation function is also used by WindowExec, which may continuously interleave update and eval. We have to override eval of ImperativeAggregate so that eval can accept an aggregation buffer which contains a generic Java object.

For example:

agg.update(buffer, row1)
agg.eval(buffer)
agg.update(buffer, row2)
agg.eval(buffer)
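
A minimal sketch of that interleaving, with illustrative names in place of the real ImperativeAggregate API: eval reads the object buffer without serializing it away, so the same buffer stays usable for the next update.

```scala
// Stand-in buffer slot holding an arbitrary Java object.
final class Buf(var value: AnyRef)

object RunningMax {
  def initialize(buf: Buf): Unit =
    buf.value = java.lang.Long.valueOf(Long.MinValue)

  def update(buf: Buf, input: Long): Unit = {
    if (input > buf.value.asInstanceOf[java.lang.Long].longValue) {
      buf.value = java.lang.Long.valueOf(input)
    }
  }

  // eval reads the live object directly and leaves the buffer untouched,
  // so a later update can keep accumulating into the same object.
  def eval(buf: Buf): Long =
    buf.value.asInstanceOf[java.lang.Long].longValue

  def main(args: Array[String]): Unit = {
    val buf = new Buf(null)
    initialize(buf)
    update(buf, 3L); println(eval(buf)) // 3
    update(buf, 7L); println(eval(buf)) // 7 -- the buffer survived the first eval
  }
}
```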

@clockfly closed this Aug 22, 2016
