Skip to content

Conversation

@zhengruifeng
Copy link
Contributor

@zhengruifeng zhengruifeng commented May 7, 2020

What changes were proposed in this pull request?

1, add new param blockSize;
2, add a new class InstanceBlock;
3, if blockSize==1, keep original behavior; if blockSize>1, stack input vectors to blocks (like ALS/MLP);
4, if blockSize>1, standardize the input outside of optimization procedure;

Why are the changes needed?

it will obtain performance gain on dense datasets, such as epsilon
1, reduce RAM to persist traing dataset; (save about 40% RAM)
2, use Level-2 BLAS routines; (~10X speedup)

Does this PR introduce any user-facing change?

Yes, a new param is added

How was this patch tested?

existing and added testsuites

@SparkQA
Copy link

SparkQA commented May 7, 2020

Test build #122388 has finished for PR 28473 at commit b8d8e1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 7, 2020

Test build #122390 has finished for PR 28473 at commit c7e5f4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Copy link
Contributor Author

testCode:


import org.apache.spark.ml.regression._
import org.apache.spark.storage.StorageLevel


val df = spark.read.option("numFeatures", "2000").format("libsvm").load("/data1/Datasets/epsilon/epsilon_normalized.t").withColumn("censor", (col("label")+1)/2).withColumn("label", (col("label")+2)/2)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count


val aft = new AFTSurvivalRegression().setMaxIter(50)
aft.fit(df)

val results = Seq(1, 4, 16, 64, 256, 1024, 4096).map { size => val start = System.currentTimeMillis; val model = aft.setBlockSize(size).fit(df); val end = System.currentTimeMillis; (size, model, end - start) }

results:

scala> results.foreach(t => println(t._2.coefficients.toString.take(100)))
[-0.0739321337153592,-0.020829491487167364,0.06661851987557675,0.019253370633371538,-0.0588055025651
[-0.07393213372315585,-0.020829491465817675,0.06661851987897843,0.019253370565929854,-0.058805502527
[-0.07393213383693777,-0.020829491161483995,0.06661851992574323,0.019253369608308712,-0.058805501990
[-0.07393213385166159,-0.020829491120415617,0.06661851993259112,0.019253369477997134,-0.058805501918
[-0.07393213382685696,-0.020829491187320876,0.06661851992205566,0.019253369688987693,-0.058805502036
[-0.07393213385098614,-0.02082949112233834,0.06661851993235628,0.019253369483972174,-0.0588055019215
[-0.07393213377460728,-0.020829491324180386,0.06661851990185824,0.0192533701179324,-0.05880550227761

scala> results.map(_._2.intercept)
res18: Seq[Double] = List(0.4605592348900757, 0.4605592349298284, 0.46055923549542066, 0.4605592355720131, 0.4605592354475875, 0.46055923556842815, 0.46055923519370545)

scala> results.map(_._2.scale)
res19: Seq[Double] = List(0.07575246987902046, 0.07575246988748957, 0.07575247000765407, 0.07575247002401692, 0.07575246999753335, 0.07575247002326578, 0.07575246994370846)

scala> results.map(_._3)
res20: Seq[Long] = List(137684, 12706, 12453, 16875, 13661, 13791, 14540)

When blockSize is set 256, it is 10X faster than existing impl.

blockSize=1
aft_1

blockSize=256
aft_256

nit

nit
nit
@SparkQA
Copy link

SparkQA commented May 8, 2020

Test build #122426 has finished for PR 28473 at commit eaa8981.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Copy link
Contributor Author

Merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants