
Conversation

@viirya
Member

@viirya viirya commented May 1, 2017

What changes were proposed in this pull request?

The current ML Bucketizer can only bin a single column of continuous features. If a dataset has thousands of continuous columns that need to be binned, we end up with thousands of ML stages, which is inefficient for query planning and execution.

We should have a bucketizer that can bin many columns at once. It would need to accept a list of arrays of split points, one per column to bin, and it would make things more efficient by replacing thousands of stages with just one.

The approach in this patch is to add a new MultipleBucketizerInterface for this purpose. Bucketizer now extends this new interface.

Performance

Benchmarking using the test dataset provided in JIRA SPARK-20392 (blockbuster.csv).

The ML pipeline includes 2 StringIndexers and 1 MultipleBucketizer or 137 Bucketizers to bin 137 input columns with the same splits. Then count the time to transform the dataset.

MultipleBucketizer: 3352 ms
Bucketizer: 51512 ms

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@SparkQA

SparkQA commented May 1, 2017

Test build #76349 has finished for PR 17819 at commit e8f5d89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid: String)
  • class DoubleArrayArrayParam(

@viirya
Member Author

viirya commented May 1, 2017

cc @MLnick @jkbradley for review. Thanks.

@viirya viirya changed the title [SPARK-20542][ML][SQL][WIP] Add a Bucketizer that can bin multiple columns [SPARK-20542][ML][SQL] Add a Bucketizer that can bin multiple columns May 1, 2017
*
* @group untypedrel
* @since 2.3.0
*/
Member Author

I am wondering whether I should make a separate PR for this SQL change. cc @cloud-fan

Contributor

@cloud-fan cloud-fan May 2, 2017

how about we make it private[spark]? I'm not sure if this API is good enough.

Member Author

Sounds good to me.

@MLnick
Contributor

MLnick commented May 2, 2017

@viirya can you post some performance comparisons for this?

@viirya
Member Author

viirya commented May 2, 2017

@MLnick Ok. Let me prepare the comparisons.

@SparkQA

SparkQA commented May 2, 2017

Test build #76379 has finished for PR 17819 at commit 38dce8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 3, 2017

Test build #76406 has finished for PR 17819 at commit 6ff9c79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented May 4, 2017

@MLnick I've done a benchmark using the test dataset provided in JIRA SPARK-20392 (blockbuster.csv).

The ML pipeline includes 2 StringIndexers and 1 MultipleBucketizer or 137 Bucketizers to bin 137 input columns with the same splits.

MultipleBucketizer: 3352 ms
Bucketizer: 51512 ms

@MLnick
Contributor

MLnick commented May 4, 2017

Thanks. Result does look good.

So the improvement is really coming from the new withColumns that avoids a bunch of projections in the plan in favor of one (more or less)? So the same approach can benefit any transformer that could operate on multiple cols (at least at transform time)?

@viirya
Member Author

viirya commented May 5, 2017

The bunch of projections will be collapsed during optimization, so it doesn't affect query execution. However, every withColumn call creates a new DataFrame along with a projection on the previous logical plan. That is costly: it creates a new query execution, analyzes the logical plan, creates an encoder, etc. So the improvement comes from paying this cost once with withColumns instead of once per withColumn call.

This can benefit other transformers that work on multiple columns. I even have an idea to revamp the Transformer interface: the transformation in a Transformer actually ends with a withColumn call to add or replace a column, so transformers are really transforming columns in the dataset, and we don't need to re-create a DataFrame for each transformation.

But the performance difference is only noticeable when the number of transformation stages is large, as in the example with many Bucketizers, so it may not be worth doing. Just a thought.
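The cost difference can be illustrated with a toy Python model of plan analysis (illustrative only — the `Plan` class and its cost accounting are invented for this sketch, not Spark code). Each `with_column` wraps the previous plan in a new node and the analyzer revisits the whole, deeper plan; `with_columns` adds every column in one node, so the extra analysis happens once:

```python
# Toy model: analyzing a plan "visits" every node in it, so chained
# withColumn calls pay a cost that grows with plan depth each time.
class Plan:
    analyze_calls = 0  # total nodes visited by the "analyzer" across all plans

    def __init__(self, cols, parent_depth=0):
        self.cols = list(cols)
        self.depth = parent_depth + 1
        Plan.analyze_calls += self.depth  # analysis walks every node in the plan

def with_column(plan, col):
    """Add one column: wraps the old plan in a new node (full re-analysis)."""
    return Plan(plan.cols + [col], plan.depth)

def with_columns(plan, cols):
    """Add many columns in a single new plan node (one extra analysis pass)."""
    return Plan(plan.cols + list(cols), plan.depth)

Plan.analyze_calls = 0
p = Plan(["a"])
for i in range(137):  # 137 columns, as in the benchmark above
    p = with_column(p, "c%d" % i)
chained_cost = Plan.analyze_calls  # grows quadratically with column count

Plan.analyze_calls = 0
q = with_columns(Plan(["a"]), ["c%d" % i for i in range(137)])
batched_cost = Plan.analyze_calls  # constant overhead regardless of column count
```

Under this model the chained version does quadratically more analysis work than the batched one, which matches the shape of the benchmark numbers above.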

@viirya
Member Author

viirya commented May 5, 2017

Note: a Transformer might also perform other manipulations on the dataset, such as dropping NaN values. The idea above won't work in those cases.

@barrybecker4

barrybecker4 commented May 9, 2017

I don't see support for "withColumns" in Spark 2.1.1 or at the tip of the Spark master branch. In which version or branch does it first appear? This work seems related to https://issues.apache.org/jira/browse/SPARK-12225.

@viirya
Member Author

viirya commented May 11, 2017

@barrybecker4 The withColumns API is first introduced in this PR, so you won't see it in Spark 2.1.1 or the current codebase. Thanks for pointing out SPARK-12225. Yes, it is related.

@viirya
Member Author

viirya commented May 18, 2017

ping @MLnick Do you have more comments on this? Thanks.

@MLnick
Contributor

MLnick commented May 18, 2017

I will try to take a look soon. My main concern is whether we should really have a new class - it starts to make things really messy if we introduce Multi versions of everything (e.g. we may want to add multi col support to StringIndexer, OneHotEncoder among others).

@viirya
Member Author

viirya commented May 18, 2017

@MLnick That's right. I share that concern. However, keeping the original single-column Bucketizer and a multi-column Bucketizer in one class also seems to produce messy code.

I'll rethink it and see if there is a good way to incorporate both.

@viirya viirya changed the title [SPARK-20542][ML][SQL] Add a Bucketizer that can bin multiple columns [SPARK-20542][ML][SQL] Add an API to Bucketizer that can bin multiple columns Jun 12, 2017
@viirya
Member Author

viirya commented Jun 12, 2017

@MLnick I've updated the previous solution. The new API is implemented in an interface that Bucketizer now extends, so you can still use the Bucketizer class. Depending on which parameters you set, it performs single-column or multi-column bucketizing.

Please take a look if you have time. Thanks.

@SparkQA

SparkQA commented Jun 12, 2017

Test build #77924 has finished for PR 17819 at commit 4301314.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2017

Test build #77935 has finished for PR 17819 at commit 08cbfac.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 12, 2017

Test build #77937 has finished for PR 17819 at commit 8386d1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Sep 11, 2017

ping @MLnick Do you have time to help review this? Thanks.

@MLnick
Contributor

MLnick commented Oct 10, 2017 via email

Contributor

@MLnick MLnick left a comment

The ML part looks pretty good. I left a few fairly minor comments.

The SQL part looks ok also - though will wait for others e.g. @gatorsmile to take a look too.

/**
* `Bucketizer` maps a column of continuous features to a column of feature buckets.
* `Bucketizer` maps a column of continuous features to a column of feature buckets. Since 2.3.0,
* `Bucketizer` can also map multiple columns at once. Whether it goes to map a column or multiple
Contributor

Perhaps:

Since 2.3.0, Bucketizer can map multiple columns at once by setting the inputCols parameter. Note that when both the inputCol and inputCols parameters are set, a log warning will be printed and only inputCol will take effect, while inputCols will be ignored.
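The precedence described in this suggested doc wording can be sketched as a small standalone Python helper (the function name and return shape are illustrative inventions for this sketch, not Spark's actual implementation):

```python
import warnings

def resolve_bucketizer_inputs(input_col=None, input_cols=None):
    """Pick single- or multi-column mode. When both inputCol and inputCols
    are set, warn and prefer inputCol, ignoring inputCols (the documented
    precedence)."""
    if input_col is not None and input_cols is not None:
        warnings.warn("Both inputCol and inputCols are set; inputCols is ignored.")
        return ("single", [input_col])
    if input_col is not None:
        return ("single", [input_col])
    if input_cols is not None:
        return ("multi", list(input_cols))
    raise ValueError("Either inputCol or inputCols must be set.")
```

For example, `resolve_bucketizer_inputs(input_col="f1", input_cols=["f2", "f3"])` warns and returns single-column mode on `f1`.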

Member Author

Ok. Looks better.

* @group param
*/
@Since("2.3.0")
final val outputCols: StringArrayParam = new StringArrayParam(this, "outputCols",
Contributor

why are we making this final (and not others)? (also the getOutputCols?)

Contributor

I guess similarly to shared params? I think it makes sense to add a shared param since this, Imputer and others will use it

Member Author

Ah, I think the final is copied from previous multiple bucketizer trait. I'll remove it.

Member Author

I will create HasOutputCols.

val newCols = inputColumns.zipWithIndex.map { case (inputCol, idx) =>
bucketizers(idx)(filteredDataset(inputCol).cast(DoubleType))
}
val newFields = outputColumns.zipWithIndex.map { case (outputCol, idx) =>
Contributor

Have we not done this already in transformSchema? Can we just re-use the result of that?

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off$

Contributor

No Scala example?

Member Author

@viirya viirya Oct 11, 2017

Added a Scala example.

val data = (0 until validData1.length).map { idx =>
(validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx))
}
val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2")
Contributor

toSeq not required here?

val data = (0 until validData1.length).map { idx =>
(validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx))
}
val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2")
Contributor

Same here, toSeq unnecessary.

}
}

test("multiple columns:: read/write") {
Contributor

Nit: two : here

testDefaultReadWrite(t)
}

test("Bucketizer in a pipeline") {
Contributor

It may be overkill - but we would expect a Bucketizer with multiple cols set to behave exactly the same as multiple single-column Bucketizers. Perhaps a test comparing them?

Member Author

Sure. Added a test for it.

pl.transform(df).select("result1", "expected1", "result2", "expected2")
.collect().foreach {
case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
assert(r1 === e1,
Contributor

This logic is duplicated across a few test cases - perhaps we could factor it out into a utility method.

* `Bucketizer` maps a column of continuous features to a column of feature buckets. Since 2.3.0,
* `Bucketizer` can also map multiple columns at once. Whether it goes to map a column or multiple
* columns, it depends on which parameter of `inputCol` and `inputCols` is set. When both are set,
* a log warning will be printed and by default it chooses `inputCol`.
Contributor

We should probably also mention that splits is only used for single column and splitsArray for multi column

@SparkQA

SparkQA commented Oct 10, 2017

Test build #82583 has finished for PR 17819 at commit 1889995.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Oct 11, 2017

@MLnick Thanks for leaving the comments. I think I've addressed all of them. Please take a look if you are free. Thanks.

@SparkQA

SparkQA commented Oct 11, 2017

Test build #82632 has finished for PR 17819 at commit bb19708.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Oct 19, 2017

@MLnick Any more comments or thoughts on this I need to address?

@AFractalThought

Does this extension exist for QuantileDiscretizer as well?

@huaxingao
Contributor

@AFractalThought @viirya
I have made changes for QuantileDiscretizer based on this PR. Once this PR is merged, I will open a jira to submit the PR for QuantileDiscretizer.

@viirya
Member Author

viirya commented Oct 29, 2017

@MLnick Is this ready to go?

@MLnick
Contributor

MLnick commented Oct 30, 2017

I've created https://issues.apache.org/jira/browse/SPARK-22397 to track the changes in QuantileDiscretizer. The PR can be submitted once we finalize this one.

@viirya
Member Author

viirya commented Oct 30, 2017

Thanks @MLnick

@AFractalThought

Thanks @huaxingao @MLnick @viirya this will be super helpful

@MLnick
Contributor

MLnick commented Nov 6, 2017

@viirya could you resolve conflicts?

@viirya
Member Author

viirya commented Nov 6, 2017

@MLnick Conflicts resolved. Thanks.

@SparkQA

SparkQA commented Nov 7, 2017

Test build #83519 has finished for PR 17819 at commit a970723.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor

MLnick commented Nov 8, 2017

@MarcKaminski by the way, you mentioned a vector bucketizer. I think in principle that might be useful. I'm not sure whether it would make sense to add vector type support to the existing Bucketizer or whether it would need to be separate.

Perhaps you can post a link to code / design doc on this JIRA for the vector type version?

Contributor

@MLnick MLnick left a comment

LGTM. Will leave open for the rest of the day in case any other reviewer wants a final look.

@viirya
Member Author

viirya commented Nov 9, 2017

Thanks @MLnick

@viirya
Member Author

viirya commented Nov 9, 2017

About the vector bucketizer: it seems it might work similarly to the multi-column bucketizer, but some behaviors such as Bucketizer.SKIP_INVALID would need to be addressed.

@MLnick
Contributor

MLnick commented Nov 9, 2017

Merged to master. Thanks @viirya and all the reviewers!

@asfgit asfgit closed this in 77f7453 Nov 9, 2017
Bucketizer.binarySearchForBuckets($(splits), feature, keepInvalid)
}.withName("bucketizer")
val seqOfSplits = if (isBucketizeMultipleColumns()) {
$(splitsArray).toSeq
Contributor

@tengpeng tengpeng Nov 24, 2017

I am interested in the difference between .toSeq and Seq().
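For readers unfamiliar with the `binarySearchForBuckets` call quoted above, its bucket-assignment semantics can be sketched in plain Python (an illustrative re-implementation, not Spark's actual code; Spark's version differs in details such as error reporting):

```python
import bisect
import math

def binary_search_for_buckets(splits, value, keep_invalid=False):
    """Assign `value` to a bucket defined by `splits`: bucket i covers
    [splits[i], splits[i+1]), and the last bucket also includes its upper
    bound. With keep_invalid=True, NaN values go into one extra bucket
    (mirroring Bucketizer's handleInvalid="keep")."""
    num_buckets = len(splits) - 1
    if math.isnan(value):
        if keep_invalid:
            return num_buckets  # extra bucket reserved for invalid values
        raise ValueError("NaN encountered; set keep_invalid=True to keep it")
    if value == splits[-1]:
        return num_buckets - 1  # last bucket is closed on the right
    if value < splits[0] or value > splits[-1]:
        raise ValueError("Value %r is outside the splits range" % value)
    # binary search for the right-open interval containing the value
    return bisect.bisect_right(splits, value) - 1
```

For example, with `splits = [0.0, 0.5, 1.0]`, values `0.2`, `0.5`, and `1.0` land in buckets `0`, `1`, and `1` respectively, and a NaN with `keep_invalid=True` lands in bucket `2`.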

@leliang65

Is there any python example for this api?

@viirya
Member Author

viirya commented Apr 19, 2018

@leliang65 The PySpark support is not added yet. Please refer to #19892.

@viirya viirya deleted the SPARK-20542 branch December 27, 2023 18:34