
Conversation

@zhengruifeng
Contributor

What changes were proposed in this pull request?

1, reorganize the fit method in LR into several blocks (createModel, createBounds, createOptimizer, createInitCoefWithInterceptMatrix);
2, add a new param blockSize;
3, if blockSize==1, keep the original behavior, code path trainOnRows;
4, if blockSize>1, standardize and stack the input vectors into blocks (like ALS/MLP), code path trainOnBlocks.
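
The stacking idea can be sketched in plain Scala (a hypothetical illustration, not the actual Spark code; all names are made up): blockSize instance vectors are copied into one flat row-major array so the per-iteration margins become a single matrix-vector pass (the Level-2 BLAS pattern) instead of blockSize separate dot products.

```scala
// Hypothetical sketch of the trainOnBlocks stacking idea; names are illustrative.
object BlockSketch {
  // Stack instance vectors row-by-row into one flat row-major matrix.
  def stack(vectors: Seq[Array[Double]]): (Array[Double], Int, Int) = {
    val m = vectors.length        // rows = instances in the block
    val n = vectors.head.length   // cols = numFeatures
    val flat = new Array[Double](m * n)
    var i = 0
    while (i < m) {
      System.arraycopy(vectors(i), 0, flat, i * n, n)
      i += 1
    }
    (flat, m, n)
  }

  // margins = block * coef, one gemv-like pass over the stacked matrix.
  def gemv(flat: Array[Double], m: Int, n: Int, coef: Array[Double]): Array[Double] = {
    val out = new Array[Double](m)
    var i = 0
    while (i < m) {
      var s = 0.0
      var j = 0
      while (j < n) { s += flat(i * n + j) * coef(j); j += 1 }
      out(i) = s
      i += 1
    }
    out
  }
}
```

In the real code a native BLAS gemv call would take the place of the inner loops; the stacking is what makes such a call possible.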

Why are the changes needed?

On the dense dataset epsilon_normalized.t:
1, reduces the RAM needed to persist the training dataset (saves about 40% RAM);
2, uses Level-2 BLAS routines (4x ~ 5x faster).

Does this PR introduce any user-facing change?

Yes, a new param blockSize is added.

How was this patch tested?

Existing and newly added test suites.

remove some transient lazy variables

@zhengruifeng
Contributor Author

Performance test on epsilon_normalized.t

code:

import org.apache.spark.ml.classification._
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val df = spark.read.option("numFeatures", "2000").format("libsvm")
  .load("/data1/Datasets/epsilon/epsilon_normalized.t")
  .withColumn("label", (col("label") + 1) / 2)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count

// warm-up run
val lr = new LogisticRegression().setBlockSize(1).setMaxIter(10)
lr.fit(df)

val results = Seq(1, 4, 16, 64, 256, 1024, 4096).map { size =>
  val start = System.currentTimeMillis
  val model = lr.setBlockSize(size).fit(df)
  val end = System.currentTimeMillis
  (size, model.coefficients, end - start)
}

results:

scala> results.map(_._3)
res3: Seq[Long] = List(31076, 6771, 6732, 7590, 7186, 7094, 7276)

scala> results.map(_._2).foreach(coef => println(coef.toString.take(100)))
[2.1557250220880024,-0.22767392418436572,4.569220246330072,0.04739667339597046,0.14605181933865558,-
[2.1557250220880064,-0.22767392418436597,4.56922024633007,0.04739667339596951,0.14605181933865646,-0
[2.1557250220880064,-0.2276739241843657,4.569220246330077,0.04739667339597028,0.14605181933865646,-0
[2.155725022088007,-0.2276739241843664,4.569220246330073,0.047396673395969764,0.14605181933865688,-0
[2.1557250220880038,-0.22767392418436605,4.569220246330073,0.04739667339597022,0.14605181933865458,-
[2.1557250220880033,-0.22767392418436683,4.569220246330072,0.047396673395970035,0.14605181933865727,
[2.1557250220880033,-0.22767392418436613,4.56922024633007,0.0473966733959703,0.1460518193386559,-0.0
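
The coefficient prefixes above agree to roughly 13 significant digits across block sizes. A quick way to check the full vectors agree up to floating-point noise (a hedged sketch, reusing the `results` value built above) is an element-wise relative-difference bound:

```scala
// Maximum element-wise relative difference between two coefficient arrays.
def maxRelDiff(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length)
  a.zip(b).map { case (x, y) =>
    val scale = math.max(math.abs(x), math.abs(y))
    if (scale == 0.0) 0.0 else math.abs(x - y) / scale
  }.max
}

// e.g. results.map(_._2.toArray).sliding(2).foreach {
//   case Seq(a, b) => assert(maxRelDiff(a, b) < 1e-9)
// }
```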

blockSize==1: [screenshot: lr_dense_1]

blockSize==256: [screenshot: lr_dense_256]

@zhengruifeng
Contributor Author

zhengruifeng commented May 6, 2020

Performance test on a sparse dataset: the first 10,000 instances of webspam_wc_normalized_trigram

code:

import org.apache.spark.ml.classification._
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val df = spark.read.option("numFeatures", "8289919").format("libsvm")
  .load("/data1/Datasets/webspam/webspam_wc_normalized_trigram.svm.10k")
  .withColumn("label", (col("label") + 1) / 2)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count

// warm-up run
val lr = new LogisticRegression().setBlockSize(1).setMaxIter(10)
lr.fit(df)

val results = Seq(1, 4, 16, 64, 256, 1024, 4096).map { size =>
  val start = System.currentTimeMillis
  val model = lr.setBlockSize(size).fit(df)
  val end = System.currentTimeMillis
  (size, model.coefficients, end - start)
}

results:

scala> results.map(_._3)
res17: Seq[Long] = List(33948, 425923, 129811, 56288, 47587, 42816, 39809)


scala> results.map(_._2).foreach(coef => println(coef.toString.take(100)))
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168
(8289919,[549219,551719,592137,592138,592141,592154,592160,592162,592163,592164,592166,592167,592168

scala> results.map(_._2).foreach(coef => println(coef.toString.takeRight(100)))
87,-1188.1053920127556,335.5565308836645,-135.79302172669907,849.0515530033497,-27.040836637047736])
91,-1188.105392012755,335.55653088366444,-135.79302172669907,849.0515530033497,-27.040836637047736])
9,-1188.1053920127551,335.55653088366444,-135.79302172669904,849.0515530033495,-27.040836637047725])
94,-1188.1053920127556,335.55653088366444,-135.79302172669904,849.0515530033495,-27.04083663704773])
1,-1188.1053920127551,335.55653088366444,-135.79302172669904,849.0515530033493,-27.040836637047722])
5,-1188.1053920127556,335.55653088366444,-135.79302172669904,849.0515530033495,-27.040836637047736])
29,-1188.105392012756,335.55653088366444,-135.79302172669904,849.0515530033495,-27.040836637047736])

blockSize==1: [screenshot: lr_sparse_1]

blockSize==16: [screenshot: lr_sparse_16]

Test against master:

import org.apache.spark.ml.classification._
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val df = spark.read.option("numFeatures", "8289919").format("libsvm")
  .load("/data1/Datasets/webspam/webspam_wc_normalized_trigram.svm.10k")
  .withColumn("label", (col("label") + 1) / 2)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count

// warm-up run
val lr = new LogisticRegression().setMaxIter(10)
lr.fit(df)

val start = System.currentTimeMillis
val model = lr.setMaxIter(10).fit(df)
val end = System.currentTimeMillis
end - start



scala> val start = System.currentTimeMillis; val model = lr.setMaxIter(10).fit(df); val end = System.currentTimeMillis; end - start
start: Long = 1588735447883                                                     
model: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_99d29a0ecc13, numClasses=2, numFeatures=8289919
end: Long = 1588735483170
res3: Long = 35287

In this PR, when blockSize==1, the duration is 33948 ms (vs 35287 ms on master), so there is no performance regression on sparse datasets with the default setting.
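
One possible explanation for the blockSize>1 slowdown on this dataset (an assumption on my part, not established in this thread) is that processing a block of very sparse rows through a dense code path multiplies the memory traffic and FLOPs by roughly numFeatures divided by the non-zeros per row:

```scala
// Hedged back-of-the-envelope estimate: cost blow-up when a sparse row with
// avgNnzPerRow non-zeros is processed as a dense row of numFeatures entries.
def densificationFactor(numFeatures: Long, avgNnzPerRow: Double): Double =
  numFeatures / avgNnzPerRow

// With webspam's 8289919 features and a hypothetical ~4000 non-zeros per row,
// the dense path does roughly 2000x more arithmetic per row.
```

With blockSize=1 the per-row sparse code path (trainOnRows) is kept, which matches the 33948 ms figure above.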

@SparkQA

SparkQA commented May 6, 2020

Test build #122334 has finished for PR 28458 at commit 563cee9.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " +

@SparkQA

SparkQA commented May 6, 2020

Test build #122338 has finished for PR 28458 at commit 0577bb2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

retest this please

@SparkQA

SparkQA commented May 6, 2020

Test build #122345 has finished for PR 28458 at commit 0577bb2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

retest this please

@SparkQA

SparkQA commented May 6, 2020

Test build #122349 has finished for PR 28458 at commit 0577bb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 6, 2020

Test build #122356 has finished for PR 28458 at commit 1aca7c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 6, 2020

Test build #122357 has finished for PR 28458 at commit 8f6582a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

zhengruifeng commented May 7, 2020

This PR is an update of #27374; it avoids the performance regression on sparse datasets by default (blockSize=1).
On dense datasets like epsilon, with a reasonable blockSize (100~1000), it is much faster than the existing implementation.

@zhengruifeng
Contributor Author

Merged to master

@HyukjinKwon
Member

@zhengruifeng, this was reviewed by nobody while the change is nearly 1k lines of code.

