
Conversation

@thunterdb
Contributor

@thunterdb thunterdb commented Mar 24, 2017

What changes were proposed in this pull request?

This patch adds the DataFrames API to the multivariate summarizer (mean, variance, etc.). In addition to all the features of MultivariateOnlineSummarizer, it also allows the user to select a subset of the metrics. This should resolve some performance issues related to computing unrequested metrics.

Furthermore, it uses the BLAS API where possible, so the code should be efficient for the dense case.
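
For readers new to the API, here is a minimal usage sketch of the proposed entry points. It assumes the Summarizer.metrics(...).summary(...) names introduced by this work; the exact method names and available metric strings may differ in the final API.

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SummarizerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("summarizer-example").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, Vectors.dense(1.0, 10.0)),
      (2, Vectors.dense(3.0, 20.0))
    ).toDF("id", "features")

    // Only the requested metrics are computed; the result is a single struct column.
    val stats = df.select(
      Summarizer.metrics("mean", "variance", "count")
        .summary(col("features")).as("stats"))

    stats.select("stats.mean", "stats.variance", "stats.count").show(truncate = false)
    spark.stop()
  }
}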

How was this patch tested?

This patch includes most of the tests of the RDD-based implementation. It compares results against the existing MultivariateOnlineSummarizer and adds more tests.

This patch also includes some documentation for some low-level constructs such as TypedImperativeAggregate.

Performance

I have not run performance tests against the existing implementation. However, this patch uses the recommended low-level SQL APIs, so it should be interesting to compare both implementations in that respect.

Thanks to @hvanhovell and Cheng Liang for suggestions on SparkSQL.

@thunterdb thunterdb changed the title [SPARK-19634][ML][WIP] Multivariate summarizer - dataframes API [SPARK-19634][ML] Multivariate summarizer - dataframes API Mar 24, 2017
@SparkQA

SparkQA commented Mar 25, 2017

Test build #75188 has finished for PR 17419 at commit 58b17dc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 25, 2017

Test build #75187 has finished for PR 17419 at commit 35eaeb0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 25, 2017

Test build #75189 has finished for PR 17419 at commit ffe5cfe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@thunterdb
Contributor Author

@sethah it would have been nice, but I do not think we should merge it this late into the release cycle.

@thunterdb
Contributor Author

I have added a small perf test to find the performance bottlenecks. Note that this test exercises the worst case for overhead (vectors of size 1). Here are the numbers I currently get; I will profile the code to see if there are obvious targets for optimization:

[min ~ median ~ max], higher is better:

RDD = [2482 ~ 46150 ~ 48354] records / milli
dataframe (variance only) = [4217 ~ 4557 ~ 4848] records / milli
dataframe = [2887 ~ 4420 ~ 4717] records / milli

@SparkQA

SparkQA commented Mar 28, 2017

Test build #75285 has finished for PR 17419 at commit 662f62c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Mar 29, 2017

RDD = [2482 ~ 46150 ~ 48354] records / milli

Why does this number vary so much?

Looks like the RDD version is about 10x faster than the DataFrame version...

val rdd1 = sc.parallelize(1 to n).map { idx =>
  OldVectors.dense(idx.toDouble)
}
val trieouts = 10
Member

I think that 10 iterations without warm-up is too few for a performance measurement.
Can we use the Benchmark class, or add a warm-up run as the Benchmark class does?
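
For concreteness, a minimal sketch of the warm-up idea (this is not Spark's Benchmark class; the iteration counts and the measured body are placeholders):

// Run the measured body a few times before timing so that JIT compilation
// does not dominate the first iterations; return per-iteration elapsed times.
def timeWithWarmup(warmups: Int, iters: Int)(body: => Unit): Seq[Double] = {
  (0 until warmups).foreach(_ => body)       // discarded warm-up runs
  (0 until iters).map { _ =>
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e6        // elapsed milliseconds
  }
}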

Contributor Author

I had not tried that class, thanks. It should stabilize the results.

Member

When the results are stabilized, I think it would be good to keep the benchmark with ignore("benchmark"), as other benchmarks do.
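
A sketch of what that could look like in the suite (ScalaTest's ignore; runSummarizerBenchmark() is a hypothetical placeholder for the benchmark body):

// Disabled by default, like other Spark benchmarks; flip `ignore` to `test`
// to run it locally.
ignore("summarizer performance benchmark") {
  runSummarizerBenchmark()
}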

private[ml]
object SummaryBuilderImpl extends Logging {

def implementedMetrics: Seq[String] = allMetrics.map(_._1).sorted
Member

lazy val should be enough.


private def b(x: Array[Double]): Vector = Vectors.dense(x)

private def l(x: Array[Long]): Vector = b(x.map(_.toDouble))
Member

Same for this one; it would be better to give it a more descriptive name.

// All the fields that we compute on demand:
// TODO: the most common case is dense vectors. In that case we should
// directly use BLAS instructions instead of iterating through a scala iterator.
v.foreachActive { (index, value) =>
Member

The RDD-based summarizer doesn't actually use BLAS optimizations, so this may not be the reason for the performance gap.

Contributor Author

Right, it does not. Note that the benchmark below works with vectors of size 1, so as to analyze the overhead of DataFrames vs RDDs. I will add a more realistic benchmark later.

override def update(buff: Buffer, row: InternalRow): Buffer = {
  // Unsafe rows do not play well with UDTs, it seems.
  // Directly call the deserializer.
  val v = udt.deserialize(row.getStruct(0, udt.sqlType.size))
Member
@viirya viirya Mar 30, 2017

This should hurt performance.

When deserializing a vector, we copy the (indices and) values arrays out of the unsafe row.

Contributor

I think

val v = udt.deserialize(row.getStruct(0, udt.sqlType.size))

has some problems: we cannot directly call a getter on the row parameter that is passed in, because the ordinal of the column we want depends on the underlying Catalyst plan. Instead, we should use:

val datum = child.eval(row)
val featureVector = udt.deserialize(datum)

to get the column value.

Contributor
@WeichenXu123 WeichenXu123 Jul 20, 2017

If we want to use a weight column when summarizing, I think we can define the UDAF as
summary(featureCol, weightCol)
and pass the weight column into the constructor of MetricsAggregate.
Example code as follows:

case class MetricsAggregate(
      requested: Seq[Metrics],
      featureExpr: Expression, // feature column expr
      weightExpr: Expression, // weight column expr
      mutableAggBufferOffset: Int,
      inputAggBufferOffset: Int) extends TypedImperativeAggregate[Buffer] {

    override def children: Seq[Expression] = featureExpr :: weightExpr :: Nil

    override def update(buff: Buffer, row: InternalRow): Buffer = {
      val featureVector = udt.deserialize(featureExpr.eval(row))
      val weight = weightExpr.eval(row)
      Buffer.updateInPlace(buff, featureVector, weight)
      buff
    }
    ....
}

def summary(featureCol: Column, weightCol: Column): Column = {
    val agg = MetricsAggregate(
      requestedMetrics,
      featureCol.expr,
      weightCol.expr,
      mutableAggBufferOffset = 0,
      inputAggBufferOffset = 0)
    new Column(AggregateExpression(agg, mode = Complete, isDistinct = false))
}

// handle the case where the user does not specify a weight column
def summary(featureCol: Column): Column = {
    summary(featureCol, lit(1.0))
}

cc @cloud-fan @liancheng @yanboliang

Contributor

Dataframes = [2766.648008567718 ~ 5091.204527768661 ~ 5716.359795809639] records / milli
@thunterdb
Contributor Author

I looked a bit deeper into the performance aspect. Here are some quick insights:

  • there was an immediate bottleneck in VectorUDT; fixing it already boosts performance by 3x
  • it is not clear if switching to pure Breeze operations helps given the overhead for tiny vectors. I will need to do more analysis on larger vectors.
  • now, most of the time is roughly split between ObjectAggregationIterator.processInputs (40%), some codegen'ed expression (20%) and our own MetricsAggregate.update (35%)

That benchmark focuses on the overhead of Catalyst. I will do another benchmark with dense vectors to see how it fares in practice on more realistic data.

@SparkQA

SparkQA commented Mar 30, 2017

Test build #75406 has finished for PR 17419 at commit a569dac.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private case class MetricsAggregate(
    requested: Seq[Metrics],
    startBuffer: Buffer,
Contributor

We should not pass around startBuffer; instead, create an initial one in createAggregationBuffer.


}

private def updateInPlaceDense(buffer: Buffer, v: DenseVector, w: Double): Unit = {
  val epsi = Double.MinPositiveValue
Contributor

Is the purpose of using Breeze here to rely on BLAS for better performance? But in Breeze's implementation, ops between vectors do not use BLAS; instead Breeze uses cForRange.
cc @yanboliang

Contributor

I think the intention here is to express the sequential operations conveniently and efficiently. AFAIK, cForRange is also very efficient. Thanks.
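
For illustration, a rough sketch of the kind of sequential in-place update being discussed, written as a plain while loop standing in for cForRange; the incremental weighted-mean formula is standard, but the names are illustrative and not the exact buffer layout of this patch:

// Weighted incremental mean update over a dense vector, done in place.
// Returns the new total weight; `mean` is mutated.
def updateMeanInPlace(
    mean: Array[Double],
    weightSum: Double,
    values: Array[Double],
    w: Double): Double = {
  val newWeightSum = weightSum + w
  var i = 0
  while (i < values.length) {
    // m_new = m_old + (w / totalW) * (x - m_old)
    mean(i) += (w / newWeightSum) * (values(i) - mean(i))
    i += 1
  }
  newWeightSum
}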

times_df ::= dt
// scalastyle:off
print("Dataframes", times_df)
// scalastyle:on
Contributor

The print statement should be moved out of the loop.

@WeichenXu123
Contributor

WeichenXu123 commented Jul 20, 2017

As the DataFrame version is much slower than the RDD version (currently tested against vectors of size 1),
I also suspect there is a performance issue in ObjectAggregationIterator.processInputs(),
in the following code block:

  private def processInputs(): Unit = {
    // ...
    if (groupingExpressions.isEmpty) {
      // If there is no grouping expressions, we can just reuse the same buffer over and over again.
      val groupingKey = groupingProjection.apply(null)
      val buffer: InternalRow = getAggregationBufferByKey(hashMap, groupingKey)
      while (inputRows.hasNext) {
        val newInput = safeProjection(inputRows.next()) 
        processRow(buffer, newInput)
      }
    }
    ...

The statement val newInput = safeProjection(inputRows.next()) may be doing a redundant data copy (to work around some bug?).

cc @cloud-fan @liancheng

@liancheng
Contributor

@WeichenXu123 and I did some profiling using jvisualvm and found that 40% of the time is spent in the copy performed by this safeProjection. This is a known workaround added to fight the false-sharing issue @cloud-fan and I hit before.

@cloud-fan tried to fix this issue in #15082 but that PR didn't work out due to some other concerns (I can't remember all the details now).

@cloud-fan, any ideas about improving ObjectHashAggregateExec (e.g. adding code generation support)?

@cloud-fan
Contributor

The copy problem is fixed in #18483; I think we can remove this workaround in ObjectHashAggregateExec.
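
Conceptually, removing the workaround means the no-grouping path can feed rows to the aggregate directly instead of copying them through safeProjection first. A hedged sketch of the simplified loop (not the actual change in #18712):

while (inputRows.hasNext) {
  // No defensive copy: pass the input row straight to the aggregate's update.
  processRow(buffer, inputRows.next())
}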

asfgit pushed a commit that referenced this pull request Jul 24, 2017
…ash aggregate

## What changes were proposed in this pull request?

In #18483 , we fixed the data copy bug when saving into `InternalRow`, and removed all workarounds for this bug in the aggregate code path. However, the object hash aggregate was missed, this PR fixes it.

This patch is also a requirement for #17419, which shows that the DataFrame version is slower than the RDD version because of this issue.

## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes #18712 from cloud-fan/minor.
}

def structureForMetrics(metrics: Seq[Metrics]): StructType = {
  val dct = allMetrics.map { case (n, m, dt, _) => m -> (n, dt) }.toMap
Contributor

The expression m -> (n, dt) has a syntax problem: the Scala compiler turns -> into the .-> method and treats n, dt as two parameters, which causes a compile error (after rebasing this PR on master). We can use
m -> (n -> dt) or (m, (n, dt)) or m -> ((n, dt))
instead.
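
For example, with the extra parentheses the compiler unambiguously sees the tuple (n, dt) as the single argument to ->:

// Same mapping as above, but parsed as intended.
val dct = allMetrics.map { case (n, m, dt, _) => m -> ((n, dt)) }.toMap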

@thunterdb
Contributor Author

I am going to close this PR, since this is being taken over by @WeichenXu123 in #18798 .

@thunterdb thunterdb closed this Aug 1, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Aug 16, 2017
## What changes were proposed in this pull request?

This patch adds the DataFrames API to the multivariate summarizer (mean, variance, etc.). In addition to all the features of MultivariateOnlineSummarizer, it also allows the user to select a subset of the metrics.

## How was this patch tested?

Testcases added.

## Performance
Resolves several performance issues in apache#17419; further optimization is pending on the SQL team's work. One of the SQL-layer performance issues related to this feature has been resolved in apache#18712; thanks liancheng and cloud-fan.

### Performance data

(tested on my laptop, using 2 partitions; tries = 20, warm-up runs = 10)

The unit of the test results is records/millisecond (higher is better).

Vector size/records number | 1/10000000 | 10/1000000 | 100/1000000 | 1000/100000 | 10000/10000
----|------|----|---|----|----
Dataframe | 15149  | 7441 | 2118 | 224 | 21
RDD from Dataframe | 4992  | 4440 | 2328 | 320 | 33
raw RDD | 53931  | 20683 | 3966 | 528 | 53

Author: WeichenXu <[email protected]>

Closes apache#18798 from WeichenXu123/SPARK-19634-dataframe-summarizer.