
Conversation

@yinxusen
Contributor

This pull request, tracked by the new JIRA issue MLLIB-28, brings a new implementation of GradientDescent named GradientDescentWithLocalUpdate. GradientDescentWithLocalUpdate can outperform the original GradientDescent by roughly 1x to 4x without sacrificing accuracy, and can be easily adopted by most classification and regression algorithms in MLlib.

The parallelism of many ML algorithms is limited by the sequential updating process of the optimization algorithms they use. However, by carefully breaking the sequential chain, the updating process can be parallelized. In GradientDescentWithLocalUpdate, we split the iteration loop into multiple supersteps. Within each superstep, an inner loop that runs a local optimization process is introduced into each partition. During the local optimization, only the local data points in that partition are involved. Since different partitions are processed in parallel, the local optimization process is naturally parallelized. Then, at the end of each superstep, all the gradients and loss histories computed in each partition are collected and merged in a bulk synchronous manner.
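
To make the superstep structure concrete, here is a minimal sketch of the idea (not the actual PR code); the names numOuterIterations, numInnerIterations, stepSize, and the simple squared-loss update are assumptions for illustration only:

```scala
import org.apache.spark.rdd.RDD

def localUpdateSGD(
    data: RDD[(Double, Array[Double])],   // (label, features)
    initWeights: Array[Double],
    stepSize: Double,
    numOuterIterations: Int,
    numInnerIterations: Int): Array[Double] = {
  var weights = initWeights
  for (_ <- 1 to numOuterIterations) {               // one superstep per outer iteration
    val (sumW, count) = data.mapPartitions { iter =>
      // Local optimization: update a private copy of the weights using only
      // the points of this partition, for numInnerIterations passes.
      val local = weights.clone()
      val points = iter.toArray
      for (_ <- 1 to numInnerIterations; (label, x) <- points) {
        val err = local.zip(x).map { case (w, xi) => w * xi }.sum - label
        for (j <- local.indices) local(j) -= stepSize * err * x(j)
      }
      Iterator.single((local, 1L))
    }.reduce { case ((w1, c1), (w2, c2)) =>
      (w1.zip(w2).map { case (a, b) => a + b }, c1 + c2)  // bulk-synchronous merge
    }
    weights = sumW.map(_ / count)                         // average the local models
  }
  weights
}
```

For simplicity this sketch materializes each partition with toArray; the discussion later in this thread is largely about how to make several local passes without that materialization.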

Detailed experiments and results are in the original pull request and its comments.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13229/

Contributor

Indentation error, should be 2 spaces.

@srowen
Member

srowen commented Mar 18, 2014

Broad question: can this simply replace the existing implementation, if it's better? I'd suggest it is important to not let a bunch of different implementations proliferate, but unify them.

@yinxusen
Contributor Author

In fact, if we set numInnerIteration = 1, which is the default setting, then GradientDescentWithLocalUpdate is identical to GradientDescent. However, I think it is better if we keep the opportunity to add new updaters easily; end users may need to add their own customized updaters.

@liancheng
Contributor

Left some comments on minor issues like formatting. LGTM otherwise.

@srowen According to previous experiments, this implementation is indeed better than the original one, +1 for the replacement.

@liancheng
Contributor

@srowen Forgot to mention, @etrain's comment should be one of the reasons why this PR doesn't try to replace the original one. BTW, basically I'm not an ML guy, so please ignore me if I'm saying rubbish :)

Contributor

I'm not very familiar with how duplicate is implemented. Scala doc says "The implementation may allocate temporary storage for elements iterated by one iterator but not yet by the other." Is there a risk of running out of memory here?

Contributor Author

Good question; I looked into the duplicate method just now. It uses a scala.collection.mutable.Queue to mimic an iterator, and the elements iterated by one iterator but not yet by the other are stored there. I am shocked by that...

I have no idea of the memory cost of the Queue, but it seems to be the only way to duplicate an iterator. We have already tested that the method is really faster than Iterator.toArray. @liancheng Do you know about that?

Contributor

We're using Iterator.duplicate to iterate the dataset multiple times without calling Iterator.toArray. According to the implementation of duplicate, it does consume more memory and generates more temporary objects; I didn't notice that before. But according to previous experiments, duplicate is much more GC-friendly than toArray. I think the reason is that the underlying implementation of the mutable.Queue used in duplicate is actually a mutable.LinkedList, which doesn't require a large amount of contiguous memory and thus may trigger full GC less frequently.

If my guess is right, instead of using duplicate maybe we can simply call Iterator.toList to reduce full-GC frequency?
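
For illustration, here is a minimal sketch (not from the PR; localIters is a hypothetical name) of how Iterator.duplicate behaves when one partition is iterated several times. Fully consuming one of the two returned iterators buffers every remaining element in the internal queue, which is where the memory cost comes from:

```scala
import scala.util.Random

// Pretend this is the iterator over one partition's data points.
val partition: Iterator[Double] = Iterator.fill(1000)(Random.nextDouble())

val localIters = 3
var iter = partition
var acc = 0.0
for (_ <- 1 to localIters) {
  // duplicate returns two iterators over the same elements; elements consumed
  // by one but not yet by the other are buffered in a mutable.Queue internally.
  val (current, rest) = iter.duplicate
  acc += current.map(x => x * x).sum  // one full local pass (e.g. a gradient step)
  iter = rest                         // the buffered copy feeds the next pass
}
println(acc)
```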

Contributor

@yinxusen I think this approach will certainly run out of memory if the data is too big to fit in memory. You can set a small executor memory and test on some data without caching.

Contributor Author

@mengxr, I absolutely agree with you. I am trying another way now, and will have a test result tomorrow.

@yinxusen
Contributor Author

I used the new method to enlarge the local update. Tests on SVM and LogisticRegression look as good as the first version, without the worry of OOM. This method can get better results in less time, especially when the dataset is too large to cache in memory.

I think this method is much more like the method provided here in section 3. I'm not claiming that it is a better way, but the original GradientDescent is somewhat like an elephant pulling a small carriage.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13322/

@yinxusen
Contributor Author

I have tested the original, 1-version, and 2-version LR and SVM; here are the results:

(Note that the original version runs 100 iterations, while the other two run 10 iterations with 10 local iterations.)

| Type | Time | Last 10 losses |
| --- | --- | --- |
| original LR | 89.348 | 0.638 - 0.636 |
| 1-version LR | 13.094 | 1.769 - 0.636 |
| 2-version LR | 10.747 | 1.618 - 0.631 |
| original SVM | 88.708 | 0.947 - 0.947 |
| 1-version SVM | 13.062 | 2.127 - 0.694 |
| 2-version SVM | 10.829 | 1.943 - 0.691 |

There are 3 updaters: L1-updater, L2-updater, and simple updater, and 3 gradients: Logistic, Square, and Hinge. SVM uses Hinge+L2, LR uses Logistic+simple, and Lasso uses Square+L1.
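
For reference, a minimal sketch of those pairings in terms of the 2014-era org.apache.spark.mllib.optimization classes (assuming LeastSquaresGradient is the "Square" gradient mentioned above; this is illustrative only, not the PR's code):

```scala
import org.apache.spark.mllib.optimization._

// SVM: hinge loss with L2 regularization
val svmPair   = (new HingeGradient(), new SquaredL2Updater())
// Logistic regression: logistic loss with no regularization
val lrPair    = (new LogisticGradient(), new SimpleUpdater())
// Lasso: squared loss with L1 regularization
val lassoPair = (new LeastSquaresGradient(), new L1Updater())
```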

But I encountered some difficulties with Lasso; I am still trying to fix them.

@AmplabJenkins

Can one of the admins verify this patch?

@yinxusen
Contributor Author

I rewrote the two versions of GradientDescent with Vector instead of Array. Lasso is easy to test now thanks to @mengxr's refactoring of the code.

I ran the test on a single node, in local mode. Note that the original version runs 100 iterations, while the other two run 10 iterations with 10 local iterations.

latest update:

| Type | Time | Last 10 losses |
| --- | --- | --- |
| original LR | 346 | 0.6444 - 0.6430 |
| 1-version LR | 45 | 0.7082 - 0.6773 |
| 2-version LR | 37 | 0.7070 - 0.6817 |
| original SVM | 338 | 0.9468 - 0.9468 |
| 1-version SVM | 46 | 0.7861 - 0.7803 |
| 2-version SVM | 34 | 0.7875 - 0.7829 |
| original Lasso | 320 | 0.6063 - 0.6063 |
| 1-version Lasso | 39 | 0.6131 - 0.2062 |
| 2-version Lasso | 32 | 0.6062 - 0.2104 |

1-version is not good due to the reuse of the Iterator, which inherently stores all elements in a queue and will cause OOM if a partition holds enough data entries. 2-version is better, but due to its tiny-batch property, the 2-version GradientDescent's convergence is slightly weaker than the 1-version's. There is a trade-off between hardware efficiency and statistical efficiency.
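
One way to realize such a tiny-batch local pass without buffering the whole partition is sketched below; batchSize and the squared-loss update are hypothetical, not the PR's actual code:

```scala
// Stream one partition in small batches, updating a local copy of the weights
// once per batch, without materializing or duplicating the iterator.
def localPass(iter: Iterator[(Double, Array[Double])],
              weights: Array[Double],
              stepSize: Double,
              batchSize: Int): Array[Double] = {
  val local = weights.clone()
  iter.grouped(batchSize).foreach { batch =>
    val grad = new Array[Double](local.length)
    batch.foreach { case (label, x) =>
      val err = local.zip(x).map { case (w, xi) => w * xi }.sum - label
      for (j <- grad.indices) grad(j) += err * x(j)
    }
    // One update per tiny batch, using the averaged batch gradient.
    for (j <- local.indices) local(j) -= stepSize * grad(j) / batch.size
  }
  local
}
```

Processing the partition through grouped batches avoids the buffering done by Iterator.duplicate, at the cost of only a single streaming pass per superstep and smaller-batch updates that weaken convergence, which is exactly the hardware/statistical trade-off described above.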

I ported my code into an independent git repo so as to do experiments more easily; I'll move it back here soon.

@yinxusen
Contributor Author

I'd like to close the PR, following the offline discussion with @mengxr. The code will stay in my GitHub repo for those who are still interested in it.

@yinxusen yinxusen closed this Apr 21, 2014