
Conversation

@WeichenXu123 (Contributor) commented Aug 1, 2017

What changes were proposed in this pull request?

This patch adds the DataFrames API to the multivariate summarizer (mean, variance, etc.). In addition to all the features of MultivariateOnlineSummarizer, it also allows the user to select a subset of the metrics.
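For illustration, a minimal usage sketch of the proposed builder API (assuming a DataFrame `df` with a Vector column named "features"; metric names follow this patch's `Summarizer`):

```scala
// Minimal usage sketch, assuming a DataFrame `df` with a Vector column "features".
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

// Request only the metrics you need; unrequested metrics are neither computed nor buffered.
val summaryCol = Summarizer
  .metrics("mean", "variance", "count")
  .summary(col("features"))

val Row(Row(mean, variance, count)) = df.select(summaryCol.as("summary")).first()
```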

How was this patch tested?

Test cases added.

Performance

Resolves several performance issues found in #17419; further optimization is pending on the SQL team's work. One of the SQL-layer performance issues related to this feature has been resolved in #18712, thanks @liancheng and @cloud-fan.

Performance data

(Tested on my laptop with 2 partitions; 20 trial runs, 10 of them warm-up.)

The unit of the test results is records per millisecond (higher is better).

| Vector size / record count | 1 / 10,000,000 | 10 / 1,000,000 | 100 / 1,000,000 | 1,000 / 100,000 | 10,000 / 10,000 |
|---|---|---|---|---|---|
| DataFrame | 15149 | 7441 | 2118 | 224 | 21 |
| RDD from DataFrame | 4992 | 4440 | 2328 | 320 | 33 |
| Raw RDD | 53931 | 20683 | 3966 | 528 | 53 |

@WeichenXu123 (Contributor Author) left a comment

Made a pass by myself.

("numNonZeros", NumNonZeros, arrayLType, Seq(ComputeNNZ)),
("max", Max, arrayDType, Seq(ComputeMax, ComputeNNZ)),
("min", Min, arrayDType, Seq(ComputeMin, ComputeNNZ)),
("normL2", NormL2, arrayDType, Seq(ComputeM2)),
Contributor Author

Note that the Max/Min computations depend on ComputeNNZ because SummarizerBuffer.update uses an optimization that only updates non-zero elements, so the NNZ statistics are needed to produce the final Max/Min. MultivariateOnlineSummarizer does the same.
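To make the dependency concrete, a small standalone sketch (hypothetical names, not the patch's code): because update only visits non-zero elements, any column that saw fewer non-zeros than total rows contains an implicit zero, which must be folded into the running max/min at finalization.

```scala
// Hypothetical sketch: fold the implicit zeros into max/min at finalization.
// If nnz(i) < totalCnt, column i contains at least one zero.
def finalizeMaxMin(
    currMax: Array[Double],
    currMin: Array[Double],
    nnz: Array[Long],
    totalCnt: Long): (Array[Double], Array[Double]) = {
  val max = currMax.zip(nnz).map { case (m, n) => if (n < totalCnt && m < 0.0) 0.0 else m }
  val min = currMin.zip(nnz).map { case (m, n) => if (n < totalCnt && m > 0.0) 0.0 else m }
  (max, min)
}
```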

case object ComputeMin extends ComputeMetrics

class SummarizerBuffer(
requestedMetrics: Seq[Metrics],
Contributor Author

The SummarizerBuffer is similar to MultivariateOnlineSummarizer, but it has some new features:

  1. It supports computing only a subset of the metrics, which saves buffer memory.
  2. It supports an optimized input interface that avoids copying the data.
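A hypothetical sketch of feature 1 (simplified, not the actual class): per-column buffers are allocated only for the computations that the requested metrics depend on.

```scala
// Hypothetical sketch: unneeded buffers stay null, saving O(numFeatures) memory each.
class MiniBuffer(requested: Set[String], numFeatures: Int) extends Serializable {
  private val needMean = requested.contains("mean") || requested.contains("variance")
  private val needM2n  = requested.contains("variance")

  private[this] val currMean: Array[Double] =
    if (needMean) new Array[Double](numFeatures) else null
  private[this] val currM2n: Array[Double] =
    if (needM2n) new Array[Double](numFeatures) else null
}
```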


override def size: Int = _size
}

Contributor Author

Here I do not use VectorUDT.deserialize but directly manipulate the UnsafeArrayData coming from the InternalRow (in a DataFrame it is actually an UnsafeRow); this avoids a data copy. cc @liancheng @cloud-fan

Contributor

Please add this comment to the source code itself.


val ois = new ObjectInputStream(bis)
ois.readObject().asInstanceOf[SummarizerBuffer]
}

Contributor Author

I will optimize serialize/deserialize with a ByteBuffer later, though it is not the bottleneck currently.
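For reference, a sketch of what such a ByteBuffer-based codec could look like (a hypothetical helper, not part of this patch): it writes the primitive arrays directly instead of going through Java object serialization.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: length-prefixed encoding of a double array via ByteBuffer,
// avoiding ObjectOutputStream/ObjectInputStream overhead.
def serializeDoubles(arr: Array[Double]): Array[Byte] = {
  val buf = ByteBuffer.allocate(java.lang.Integer.BYTES + arr.length * java.lang.Double.BYTES)
  buf.putInt(arr.length)
  arr.foreach(d => buf.putDouble(d))
  buf.array()
}

def deserializeDoubles(bytes: Array[Byte]): Array[Double] = {
  val buf = ByteBuffer.wrap(bytes)
  Array.fill(buf.getInt())(buf.getDouble())
}
```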

Contributor

It seems the serialize/deserialize functions are only called once per partition, so I agree they're not a bottleneck.

*/
@Since("2.2.0")
def summary(featuresCol: Column, weightCol: Column): Column

Contributor Author

This supports the weightCol parameter in a convenient way.
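A short usage sketch of this overload (assuming a DataFrame `df` with a Vector column "features" and a Double column "weight"):

```scala
// Sketch: weighted summary via the two-argument overload.
import org.apache.spark.ml.stat.Summarizer

val weightedSummary = df.select(
  Summarizer.metrics("mean", "variance")
    .summary(df("features"), df("weight")).as("summary"))
```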

def size: Int
}

private[this] val udt = new VectorUDT
Contributor Author

Is there a better way to get a VectorUDT instance? cc @cloud-fan


// TODO: this test should not be committed. It is here to isolate some performance hotspots.
test("perf test") {

Contributor Author

Performance test over vector sizes from 1 to 10000. (This code should be removed before the PR is merged.)

Contributor

Do not remove it, we will need it later. I think there is a way to tag the test as a performance harness, but I could not remember how. @liancheng , do you have some suggestions?

Contributor

We can keep the performance test as ignore("performance test") and update the recorded results whenever we get an improvement. You can refer to what Spark SQL did.

@thunterdb (Contributor)

@WeichenXu123 thanks! Can you post some performance numbers as well?

@SparkQA commented Aug 1, 2017

Test build #80126 has finished for PR 18798 at commit 2860390.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SummarizerBuffer(
  • trait TraversableIndexedSeq

@WeichenXu123 (Contributor Author)

Performance data attached. cc @thunterdb @jkbradley

@thunterdb (Contributor)

Thank you for the performance numbers @WeichenXu123. I have a couple of comments:

  • You say that SQL uses adaptive compaction. How bad is that? I assume it adds some overhead.
  • Did you run each experiment just once? I would be interested in error bars on these numbers, as it can take up to 30 seconds for the JVM to warm up and optimize the byte code. You should report the geometric mean or the median time of these experiments to make sure that you are not skewed by outliers (a minimal harness along these lines is sketched after this list). Others will probably have good advice as well.
  • From the performance numbers, there are 2 different regimes: small vectors and big vectors (for which even the DataFrame -> RDD conversion is faster than working directly with DataFrames). I would be curious to know the bottlenecks in each case.
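For concreteness, a minimal harness along the lines suggested in the second bullet (a sketch, not the PR's test code): warm up first, then report the median of the timed runs.

```scala
// Sketch of a median-reporting micro-benchmark harness.
def benchmark(warmup: Int, trials: Int)(body: => Unit): Double = {
  (0 until warmup).foreach(_ => body)  // let the JIT warm up and optimize
  val timesMs = (0 until trials).map { _ =>
    val t0 = System.nanoTime()
    body
    (System.nanoTime() - t0) / 1e6
  }.sorted
  timesMs(trials / 2)  // the median is robust to outliers
}
```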

If we trust these numbers, the overall conclusion is that the SQL interface currently adds a 2x-3x performance overhead over RDDs. @cloud-fan @liancheng are there still some low-hanging fruits that could be merged into SQL?

This state of affairs is of course far from great, but I am in favor of merging this piece and improving it iteratively with the help of the SQL team, as this code is easy to benchmark and representative of the rest of MLlib once we start to rely more on DataFrames and Catalyst, and less on RDDs.

@yanboliang @viirya @kiszk what are your thoughts?

@thunterdb (Contributor)

cc @hvanhovell as well.

* Users should not directly create such builders, but instead use one of the methods in
* [[Summarizer]].
*/
@Since("2.2.0")
Contributor

This is not going to be 2.2 anymore.

@thunterdb (Contributor) left a comment

@WeichenXu123 thanks a lot, I only have a few comments. Someone else should take a look, as I am the original author of this code.

/**
* Add a new sample to this summarizer, and update the statistical summary.
*/
def addRaw(instance: TraversableIndexedSeq, weight: Double): this.type = {
Contributor

please mention the type of the collection: TraversableIndexedSeq[_]

Contributor

I am actually surprised you do not get a performance drop compared to using a vector or an array. Shouldn't it be private, too?

Contributor Author

Here I back the TraversableIndexedSeq directly by the UnsafeRow coming from the DataFrame (it acts like a ByteBuffer in Java), which avoids copying the array data just to build a vector.
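A rough sketch of the idea (hypothetical and simplified): wrap the row's ArrayData in an indexed view, so each element is read in place instead of being copied into a new array.

```scala
import org.apache.spark.sql.catalyst.util.ArrayData

// Hypothetical sketch: an indexed view over the row's ArrayData. `apply` reads
// each double in place from the unsafe buffer; no Array[Double] is materialized.
// (Unlike the patch's specialized trait, this generic view would box values.)
def asIndexedView(values: ArrayData): IndexedSeq[Double] = new IndexedSeq[Double] {
  override def length: Int = values.numElements()
  override def apply(i: Int): Double = values.getDouble(i)
}
```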

Contributor

@WeichenXu123 Could you add a comment that we operate on the raw data (UnsafeRow) directly to avoid copying values?


// For test
def add(sample: Vector, weight: Double): this.type = {
val v = new TraversableIndexedSeq {
Contributor

why do you need to do that?

Contributor

I suspect this is going to cause some boxing on the values, too.

Contributor Author

Yes... this method is only used in the test suite (it makes writing tests more convenient).

Contributor

If it's only used in the test suite, make the comment clearer.

this
}

def addRaw(instance: TraversableIndexedSeq): this.type = addRaw(instance, 1.0)
Contributor

private

Contributor Author

I would like to ask: should we make the whole SummarizerBuffer private?

Contributor

I'd like to make the whole SummarizerBuffer private, as users will not use it.





@WeichenXu123 (Contributor Author)

@thunterdb

  1. Deserializing the DataFrame rows from their binary format adds overhead (whether there is compaction depends on the datatype, cc @liancheng); about 1x performance in my test.
  2. My test runs 20 times; the first 5 runs are warm-up, and the numbers in the table are the mean of the remaining 15 results. I think you can trust them.
  3. When the vector size is small (1~10), I think the bottleneck is the creation of many small Java objects, so the performance profile differs from that of larger vectors.

@yanboliang (Contributor)

@WeichenXu123 @thunterdb
Thanks for this great work; we are always happy to see improvements that help us migrate MLlib workloads to the Dataset-based API.
Here are my two cents:
1. From the performance test results, there is performance degradation from case 3 -> 2 -> 1.
The difference between case 3 and case 2 is the deserialization cost of each instance; I suspect we can't skip that step, so it may be harder to optimize. However, the difference between case 2 and case 1 is the aggregate implementation. Aggregation on a Dataset uses a user-defined type as the aggregate buffer, so I think we should make some effort to improve TypedImperativeAggregate. I guess that may not be as hard as the previous bottleneck, but this is just my intuition; let's check with the SQL folks. cc @liancheng @cloud-fan @hvanhovell
2. @WeichenXu123 In the current experiment, I saw you only use two partitions. Is there any difference if we run against more partitions or a larger dataset?
3. @thunterdb I agree that we can get this in and make improvements continuously. Once we get desirable results, we can start to migrate other MLlib workloads on top of Dataset/Catalyst. But if we can't reach a desirable result before 2.3.0, we should add a comment to let users know that the performance of the Dataset-based multivariate statistics is not yet good enough. Thanks.

@WeichenXu123 WeichenXu123 changed the title [SPARK-19634][ML] Multivariate summarizer - dataframes API [WIP] [SPARK-19634][ML] Multivariate summarizer - dataframes API Aug 4, 2017

override def size: Int = _size
}
val features = udt.deserialize(featuresDatum)
Contributor Author

Changed to use VectorUDT.deserialize, because the previous version of the code here did not improve performance markedly but increased code complexity.

@WeichenXu123 WeichenXu123 changed the title [WIP] [SPARK-19634][ML] Multivariate summarizer - dataframes API [SPARK-19634][ML] Multivariate summarizer - dataframes API Aug 7, 2017
@SparkQA commented Aug 7, 2017

Test build #80359 has finished for PR 18798 at commit 4f32e27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 7, 2017

Test build #80363 has finished for PR 18798 at commit 6053d0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@thunterdb (Contributor) left a comment

@WeichenXu123 thanks, this looks ready except for one comment about performance.

@yanboliang, can you merge this PR once this is addressed, if it looks good to you? Thank you in advance.

* @return a builder.
* @throws IllegalArgumentException if one of the metric names is not understood.
*/
@Since("2.3.0")
Contributor

Let's add a comment about performance indicating that it is about 3x slower than using the RDD interface.

* val meanDF = dataframe.select(Summarizer.mean($"features"))
* val Row(mean_) = meanDF.first()
* }}}
*/
Contributor

Put a comment about performance here.

* val Row(mean_) = meanDF.first()
* }}}
*
* Note: Currently, the performance of this interface is about 2x~3x slower than using the RDD
Contributor Author

I will track this performance gap together with the related SQL-layer improvements in the future.

@SparkQA commented Aug 8, 2017

Test build #80403 has finished for PR 18798 at commit b02db42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor Author)

Jenkins, test this please.

@SparkQA commented Aug 8, 2017

Test build #80407 has finished for PR 18798 at commit b02db42.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@thunterdb (Contributor)

@yanboliang do you feel comfortable merging this PR? I think all the questions have been addressed.

@yanboliang (Contributor)

@thunterdb I'm traveling these days; I will do a final pass and merge it next Monday/Tuesday. Thanks.

@viirya (Member) commented Aug 15, 2017

Sorry, can we make the performance data clearer? Currently it doesn't say what the unit of the numbers is.

@WeichenXu123 (Contributor Author)

@viirya Sure! Comment updated.

override def children: Seq[Expression] = featuresExpr :: weightExpr :: Nil

override def update(state: SummarizerBuffer, row: InternalRow): SummarizerBuffer = {
// val features = udt.deserialize(featuresExpr.eval(row))
Member

Remove this line?

@viirya (Member) commented Aug 15, 2017

@WeichenXu123 Thanks! Looks good.

// val features = udt.deserialize(featuresExpr.eval(row))
val featuresDatum = featuresExpr.eval(row).asInstanceOf[InternalRow]

val features = udt.deserialize(featuresDatum)
Member

Deserializing each Vector like this is likely a bottleneck.

/**
* Add a new sample to this summarizer, and update the statistical summary.
*/
def add(instance: Vector, weight: Double): this.type = {
Member

From the usage of this Vector, I think we can work directly on the serialized Vector data (size, indices, values). That should save much of the time spent deserializing the Vector.

Contributor Author

@viirya I tried your suggestion in a previous version of the code, but it did not bring a performance advantage.
You can check the previous version (in the commit "optimize summarizer buffer") and run the tests.

Member

Oh, I saw it. Thanks.

@WeichenXu123 (Contributor Author) commented Aug 15, 2017

If we work directly on the serialized data (UnsafeArrayData), it only avoids the array copy (which saves little time), but it adds extra cost to each UnsafeArrayData.getDouble call and increases code complexity.

Member

May I ask how the performance test runs? Especially for the RDD part.

Contributor Author

@viirya Change ignore("performance test") to test("performance test"), then run the SummarizerSuite test.

Member

Thanks. I didn't review the test thoroughly.

new SummaryBuilderImpl(typedMetrics, computeMetrics)
}

def mean(col: Column): Column = getSingleMetric(col, "mean")
Contributor

Add a @Since annotation.

* The metrics that are currently implemented.
*/
sealed trait Metrics extends Serializable
case object Mean extends Metrics
Contributor

Should we keep these case objects private?

override def children: Seq[Expression] = featuresExpr :: weightExpr :: Nil

override def update(state: SummarizerBuffer, row: InternalRow): SummarizerBuffer = {
// val features = udt.deserialize(featuresExpr.eval(row))
Contributor

Isn't the commented-out way more succinct?

}

override def merge(state: SummarizerBuffer,
other: SummarizerBuffer): SummarizerBuffer = {
Contributor

Align the parameters.

throw new TestFailedException(Some(s"Failure with hint $hint"), Some(tfe), 1)
}
}
/*
Contributor

Remove this if it's not used.


// TODO: this test should not be committed. It is here to isolate some performance hotspots.
ignore("performance test") {

Contributor

Could you paste your performance test results here? Just like here.

@yanboliang (Contributor)

@WeichenXu123 I left some minor comments, otherwise, LGTM. Thanks.

@WeichenXu123 (Contributor Author)

@yanboliang I will update ASAP, thanks!

@SparkQA commented Aug 15, 2017

Test build #80668 has finished for PR 18798 at commit 7540c4c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 15, 2017

Test build #80669 has finished for PR 18798 at commit b081fc3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 15, 2017

Test build #80671 has finished for PR 18798 at commit c82958f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed abstract class SummaryBuilder

@yanboliang (Contributor) left a comment

LGTM, I'll merge it tomorrow if there is no further discussion. Thanks, all.

@thunterdb (Contributor)

Thank you @yanboliang.

@yanboliang (Contributor)

Merged into master, thanks all.

@asfgit asfgit closed this in 07549b2 Aug 16, 2017
@WeichenXu123 WeichenXu123 deleted the SPARK-19634-dataframe-summarizer branch April 24, 2019 21:19