
Conversation

@mengxr mengxr commented Jun 19, 2014

Task results are sent either directly via akka or indirectly via the block manager, depending on whether the size of the serialized task result is smaller than spark.akka.frameSize. However, the result is actually sent back to the driver via the backend actor, which is initialized before receiving the SparkConf and hence doesn't know spark.akka.frameSize. That is the root cause of SPARK-1112.

A quick fix is to use the minimum frame size to decide which route to take.

This PR also fixes SPARK-2156.
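The routing decision described above can be sketched as follows (a minimal hedged sketch, not the actual Spark code; the object, constant, and `Route` names are illustrative):

```scala
// Sketch of the routing decision: the executor backend cannot rely on
// spark.akka.frameSize being delivered to it, so it compares the result
// size against the guaranteed *minimum* frame size instead.
object ResultRoutingSketch {
  val MinAkkaFrameSizeBytes: Long = 10L * 1024 * 1024 // 10 MB floor from AkkaUtils
  val ReservedOverheadBytes: Long = 200L * 1024       // slack for akka message overhead

  sealed trait Route
  case object ViaAkka extends Route         // serialized result fits in one akka frame
  case object ViaBlockManager extends Route // store result; send only a small reference

  def routeFor(serializedResultSize: Long): Route =
    if (serializedResultSize <= MinAkkaFrameSizeBytes - ReservedOverheadBytes) ViaAkka
    else ViaBlockManager
}
```

Comparing against the minimum is the conservative choice: some results that would have fit in a larger configured frame go through the block manager instead, but nothing is ever sent through a frame too small for it.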

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.


ash211 commented Jun 19, 2014

Is there a use case for making the min frame size configurable with a parameter?

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@mengxr mengxr changed the title [SPARK-1112] use min akka frame size to decide how to send task results [SPARK-1112, 2156] use min akka frame size to decide how to send task results Jun 19, 2014

mengxr commented Jun 19, 2014

@ash211 If there were a way to deliver the configuration consistently to the backend, we could use spark.akka.frameSize consistently, and setting a min frame size would then be unnecessary.

This is a quick fix for 0.9.2 and 1.0.1.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15887/

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15888/


ash211 commented Jun 19, 2014

I'm not sure of the implications, but it sounds like it's probably OK to hard-code this value for the quick fix. Thanks!

Contributor

200 KB may not be enough; the value should increase as the serialized DirectResult becomes larger.

Contributor Author

I tested with a double array of size very close to 10 * 1024 * 1024. The akka message overhead is about 30-60 KB. This PR doesn't fix the issue of receiving new tasks from the driver that are bigger than 10 MB; @pwendell is working on that.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15891/

Contributor

This is a slightly misleading name/comment; it's really the minimum of the maximum frame sizes. We can keep the name, but maybe add some clarification in the comment.

@aarondav
Contributor

LGTM. Note that this fix is "safe" in that in the most conservative case, we'll just return the result through the block manager rather than Akka. This may be a little slower, but should be guaranteed to work.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15894/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15899/

Contributor

Why 200K? And should we change the similar code that does this for sending task closures (which also subtracts 1024)?

Contributor Author

We computed the difference between the size of the akka message and the size of the serialized task result (~10M). The difference is smaller than 60K, so I set 200K to be safe. Could you point me to the places where we use 1024?

Contributor Author

@kayousterhout I saw the line in CoarseGrainedSchedulerBackend. Should the overhead be bounded by a fixed size or proportional to the message size?

Contributor

Re: your last question, I have no idea...you definitely know more about this than I do at this point. Using the higher, 200K value seems like a safe alternative to what's currently there. It would be great to add this constant in AkkaUtils so we don't need to manually track this down if it changes again in the future.
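The suggestion above — keeping the constant in AkkaUtils so nobody has to track it down again — could look roughly like this (a hypothetical sketch, not the real AkkaUtils API; names are illustrative):

```scala
// Hypothetical sketch: keep both the frame-size floor and the reserved
// slack in one place, so callers never hard-code either value.
object AkkaFrameSizes {
  // Smallest max frame size any actor system built by
  // AkkaUtils.createActorSystem can end up with (10 MB).
  val minFrameSizeBytes: Int = 10 * 1024 * 1024
  // Reserved slack for akka's own message overhead: observed ~30-60 KB
  // in this thread, rounded up to 200 KB to be safe.
  val reservedSizeBytes: Int = 200 * 1024

  /** Largest serialized result that is safe to send directly via akka. */
  def maxDirectResultSize: Int = minFrameSizeBytes - reservedSizeBytes
}
```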


mengxr commented Jun 19, 2014

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@kayousterhout
Contributor

So this can make performance worse for folks who set spark.akka.frameSize higher in the executor-side SparkConf, right? Because this commit means we effectively ignore that setting. Can we just figure out what the max frame size actually is for the backend actor system?

Also, this can lead to hung jobs if a user sets the frame size lower than the default of 10, right? @pwendell said you guys chose to ignore this case, but it would be nice to add a comment explicitly stating this in the code in case it turns up as a bug later.


mengxr commented Jun 19, 2014

For the first scenario, it won't make performance worse, because the system doesn't really work now for serialized task results of size between 10M and spark.akka.frameSize. But yes, the ideal solution is to get the conf and set the right frame size. Maybe we can first request the conf from the driver, and then create a new ActorSystem on the backend with the correct frame size. This saves us from having to think about different deploy modes.

Any ActorSystem created using AkkaUtils.createActorSystem has at least 10M as the max frame size. It won't hang jobs.

Now I'm testing whether - 1024 is risky in SchedulerBackend.
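The guarantee mentioned above — that any actor system built through AkkaUtils.createActorSystem gets at least a 10 MB max frame size — can be sketched as (assumed behavior, simplified; the real code sets this through akka's configuration, not a helper like this):

```scala
// Sketch of the floor: whatever frame size a user configures, the actor
// system ends up accepting at least 10 MB, so an undersized
// spark.akka.frameSize cannot hang jobs on the receiving side.
object FrameSizeFloorSketch {
  val FloorMb = 10
  def effectiveFrameSizeMb(configuredMb: Int): Int =
    math.max(configuredMb, FloorMb)
}
```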

@pwendell
Contributor

@mengxr I think the scenario is where people have set SPARK_JAVA_OPTS in spark-env and copied this to all of the nodes. In that case, the configuration will be correctly observed by the actor system in CoarseGrainedExectorBackend. So if they upgrade to 1.0.1 they could see a performance regression. I actually think some power users probably fall into this category, so it might be good to address.

@pwendell
Contributor

@mengxr this actually hung up while running tests:

[info] MapOutputTrackerSuite:
[info] - compressSize
[info] - decompressSize
[info] - master start and stop
[info] - master register shuffle and fetch
[info] - master register and unregister shuffle
[info] - master register shuffle and unregister map output and fetch
[info] - remote fetch
[info] - remote fetch below akka frame size

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15896/

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15902/


mengxr commented Jun 19, 2014

I don't get it. As long as the actor systems are created from AkkaUtils.createActorSystem, the minimum value of the max frame size is 10M. All unit tests passed on Jenkins.

@kayousterhout
Contributor

I see -- you're right about the min frame size. @pwendell was talking about the first case, where I think the config parameter is currently being honored correctly.


mengxr commented Jun 19, 2014

Just tested - 1024 in SchedulerBackend. The system did hang when the task size was close to 10M - 1024:

scala> val random = new java.util.Random(0); val a = new Array[Byte](10 * 1024 * 1024 - 110000); random.nextBytes(a); sc.parallelize(0 until 1, 1).map(i => a).count()
14/06/19 00:33:59 INFO SparkContext: Starting job: count at <console>:14
14/06/19 00:33:59 INFO DAGScheduler: Got job 5 (count at <console>:14) with 1 output partitions (allowLocal=false)
14/06/19 00:33:59 INFO DAGScheduler: Final stage: Stage 5(count at <console>:14)
14/06/19 00:33:59 INFO DAGScheduler: Parents of final stage: List()
14/06/19 00:33:59 INFO DAGScheduler: Missing parents: List()
14/06/19 00:33:59 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[11] at map at <console>:14), which has no missing parents
14/06/19 00:34:00 INFO DAGScheduler: Submitting 1 missing tasks from Stage 5 (MappedRDD[11] at map at <console>:14)
14/06/19 00:34:00 INFO TaskSchedulerImpl: Adding task set 5.0 with 1 tasks
14/06/19 00:34:00 INFO TaskSetManager: Starting task 5.0:0 as TID 5 on executor 0: xm.att.net (PROCESS_LOCAL)
14/06/19 00:34:00 INFO TaskSetManager: Serialized task 5.0:0 as 10431605 bytes in 214 ms
task size: 10482813
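The numbers in the log above show why a 1024-byte slack is too small (arithmetic only; the figures come straight from the log lines above):

```scala
// From the log above: the serialized task passes the `frameSize - 1024`
// check, but the actual akka message carries ~51 KB of extra overhead,
// far more than the 1024 bytes the check reserves.
val frameSize  = 10 * 1024 * 1024       // 10485760
val slack      = 1024
val serialized = 10431605               // "Serialized task 5.0:0 as ... bytes"
val taskSize   = 10482813               // "task size: ..." (message with overhead)
val overhead   = taskSize - serialized

assert(serialized <= frameSize - slack) // the check passes...
assert(overhead > slack)                // ...but the real overhead dwarfs the slack
```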


mengxr commented Jun 19, 2014

@pwendell @kayousterhout I put an alternative solution in #1132. Please let me know which you prefer.


mengxr commented Jun 22, 2014

Closing this in favor of #1132.

@mengxr mengxr closed this Jun 22, 2014
asfgit pushed a commit that referenced this pull request Jun 25, 2014
This is an alternative solution to #1124 . Before launching the executor backend, we first fetch driver's spark properties and use it to overwrite executor's spark properties. This should be better than #1124.

@pwendell Are there spark properties that might be different on the driver and on the executors?

Author: Xiangrui Meng <[email protected]>

Closes #1132 from mengxr/akka-bootstrap and squashes the following commits:

77ff32d [Xiangrui Meng] organize imports
68e1dfb [Xiangrui Meng] use timeout from AkkaUtils; remove props from RegisteredExecutor
46d332d [Xiangrui Meng] fix a test
7947c18 [Xiangrui Meng] increase slack size for akka
4ab696a [Xiangrui Meng] bootstrap to retrieve driver spark conf
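A hedged sketch of the bootstrap idea in the commit message above (a hypothetical simplification; the actual change fetches the driver's properties over akka before the executor's actor system is created):

```scala
// Sketch: fetch the driver's spark properties first, overlay them on the
// executor's local properties, and only then build the actor system, so
// settings like spark.akka.frameSize agree on both sides.
object DriverConfBootstrapSketch {
  def mergedProperties(
      executorProps: Map[String, String],
      driverProps: Map[String, String]): Map[String, String] =
    executorProps ++ driverProps // driver-side values win on conflict
}
```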
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
ahirreddy pushed a commit to ahirreddy/spark that referenced this pull request Jul 1, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
wangyum pushed a commit that referenced this pull request May 26, 2023
* [CARMEL-6345] Support backup table command

* repair

* grammer

* comment