Skip to content

Conversation

@dbtsai
Copy link
Member

@dbtsai dbtsai commented Apr 8, 2014

This PR uses Breeze's L-BFGS implement, and Breeze dependency has already been introduced by Xiangrui's sparse input format work in SPARK-1212. Nice work, @mengxr !

When use with regularized updater, we need compute the regVal and regGradient (the gradient of regularized part in the cost function), and in the currently updater design, we can compute those two values by the following way.

Let's review how updater works when returning newWeights given the input parameters.

w' = w - thisIterStepSize * (gradient + regGradient(w)) Note that regGradient is function of w!
If we set gradient = 0, thisIterStepSize = 1, then
regGradient(w) = w - w'

As a result, for regVal, it can be computed by

val regVal = updater.compute(
  weights,
  new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2

and for regGradient, it can be obtained by

  val regGradient = weights.sub(
    updater.compute(weights, new DoubleMatrix(initialWeights.length, 1), 1, 1, regParam)._1)

The PR includes the tests which compare the result with SGD with/without regularization.

We did a comparison between LBFGS and SGD, and often we saw 10x less
steps in LBFGS while the cost of per step is the same (just computing
the gradient).

The following is the paper by Prof. Ng at Stanford comparing different
optimizers including LBFGS and SGD. They use them in the context of
deep learning, but worth as reference.
http://cs.stanford.edu/~jngiam/papers/LeNgiamCoatesLahiriProchnowNg2011.pdf

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13872/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13873/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13877/

@mengxr
Copy link
Contributor

mengxr commented Apr 8, 2014

@dbtsai Did you compare L-BFGS with MLlib's implementation of GD on some real data sets?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better create a private class for the cost function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the optimizer with several real data, for example, small ones from UCI Machine Learning Repository, and some big data like mnist8m (although the property and stability of optimizer don't depend on the size of dataset), L-BFGS gives the same or better result compared with GD. For some dataset, GD will converge really slow after 40~50 iterations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For cost function, I intend to do it in this way because in the code of cost function, I want to access and modify variables outside the cost function, for example, "i", "lossHistory", and if I create a private class for this, it will be extra effort to achieve this without changing breeze DiffFunction signature.

@dbtsai
Copy link
Member Author

dbtsai commented Apr 8, 2014

@mengxr As you suggested, I moved the costFun to private CostFun class.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13907/

@dbtsai dbtsai changed the title SPARK-1157: L-BFGS Optimizer based on Breeze's implementation. [SPARK-1157][MLlib] L-BFGS Optimizer based on Breeze's implementation. Apr 9, 2014
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala imports Array by default.

@dbtsai dbtsai reopened this Apr 15, 2014
@dbtsai
Copy link
Member Author

dbtsai commented Apr 15, 2014

Jenkins, retest this please.

@dbtsai
Copy link
Member Author

dbtsai commented Apr 15, 2014

Timeout for lastest jenkins run. It seems that CI is not stable now.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14126/

@mengxr
Copy link
Contributor

mengxr commented Apr 15, 2014

Jenkins, retest this please.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14142/

@pwendell
Copy link
Contributor

Thanks - merged this!

@dbtsai dbtsai closed this Apr 15, 2014
@dbtsai dbtsai deleted the dbtsai-LBFGS branch April 15, 2014 18:45
@dbtsai dbtsai restored the dbtsai-LBFGS branch April 15, 2014 18:49
asfgit pushed a commit that referenced this pull request Apr 15, 2014
This PR uses Breeze's L-BFGS implement, and Breeze dependency has already been introduced by Xiangrui's sparse input format work in SPARK-1212. Nice work, @mengxr !

When use with regularized updater, we need compute the regVal and regGradient (the gradient of regularized part in the cost function), and in the currently updater design, we can compute those two values by the following way.

Let's review how updater works when returning newWeights given the input parameters.

w' = w - thisIterStepSize * (gradient + regGradient(w))  Note that regGradient is function of w!
If we set gradient = 0, thisIterStepSize = 1, then
regGradient(w) = w - w'

As a result, for regVal, it can be computed by

    val regVal = updater.compute(
      weights,
      new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
and for regGradient, it can be obtained by

      val regGradient = weights.sub(
        updater.compute(weights, new DoubleMatrix(initialWeights.length, 1), 1, 1, regParam)._1)

The PR includes the tests which compare the result with SGD with/without regularization.

We did a comparison between LBFGS and SGD, and often we saw 10x less
steps in LBFGS while the cost of per step is the same (just computing
the gradient).

The following is the paper by Prof. Ng at Stanford comparing different
optimizers including LBFGS and SGD. They use them in the context of
deep learning, but worth as reference.
http://cs.stanford.edu/~jngiam/papers/LeNgiamCoatesLahiriProchnowNg2011.pdf

Author: DB Tsai <[email protected]>

Closes #353 from dbtsai/dbtsai-LBFGS and squashes the following commits:

984b18e [DB Tsai] L-BFGS Optimizer based on Breeze's implementation. Also fixed indentation issue in GradientDescent optimizer.
(cherry picked from commit 6843d63)

Signed-off-by: Patrick Wendell <[email protected]>
asfgit pushed a commit that referenced this pull request Apr 15, 2014
This PR uses Breeze's L-BFGS implement, and Breeze dependency has already been introduced by Xiangrui's sparse input format work in SPARK-1212. Nice work, @mengxr !

When use with regularized updater, we need compute the regVal and regGradient (the gradient of regularized part in the cost function), and in the currently updater design, we can compute those two values by the following way.

Let's review how updater works when returning newWeights given the input parameters.

w' = w - thisIterStepSize * (gradient + regGradient(w))  Note that regGradient is function of w!
If we set gradient = 0, thisIterStepSize = 1, then
regGradient(w) = w - w'

As a result, for regVal, it can be computed by

    val regVal = updater.compute(
      weights,
      new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
and for regGradient, it can be obtained by

      val regGradient = weights.sub(
        updater.compute(weights, new DoubleMatrix(initialWeights.length, 1), 1, 1, regParam)._1)

The PR includes the tests which compare the result with SGD with/without regularization.

We did a comparison between LBFGS and SGD, and often we saw 10x less
steps in LBFGS while the cost of per step is the same (just computing
the gradient).

The following is the paper by Prof. Ng at Stanford comparing different
optimizers including LBFGS and SGD. They use them in the context of
deep learning, but worth as reference.
http://cs.stanford.edu/~jngiam/papers/LeNgiamCoatesLahiriProchnowNg2011.pdf

Author: DB Tsai <[email protected]>

Closes #353 from dbtsai/dbtsai-LBFGS and squashes the following commits:

984b18e [DB Tsai] L-BFGS Optimizer based on Breeze's implementation. Also fixed indentation issue in GradientDescent optimizer.
@dbtsai dbtsai deleted the dbtsai-LBFGS branch April 15, 2014 20:40
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
This PR uses Breeze's L-BFGS implement, and Breeze dependency has already been introduced by Xiangrui's sparse input format work in SPARK-1212. Nice work, @mengxr !

When use with regularized updater, we need compute the regVal and regGradient (the gradient of regularized part in the cost function), and in the currently updater design, we can compute those two values by the following way.

Let's review how updater works when returning newWeights given the input parameters.

w' = w - thisIterStepSize * (gradient + regGradient(w))  Note that regGradient is function of w!
If we set gradient = 0, thisIterStepSize = 1, then
regGradient(w) = w - w'

As a result, for regVal, it can be computed by

    val regVal = updater.compute(
      weights,
      new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
and for regGradient, it can be obtained by

      val regGradient = weights.sub(
        updater.compute(weights, new DoubleMatrix(initialWeights.length, 1), 1, 1, regParam)._1)

The PR includes the tests which compare the result with SGD with/without regularization.

We did a comparison between LBFGS and SGD, and often we saw 10x less
steps in LBFGS while the cost of per step is the same (just computing
the gradient).

The following is the paper by Prof. Ng at Stanford comparing different
optimizers including LBFGS and SGD. They use them in the context of
deep learning, but worth as reference.
http://cs.stanford.edu/~jngiam/papers/LeNgiamCoatesLahiriProchnowNg2011.pdf

Author: DB Tsai <[email protected]>

Closes apache#353 from dbtsai/dbtsai-LBFGS and squashes the following commits:

984b18e [DB Tsai] L-BFGS Optimizer based on Breeze's implementation. Also fixed indentation issue in GradientDescent optimizer.
mccheah pushed a commit to mccheah/spark that referenced this pull request Oct 3, 2018
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
Enable SSL to test manageiq-providers-openstack-test-public-clouds
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
fishcus added a commit to fishcus/spark that referenced this pull request Nov 26, 2021
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…edExecutorBackend (apache#353)

* [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- Thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" apache#40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object.  For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>

* fix

---------

Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Co-authored-by: Bo Xiong <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants