
Conversation

Contributor

@vanzin vanzin commented Dec 18, 2017

There are two main changes to speed up rendering of the tasks list
when rendering the stage page.

The first one makes the code only load the tasks being shown in the
current page of the tasks table, and information related to only
those tasks. One side-effect of this change is that the graph that
shows task-related events now only shows events for the tasks in
the current page, instead of the previously hardcoded limit of "events
for the first 1000 tasks". That ends up helping with readability,
though.
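
For illustration, a minimal sketch of what page-at-a-time loading looks
like against the KVStore view API (view/index/parent/skip/max are methods
in Spark's kvstore module; the helper itself, its parameters, and the use
of the TaskDataWrapper class are illustrative, not the exact code in this
patch):

import scala.collection.JavaConverters._
import org.apache.spark.util.kvstore.KVStore

def taskPage(
    store: KVStore,
    stageKey: Array[Int],
    sortColumn: String,
    offset: Long,
    pageSize: Long): Seq[TaskDataWrapper] = {
  val iter = store.view(classOf[TaskDataWrapper])
    .index(sortColumn)   // sort by the index backing the requested column
    .parent(stageKey)    // restrict the view to tasks of this stage attempt
    .skip(offset)        // jump straight to the first row of the page...
    .max(pageSize)       // ...and read only the rows on that page
    .closeableIterator()
  try {
    iter.asScala.toList  // only these tasks are ever deserialized
  } finally {
    iter.close()
  }
}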

To make sorting efficient when using a disk store, the task wrapper
was extended to include many new indices: one for each of the sortable
columns in the UI, and one for each metric for which quantiles are
calculated.

The second one changes the way metric quantiles are calculated for stages.
Instead of using the "Distribution" class to process data for all task
metrics, which requires scanning all tasks of a stage, the code now
uses the KVStore "skip()" functionality to only read tasks that contain
interesting information for the quantiles that are desired.
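
As a sketch of that idea (names here are illustrative; the real scanTasks
helper is quoted later in this review), each desired quantile maps to a
rank in the metric's index, and skip() lands directly on the task holding
that value:

def quantileValues(
    store: KVStore,
    stageKey: Array[Int],
    index: String,
    quantiles: Seq[Double],
    count: Long)(value: TaskDataWrapper => Long): IndexedSeq[Double] = {
  quantiles.map { q =>
    // Rank of the task holding the q-th quantile of this metric.
    val rank = math.min((q * count).toLong, count - 1)
    val iter = store.view(classOf[TaskDataWrapper])
      .index(index)
      .parent(stageKey)
      .skip(rank)  // skip straight to that task...
      .max(1L)     // ...and deserialize just the one row
      .closeableIterator()
    try value(iter.next()).toDouble finally iter.close()
  }.toIndexedSeq
}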

This is still not cheap; because there are many metrics that the UI
and API track, the code needs to scan the index for each metric to
gather the information. Savings come mainly from skipping deserialization
when using the disk store, but the in-memory code also seems to be
faster than before (most probably because of other changes in this
patch).

To make subsequent calls faster, some quantiles are cached in the
status store. This makes UIs much faster after the first time a stage
has been loaded.

With the above changes, a lot of code in the UI layer could be simplified.

@vanzin
Contributor Author

vanzin commented Dec 18, 2017

Some previous comments for this PR can be found at: vanzin#41

@vanzin
Contributor Author

vanzin commented Dec 19, 2017

I did a non-scientific test locally, and this change causes the disk store for an app to grow to about double its previous size; while not great, given how much new data is being written, that's probably ok.

@SparkQA

SparkQA commented Dec 19, 2017

Test build #85073 has finished for PR 20013 at commit 5b7f205.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 22, 2017

Test build #85289 has finished for PR 20013 at commit 429d12e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 3, 2018

Test build #85614 has finished for PR 20013 at commit e22bceb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor Author

vanzin commented Jan 3, 2018

@squito

@ajbozarth
Member

I'll take a look at this either today or tomorrow

@ajbozarth
Member

I did a read-through; I didn't see any issues from a code standpoint, but I haven't spent much time in your new code, so another pair of eyes would be good. I also didn't check out and run the code; if you want me to, I can do that tomorrow.

metrics.shuffleWriteMetrics.recordsWritten))
if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
val old = this.metrics
val newMetrics = new MetricsTracker()
Member

create a method in MetricsTracker which accepts a TaskMetrics and updates the metrics?

Contributor Author

This would be the only place where it's used, so I don't see any gains.

Member

Changing so many fields here seems ugly... but I respect your preference.

@gengliangwang
Member

@vanzin I haven't run the code. I wonder which change doubles the disk usage: the new indices or the cached quantiles?

@vanzin
Contributor Author

vanzin commented Jan 5, 2018

I wonder which change doubles the disk usage?

It's the new indices, more specifically the values, not the keys. I tried changing the disk layout to write all the indices in a new namespace with a very short key length, and that didn't change the resulting store size at all.

Contributor

@squito squito left a comment

looks reasonable, my comments are nothing important really. I need to do another pass though. Also didn't look closely at the UI-related code.

* Whether to cache information about a specific metric quantile. We cache quantiles at every 0.05
* step, which covers the default values used both in the API and in the stages page.
*/
private def shouldCacheQuantile(q: Double): Boolean = ((q * 100).toInt % 5) == 0
Contributor

this happens to work, but FYI it's a bit dangerous with roundoff -- 0.55 * 100 has an error; luckily it happens to be in a direction fixed by toInt.
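
To make the hazard concrete, here is what standard IEEE-754 doubles give:
a product that errs high, like 0.55 * 100, still truncates to the right
integer, but one that errs low, like 0.29 * 100, does not; math.round would
be the safer spelling.

scala> 0.55 * 100
res0: Double = 55.00000000000001

scala> (0.55 * 100).toInt
res1: Int = 55

scala> 0.29 * 100
res2: Double = 28.999999999999996

scala> (0.29 * 100).toInt
res3: Int = 28

scala> math.round(0.29 * 100)
res4: Long = 29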

// to make the code simpler.
if (cachedQuantiles.size == quantiles.size) {
def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = {
cachedQuantiles.map(fn).toIndexedSeq
Contributor

minor -- you can skip a copy here that comes with toIndexedSeq. You could use a WrappedArray, which is also an IndexedSeq. You can just drop the toIndexedSeq and it'll happen automatically (implicit conversion in Predef), or you can make it explicit with WrappedArray.make().

scala> val arr = Array(1,2,3,4)
arr: Array[Int] = Array(1, 2, 3, 4)

scala> val x: IndexedSeq[Int] = arr
x: IndexedSeq[Int] = WrappedArray(1, 2, 3, 4)

scala> x.toArray eq arr
res9: Boolean = true

scala> arr.toIndexedSeq.toArray eq arr
res10: Boolean = false

// stabilize once the stage finishes. It's also slow, especially with disk stores.
val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
Contributor

aside: an alternative would be to use a data structure which supports online approximate quantile computation. t-digest is the one I'm familiar with; perhaps there are others. But let's leave exploring that for the future.

Contributor Author

Funny you mention that, my first implementation actually used t-digest. But it's a bit slow for this purpose (the unreleased version at the time was faster) and uses quite a bit of memory, so I gave up trying that for now. It also changes the computed values (since they'd be approximations), which requires changing all the golden files (argh).

There's some code for approximate quantiles in the sql module that could potentially be refactored.
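
For reference, a minimal sketch of the t-digest approach that was tried
and dropped, using the com.tdunning:t-digest library; the taskDurations
value is a hypothetical stand-in for a stream of task metric values, not
anything in this patch:

import com.tdunning.math.stats.TDigest

val taskDurations: Seq[Long] = Seq(120L, 250L, 310L, 980L)  // stand-in data

// Build the digest online; memory stays bounded by the compression setting.
val digest = TDigest.createMergingDigest(100.0)
taskDurations.foreach(d => digest.add(d.toDouble))

// Approximate quantiles on demand, with no rescan of the tasks. The results
// are estimates, which is why the golden files would have had to change.
val median = digest.quantile(0.5)
val p95 = digest.quantile(0.95)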

@SparkQA

SparkQA commented Jan 6, 2018

Test build #85737 has finished for PR 20013 at commit c4e7f61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

weakIntern(info.executorId),
weakIntern(info.host),
weakIntern(info.status),
weakIntern(info.taskLocality.toString()),
Contributor

I wasn't familiar with WeakInterner before, it's neat. Not super important, but for most of these, wouldn't strong interning be fine? I expect the same values to come up over and over again across apps for these:

executorId
host
status
taskLocality
storageLevel

though weak interning makes sense for

exec.hostPort
accumulators

Contributor Author

I don't think weak vs. strong interning makes a whole lot of difference with Java 8 (= no perm gen). I used the weak one because that's what the old code used.

It can probably be extended to more things, but tasks are the bulk of an app's data, and that's what I was focusing on in this PR.
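
For context, the weakIntern helper under discussion is a thin wrapper over
Guava's interner utilities; roughly (a sketch, not the exact code in this
patch):

import com.google.common.collect.Interners

val stringInterner = Interners.newWeakInterner[String]()

// Returns a canonical instance, so the many duplicate executorId / host /
// status / locality strings across tasks all share one object. The interner
// holds them weakly, so canonical copies can be collected once unused.
def weakIntern(s: String): String =
  if (s != null) stringInterner.intern(s) else null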

@squito
Contributor

squito commented Jan 6, 2018

lgtm

@gengliangwang
Member

The code looks good. But this is a lot of changes in the SHS; I suggest running more tests (real workloads) before merging.

@gengliangwang
Member

Tried running with event logs of a TPC-DS benchmark (scale 1) on my local setup; the UI works.
The LevelDB size is about 50% larger.

@vanzin
Contributor Author

vanzin commented Jan 8, 2018

I suggest running more tests (real workloads)

I've tested all of these PRs attached to SPARK-18085 with lots of real and synthetic app logs.

@sameeragarwal
Member

sameeragarwal commented Jan 8, 2018

@vanzin Thanks for the great work! Given this is a sizable change, I'd like to get this in before cutting RC1. (/cc @cloud-fan @gengliangwang)

@SparkQA

SparkQA commented Jan 8, 2018

Test build #85806 has finished for PR 20013 at commit ed59b4d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 9, 2018

Test build #85809 has finished for PR 20013 at commit 44c9647.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val now = System.nanoTime()

val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
val (updatedTask, metricsDelta) = liveTasks.remove(event.taskInfo.taskId).map { task =>
Contributor

where is updatedTask used?

scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))

// Go through the computed quantiles and cache the values that match the caching criteria.
computedQuantiles.quantiles.zipWithIndex
Contributor

I think it makes sense to cache them for the disk store, but it may be overkill for the in-memory store.

Contributor Author

The cached data is really small, no point in having different code paths to save a few hundred bytes.


this.metrics = newMetrics
if (old.executorDeserializeTime >= 0L) {
old.subtract(newMetrics)
Contributor

shouldn't this be newMetrics.subtract(old)? The returned delta will be added to the old metrics.

Contributor Author

No, this is right. The parameter is actually a delta, so it has to be applied to the old value, except when the old value is not initialized (which is the condition in L162).

But let me double-check this again.

Contributor Author

@vanzin vanzin Jan 9, 2018

Actually, sorry: the parameter is not the delta, the return value is. This code is not right, but the solution is different from what you suggest. I'll also add a test for it.
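
For readers following along, a sketch of the contract being debated, with
illustrative names only (the fix that actually landed differs in detail and
came with the promised test): the method swaps in the new cumulative
snapshot, and its return value is the delta the caller folds into
stage-level aggregates.

// Illustrative only -- not this patch's final code. Fields hold cumulative
// values, with executorDeserializeTime == -1 marking an uninitialized tracker.
def updateAndComputeDelta(newMetrics: MetricsTracker): MetricsTracker = {
  val old = this.metrics
  this.metrics = newMetrics
  if (old.executorDeserializeTime >= 0L) {
    // Delta = new snapshot minus previous snapshot, computed into a fresh
    // object (hypothetical fieldwise helper) so neither snapshot is mutated.
    subtract(newMetrics, old)
  } else {
    newMetrics  // first report for this task: the snapshot is the delta
  }
}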

val taskId: JLong,
@KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
val index: Int,
@KVIndexParam(value = TaskIndexNames.ATTEMPT, parent = TaskIndexNames.STAGE)
Contributor

do we really need to create an index for every field?

Contributor Author

@vanzin vanzin Jan 9, 2018

Yes, the UI can be sorted by all of these. (And also that makes it possible to calculate the quantiles without having to deserialize everything.)

shuffleReadMetrics = shuffleReadMetrics,
shuffleWriteMetrics = shuffleWriteMetrics
)
executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
Member

From here on there are a lot of accesses to the KV store, and I think many of the new indexes are only used here.
I think we can make it simpler: just get the task data with one access to the KV store, and compute the quantiles from that.

Contributor

+1, cache misses should not be very frequent; I think it's fine to be a little expensive when building the cache.

Contributor Author

Deserializing 100k tasks from the disk store is a lot more expensive than this (which only deserializes on the order of number of indices * number of quantiles rows). Believe me, I tested all of the suggestions you guys are making.

@gengliangwang
Member

The major concern is that, with these code changes, memory usage will be much larger with InMemoryStore.
Also, building so many new indexes just for computing computedQuantiles seems like overkill.

@vanzin
Contributor Author

vanzin commented Jan 9, 2018

the memory usage will be much larger with InMemoryStore.

No it won't. The in-memory store does not use extra memory for indices. It sorts the data when the index is read, and discards that memory after the iterator is discarded.
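
A sketch of why that holds (illustrative, not InMemoryStore's actual code):
reading an "index" on the in-memory store just builds a transient sorted
copy, which becomes garbage as soon as the iterator is dropped.

def sortedView[T](elements: Iterable[T], key: T => Long,
    skip: Int, max: Int): Iterator[T] =
  elements.toVector            // transient copy, created only when read
    .sortBy(key)               // sorted by the requested index, on demand
    .iterator
    .slice(skip, skip + max)   // paging; nothing is retained afterwards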

@SparkQA

SparkQA commented Jan 9, 2018

Test build #85867 has finished for PR 20013 at commit 86275b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

shuffleWriteRecords += delta.shuffleWriteMetrics.recordsWritten
memoryBytesSpilled += delta.memoryBytesSpilled
diskBytesSpilled += delta.diskBytesSpilled
private class MetricsTracker(
Contributor

can we just use api.v1.TaskMetrics here? They are both immutable views; the only difference is that MetricsTracker flattens the fields.

Contributor Author

ok, but the code becomes uglier because v1.TaskMetrics is kind of an annoying type to use.

buildUpdate()
}

def updateAndGet(kvstore: KVStore, now: Long): TaskDataWrapper = {
Contributor

why do we need it? Since the caller side doesn't care about the return value, we can just update the lastWriteTime in doUpdate, and then call doUpdate instead.

@SparkQA

SparkQA commented Jan 10, 2018

Test build #85899 has finished for PR 20013 at commit 3d66505.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 10, 2018

Test build #85924 has finished for PR 20013 at commit 34d02b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 11, 2018

Author: Marcelo Vanzin <[email protected]>

Closes #20013 from vanzin/SPARK-20657.

(cherry picked from commit 1c70da3)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 1c70da3 Jan 11, 2018
@vanzin vanzin deleted the SPARK-20657 branch January 16, 2018 19:49