
Conversation

@YanTangZhai
Contributor

HadoopRDD.getPartitions is lazily evaluated, so it ends up running while DAGScheduler processes the JobSubmitted event. If the input directory is large, getPartitions may take a long time; for example, in our cluster it takes anywhere from 0.029s to 766.699s. While one JobSubmitted event is being processed, all other events have to wait. We therefore want to move HadoopRDD.getPartitions earlier to reduce the JobSubmitted processing time, so that other JobSubmitted events don't have to wait as long. A HadoopRDD could compute its partitions when it is instantiated.
We can analyse and compare the execution time before and after the optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_____].
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_____].
The execution time of the job after optimization is....[time1__][time3___][time2_][time4_____].
In summary, if the app has only one job, the total execution time is the same before and after the optimization.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_____],
job2 execution time is [time2__________][time3___][time4_____],
job3 execution time is................................[time2____][time3___][time4_____],
job4 execution time is................................[time2_____________][time3___][time4_____].
After optimization,
job1 execution time is [time3___][time2_][time4_____],
job2 execution time is [time3___][time2__][time4_____],
job3 execution time is................................[time3___][time2_][time4_____],
job4 execution time is................................[time3___][time2__][time4_____].
In summary, if the app has multiple jobs, the average execution time after the optimization is lower than before.
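
To make the idea concrete, here is a minimal sketch of the effect we are after, written from the user side rather than inside Spark (the class name and input path are illustrative): forcing rdd.partitions on the driver before any action runs means the expensive getPartitions call happens outside the DAGScheduler event loop.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object EagerPartitionsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("eager-partitions"))
    val rdd = sc.textFile("hdfs:///some/large/input/dir")  // path is illustrative
    rdd.partitions          // triggers getPartitions eagerly; the result is cached
    println(rdd.count())    // JobSubmitted no longer pays the getPartitions cost
    sc.stop()
  }
}
```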

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24805 has finished for PR 3794 at commit 5601a8b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

This looks like a legitimate test failure.

@SparkQA

SparkQA commented Dec 25, 2014

Test build #24810 has finished for PR 3794 at commit af5abda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

To reformat the PR description to make it a little easier to read:

HadoopRDD.getPartitions is lazily evaluated, so it ends up running while DAGScheduler processes the JobSubmitted event. If the input directory is large, getPartitions may take a long time; for example, in our cluster it takes anywhere from 0.029s to 766.699s. While one JobSubmitted event is being processed, all other events have to wait. We therefore want to move HadoopRDD.getPartitions earlier to reduce the JobSubmitted processing time, so that other JobSubmitted events don't have to wait as long. A HadoopRDD could compute its partitions when it is instantiated.

We can analyse and compare the execution time before and after the optimization.

TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_____].

(1) The app has only one job
(a)

The execution time of the job before optimization is [time1__][time2_][time3___][time4_____].
The execution time of the job after optimization is....[time1__][time3___][time2_][time4_____].

In summary, if the app has only one job, the total execution time is the same before and after the optimization.
(2) The app has 4 jobs
(a) Before optimization,

job1 execution time is [time2_][time3___][time4_____],
job2 execution time is [time2__________][time3___][time4_____],
job3 execution time is................................[time2____][time3___][time4_____],
job4 execution time is................................[time2_____________][time3___][time4_____].

After optimization,

job1 execution time is [time3___][time2_][time4_____],
job2 execution time is [time3___][time2__][time4_____],
job3 execution time is................................[time3___][time2_][time4_____],
job4 execution time is................................[time3___][time2__][time4_____].

In summary, if the app has multiple jobs, the average execution time after the optimization is lower than before.

@JoshRosen
Contributor

To maybe summarize the motivation a bit more succinctly, it seems like the problem here is that the first call to rdd.partitions might be expensive and might occur inside the DAGScheduler event loop, blocking the entire scheduler. I guess this is an unfortunate side-effect of laziness: we might have expensive lazy initialization but it can be hard to reason about when/where it will occur, causing difficult-to-diagnose performance bottlenecks.

It seems like the fix in this patch is to force partitions to be eagerly-computed in the driver thread that defines the RDD. This seems like a good idea, but I have a few minor nits with the fix as it's currently implemented:

  • I understand that the motivation for this is HadoopRDD's expensive getPartitions method, but it seems like the problem is potentially more general. Is there any way to handle this in RDD instead? I understand that we can't just make partitions into a val, but it looks like the @transient partitions_ logic is already there in RDD, so maybe we could just toss a self.partitions() call into the RDD constructor to force eager evaluation on the driver?
  • If there's some reason that we can't implement my proposal in RDD, then I think we can just add a call to self.partitions() at the end of HadoopRDD; this would eliminate the need for a bunch of the confusing variable names added here. (A rough sketch of this idea follows below.)
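
A rough sketch of the second idea, using a simplified stand-in class rather than the real HadoopRDD (the class name, fields, and method bodies are all illustrative):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Simplified stand-in for HadoopRDD, only to show where the eager call would go.
class EagerHadoopLikeRDD(sc: SparkContext, inputDir: String)
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    // The real HadoopRDD calls InputFormat.getSplits here, which is what can
    // take hundreds of seconds for a large input directory.
    Array.empty[Partition]
  }

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty

  // Last statement of the constructor body: forces getPartitions to run (and be
  // cached by RDD.partitions) on the driver thread that creates the RDD, instead
  // of inside the DAGScheduler event loop.
  partitions
}
```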

Contributor

(This comment is kind of moot since I proposed a more general fix in a top-level comment, but I'll still post it anyways:)

I don't think that logging an exception at debug level then returning null is a good error-handling strategy; this is likely to cause a confusing NPE somewhere else with no obvious cause since most users won't have debug-level logging enabled.

Contributor

It seems like the fix in this patch is to force partitions to be eagerly-computed in the driver thread that defines the RDD. This seems like a good idea

How would this interact with the idea of @erikerlandson to defer partition computation?
#3079

@SparkQA

SparkQA commented Dec 30, 2014

Test build #24892 timed out for PR 3794 at commit 6e95955 after a configured wait of 120m.

Contributor

Won't this now throw an NPE if we call partitions from a worker, since this will now return null after the RDD is serialized and deserialized? I guess maybe we never do that?

@SparkQA

SparkQA commented Jan 19, 2015

Test build #25745 has finished for PR 3794 at commit b535a53.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@YanTangZhai
Contributor Author

@JoshRosen Thanks for your comments. I've updated it. I directly use getParentStages, which will call RDD's getPartitions, before sending the JobSubmitted event. Is that OK?

@JoshRosen
Contributor

Good catch on the error-handling logic.

I directly use getParentStages, which will call RDD's getPartitions, before sending the JobSubmitted event.

Does this really call .partitions? It looks like getParentStages just looks at RDDs' dependencies. I was suggesting something more like using getParentStages to get the list of RDDs, then explicitly calling .foreach(_.partitions) on that list.
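
Something along these lines (a hedged sketch with illustrative names, not actual DAGScheduler code): walk the dependency graph of the final RDD and force .partitions on every ancestor, so the expensive calls happen before the JobSubmitted event is posted.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

object EagerPartitions {
  def eagerlyComputePartitions(finalRdd: RDD[_]): Unit = {
    val visited = mutable.HashSet[RDD[_]]()
    var toVisit: List[RDD[_]] = List(finalRdd)
    while (toVisit.nonEmpty) {
      val rdd = toVisit.head
      toVisit = toVisit.tail
      if (!visited(rdd)) {
        visited += rdd
        rdd.partitions  // may be very expensive, e.g. for a HadoopRDD
        toVisit = rdd.dependencies.map(_.rdd).toList ::: toVisit
      }
    }
  }
}
```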

@SparkQA

SparkQA commented Jan 20, 2015

Test build #25784 has finished for PR 3794 at commit aed530b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@YanTangZhai
Contributor Author

@JoshRosen Thanks. I've updated it per your comments. Please review again. However, there are merge conflicts; I will resolve them if this approach is accepted.

Contributor

I'd expand this comment to explain that the reason for performing this call here is that computing the partitions may be very expensive for certain types of RDDs (e.g. HadoopRDDs), so therefore we'd like that computation to take place outside of the DAGScheduler to avoid blocking its event processing loop. I'd also mention SPARK-4961 so that it's easier to find more context on JIRA.

@JoshRosen
Contributor

This approach looks good to me, so feel free to bring this up to date with master.

@SparkQA

SparkQA commented Jan 24, 2015

Test build #26041 has finished for PR 3794 at commit 267e375.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2015

Test build #26042 has finished for PR 3794 at commit d5c0e84.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@YanTangZhai
Contributor Author

@JoshRosen I've brought this up to date with master. Thanks.

Contributor

I just realized that this could be a thread-safety issue: getParentStages could call getShuffleMapStage, which mutates a non-thread-safe shuffleToMapStage map. Even if that map were synchronized, we could still have race conditions between calls from the event processing loop and external calls.

Do you think we could just call rdd.partitions on the final RDD (e.g. the rdd local variable here) instead of calling getParentStages?

@YanTangZhai
Contributor Author

@JoshRosen I don't think just calling rdd.partitions on the final RDD would achieve our goal. Furthermore, rdd.partitions is already called there:

// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length

However, that does not cover some scenarios, like the example I described above.
To avoid the thread-safety issue, do you think we could use another method to get the parent stages that does not mutate any global map, or use a separate method like the getParentPartitions I committed before to get the partitions directly?

@JoshRosen
Contributor

/cc @marmbrus, since you mentioned seeing this issue before. Do you think the proposal of having our own DAG traversal outside of DAGScheduler + calling partitions there will fix the case that you encountered?

@SparkQA

SparkQA commented Apr 13, 2015

Test build #30147 has finished for PR 3794 at commit d5c0e84.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@JoshRosen
Contributor

FYI, this is on my list of old PRs / issues to revisit in the medium-term. I'm also considering adding some instrumentation to DAGScheduler to make this type of blocking / slowdown easier to discover; see https://issues.apache.org/jira/browse/SPARK-8344

@JoshRosen
Contributor

In #7002, I added message processing time metrics to DAGScheduler using Codahale metrics, so it should now be much easier to benchmark this.
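
For readers unfamiliar with that style of instrumentation, here is a minimal sketch of how a Codahale timer measures processing time (illustrative only; the names are made up, and this is not the actual code from #7002):

```scala
import com.codahale.metrics.MetricRegistry

object MessageTimingSketch {
  private val registry = new MetricRegistry()
  private val messageTimer = registry.timer("messageProcessingTime")

  def handleMessage(process: () => Unit): Unit = {
    val context = messageTimer.time()  // start the Codahale timer
    try {
      process()                        // e.g. handle a JobSubmitted event
    } finally {
      context.stop()                   // record the elapsed time
    }
  }
}
```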

@andrewor14
Contributor

@markhamstra @kayousterhout could you have a look?

@rxin
Contributor

rxin commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!
