
Conversation

@zhengruifeng (Contributor) commented Apr 26, 2020

What changes were proposed in this pull request?

1. Add a new param blockSize;
2. Add a new class InstanceBlock;
3. If blockSize==1, keep the original behavior; if blockSize>1, stack input vectors into blocks (like ALS/MLP), as sketched below;
4. If blockSize>1, standardize the input outside of the optimization procedure.
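
A minimal sketch of the stacking idea (the stack helper below is hypothetical, not the PR's actual InstanceBlock code): consecutive feature vectors are laid out row-major in one DenseMatrix, so the margins of a whole block can be computed with a single level-2 BLAS gemv instead of one level-1 dot per instance.

import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors}

// Hypothetical illustration: stack a group of feature vectors row-wise
// into one dense matrix. With isTransposed = true, `values` is stored
// row-major, so row i holds the features of the i-th instance.
def stack(vectors: Seq[Vector]): DenseMatrix = {
  val numRows = vectors.length
  val numCols = vectors.head.size
  val values = new Array[Double](numRows * numCols)
  vectors.zipWithIndex.foreach { case (vec, i) =>
    vec.foreachActive { (j, v) =>
      if (v != 0.0) values(i * numCols + j) = v
    }
  }
  new DenseMatrix(numRows, numCols, values, true)
}

// margins of the whole block in one gemv: margins = block * coefficients
val block = stack(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))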

Why are the changes needed?

1. Reduce the RAM needed to persist the training dataset (saves about 40% RAM);
2. Use Level-2 BLAS routines (4x ~ 5x faster on the epsilon dataset).

Does this PR introduce any user-facing change?

Yes, a new param blockSize is added.

How was this patch tested?

Existing and added test suites.

@zhengruifeng (Contributor, Author) commented Apr 26, 2020

dataset: epsilon_normalized.t, numInstances=100,000, numFeatures=2,000

Test code:

import org.apache.spark.ml.classification._
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel


val df = spark.read.option("numFeatures", "2000").format("libsvm")
  .load("/data1/Datasets/epsilon/epsilon_normalized.t")
  .withColumn("label", (col("label") + 1) / 2)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count

val svc = new LinearSVC().setBlockSize(1).setMaxIter(10)
svc.fit(df)


val results = Seq(1, 4, 16, 64, 256, 1024, 4096).map { size =>
  val start = System.currentTimeMillis
  val model = svc.setBlockSize(size).fit(df)
  val end = System.currentTimeMillis
  (size, model.coefficients, end - start)
}

Results (coefficients and durations):

scala> results.map(_._2).foreach{vec => println(vec.toString.take(150))}
[0.7494819225920858,-0.08640022531296566,2.3231273313690615,0.02563502913888478,0.041697717260290056,-0.008274929181709273,1.7654515767910142,0.073740
[0.7494819225920862,-0.08640022531296565,2.323127331369061,0.025635029138884782,0.041697717260290056,-0.008274929181709285,1.7654515767910144,0.073740
[0.7494819225920858,-0.08640022531296568,2.32312733136906,0.025635029138884782,0.04169771726029004,-0.008274929181709273,1.7654515767910144,0.07374022
[0.7494819225920852,-0.08640022531296565,2.3231273313690615,0.025635029138884782,0.04169771726029004,-0.00827492918170927,1.7654515767910142,0.0737402
[0.7494819225920866,-0.08640022531296565,2.3231273313690615,0.025635029138884782,0.041697717260290056,-0.008274929181709282,1.7654515767910144,0.07374
[0.7494819225920862,-0.08640022531296565,2.3231273313690615,0.025635029138884866,0.04169771726029005,-0.008274929181709245,1.7654515767910144,0.073740
[0.7494819225920862,-0.08640022531296568,2.323127331369061,0.025635029138884782,0.041697717260290056,-0.00827492918170928,1.7654515767910144,0.0737402

scala> results.map(_._3)
res13: Seq[Long] = List(43328, 8726, 8413, 9151, 9029, 8630, 9212)

Speed:
blockSize=1: [screenshot: svc_dur_1]
blockSize=1024: [screenshot: svc_dur_1024]

RAM:
blockSize=1: [screenshot: block_svc_1]
blockSize=1024: [screenshot: block_svc_1024]

Native BLAS is NOT used in this test.

@zhengruifeng (Contributor, Author):

Friendly ping @srowen @WeichenXu123.
Using high-level BLAS on dense datasets makes SVC much faster than the existing impl, even without native BLAS.

To avoid a performance regression on sparse datasets, switch to the original impl by setting blockSize=1.

instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
if ($(blockSize) > 1) {
@zhengruifeng (Contributor, Author):

I think it is up to the end user to choose whether high-level BLAS is used and which BLAS lib is used.
Here we compute the sparsity of the dataset; if the input is too sparse, we log a warning, as sketched below.
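
A rough sketch of such a check (the threshold and all names here are illustrative, not the PR's exact code):

import org.apache.spark.ml.linalg.Vector

// Illustrative only: estimate the fraction of zero entries; if the input
// is very sparse, stacking it into dense blocks wastes memory and flops,
// so we warn and suggest falling back to blockSize = 1.
def checkSparsity(vectors: Seq[Vector], numFeatures: Int): Unit = {
  val nnz = vectors.iterator.map(_.numNonzeros.toDouble).sum
  val sparsity = 1.0 - nnz / (vectors.length.toDouble * numFeatures)
  if (sparsity > 0.5) { // hypothetical threshold
    println(f"sparsity = $sparsity%.3f: input is quite sparse, " +
      "blockSize > 1 may hurt performance; consider blockSize = 1")
  }
}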

@zhengruifeng (Contributor, Author):

The main part of this PR is similar to #27360,
but this PR falls back to the original impl when blockSize=1.

@zhengruifeng (Contributor, Author):

The speedup is more significant than in #27360.
I think that is because dataset epsilon has 2,000 features while a9a has only 123; epsilon is therefore more suitable for level-2 BLAS.


block.matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
@zhengruifeng (Contributor, Author):

This directly updates the first numFeatures elements of localGradientSumArray, avoiding creating a temp vector and then adding it into localGradientSumArray (see the sketch below).
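
For context, a plain-loop sketch of the accumulate-in-place semantics (the loop stands in for the truncated native dgemv call above; all names are illustrative):

// Computes gradSum(0 until numFeatures) += A^T * x for a row-major block
// A of shape numRows x numFeatures, writing straight into the shared
// gradient buffer instead of materializing a temporary vector first.
// The buffer keeps one extra trailing slot for the intercept gradient.
def accumulateDense(
    values: Array[Double],  // row-major block values
    numRows: Int,
    numFeatures: Int,
    x: Array[Double],       // per-instance multipliers, length numRows
    gradSum: Array[Double]): Unit = {
  var i = 0
  while (i < numRows) {
    val xi = x(i)
    if (xi != 0.0) {
      var j = 0
      while (j < numFeatures) {
        gradSum(j) += xi * values(i * numFeatures + j)
        j += 1
      }
    }
    i += 1
  }
}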

if (fitIntercept) localGradientSumArray(numFeatures) += vec.values.sum

case sm: SparseMatrix if fitIntercept =>
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
@zhengruifeng (Contributor, Author):

There is no available method that updates only the first numFeatures elements in the sparse case, so a temp output vector linearGradSumVec is needed (sketched below).
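
A rough sketch of the sparse path being described (names are illustrative; the point is that the gemv output must land in a full temp vector, whose values are then folded into the gradient buffer):

import org.apache.spark.ml.linalg.DenseVector

// Illustrative: after BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec),
// add the temp vector into the first numFeatures slots of the gradient
// buffer (which keeps one extra slot for the intercept gradient).
def foldInto(linearGradSumVec: DenseVector, gradSum: Array[Double]): Unit = {
  var j = 0
  while (j < linearGradSumVec.size) {
    gradSum(j) += linearGradSumVec.values(j)
    j += 1
  }
}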

@SparkQA commented Apr 26, 2020

Test build #121838 has finished for PR 28349 at commit 1a77719.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 26, 2020

Test build #121841 has finished for PR 28349 at commit 9e10b42.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 26, 2020

Test build #121842 has finished for PR 28349 at commit d7598d6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 26, 2020

Test build #121845 has finished for PR 28349 at commit c59506d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng (Contributor, Author):

I also tested on a sparse dataset:

import org.apache.spark.ml.classification._
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

val df = spark.read.option("numFeatures", "8289919").format("libsvm")
  .load("/data1/Datasets/webspam/webspam_wc_normalized_trigram.svm.10k")
  .withColumn("label", (col("label") + 1) / 2)

val svc = new LinearSVC().setMaxIter(10)
svc.fit(df)

val start = System.currentTimeMillis; val model1 = svc.setMaxIter(30).fit(df); val end = System.currentTimeMillis; end - start

Results:

This PR:

scala> val start = System.currentTimeMillis; val model1 = svc.setMaxIter(30).fit(df); val end = System.currentTimeMillis; end - start
start: Long = 1587957534286                                                     
model1: org.apache.spark.ml.classification.LinearSVCModel = LinearSVCModel: uid=linearsvc_2fcd0abbb2d7, numClasses=2, numFeatures=8289919
end: Long = 1587957684508
res1: Long = 150222

Master:

scala> val start = System.currentTimeMillis; val model1 = svc.setMaxIter(30).fit(df); val end = System.currentTimeMillis; end - start
start: Long = 1587957959670                                                     
model1: org.apache.spark.ml.classification.LinearSVCModel = LinearSVCModel: uid=linearsvc_269e4f373d2c, numClasses=2, numFeatures=8289919
end: Long = 1587958111562
res1: Long = 151892

If we keep blockSize=1, then there is no performance regression on sparse datasets.

@SparkQA commented Apr 27, 2020

Test build #121896 has finished for PR 28349 at commit 9275258.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 28, 2020

Test build #121950 has finished for PR 28349 at commit 0772834.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 28, 2020

Test build #121976 has finished for PR 28349 at commit a97a8fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng (Contributor, Author) commented Apr 29, 2020

I will merge this PR this week if nobody objects.
Unlike the previous one, this PR will not cause a performance regression on sparse datasets by default (since the default blockSize=1 uses the original impl).
Expert users can tune blockSize for better performance.

@SparkQA commented Apr 29, 2020

Test build #122058 has finished for PR 28349 at commit 58c0a1e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) left a comment:

It'd be great to have @mengxr comment here. I think the issue last time was indeed sparse performance. If the default should not produce any slowdown on sparse, I think it's reasonable, but we need to tell the user about this tradeoff in the doc above.

I'm a little concerned about the extra complexity, but the speedup is promising.

setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.
@srowen (Member):

We might provide a little more comment about what this does. Increasing it increases performance, but at the risk of what, slowing down on sparse input?

@zhengruifeng (Contributor, Author):

The choice of block size needs tuning; it depends on the dataset's sparsity and numFeatures. Increasing it does not always increase performance.

if ($(standardization)) None else Some(getFeaturesStd)))
} else None

def regParamL1Fun = (index: Int) => 0D
@srowen (Member):

Total nit, but write 0.0?

@SparkQA commented Apr 30, 2020

Test build #122101 has finished for PR 28349 at commit 4ed1227.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 30, 2020

Test build #122103 has finished for PR 28349 at commit e02a86e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 4, 2020

Test build #122266 has finished for PR 28349 at commit e8abb4b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng (Contributor, Author):

> we need to tell the user about this tradeoff in the doc above.

@srowen I think there may be other implementations (LoR/LiR/KMeans/GMM/...) that will support blockifying, so I prefer to add a description to ml-guide.md in the future.

For LinearSVC, I added some doc in the setter of blockSize.

@zhengruifeng (Contributor, Author):

Merged to master

@zhengruifeng zhengruifeng deleted the blockify_svc_II branch May 6, 2020 02:07
@HyukjinKwon (Member):

@zhengruifeng, let's avoid merging without an approval, in particular when the changes are pretty big. From a cursory look, it's even pretty invasive. It is not a good pattern to merge PRs without approvals when we should be calling for more reviews and approvals.
