Conversation

@vanzin (Contributor) commented Sep 21, 2016

Currently there's a scalability problem with cached relations: stats for every
column of every partition are captured in the driver. For large tables that
leads to excessive driver memory usage.

This change modifies the accumulator used to capture stats in the
driver to summarize the data as it arrives, instead of collecting
everything and then summarizing it.

Previously, for each column, the driver needed:

(64 + 2 * sizeof(type)) * number of partitions

With the change, the driver requires a fixed 8 bytes per column.
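
Roughly, the mechanism looks like the sketch below (ColumnSizeAccumulator and its shape are hypothetical; this is not the PR's actual ColStatsAccumulator). Each executor update carries only a batch's per-column byte sizes, and the driver folds them into a fixed-size running total instead of retaining one stats entry per partition. For example, a single LongType column spread over 10,000 partitions previously cost about (64 + 2 * 8) * 10,000 ≈ 800 KB of driver memory; with the change it costs a single 8-byte counter.

import org.apache.spark.util.AccumulatorV2

// Hypothetical sketch: keep only a running total of sizeInBytes per column,
// folding in each batch's sizes as they arrive instead of collecting them all.
class ColumnSizeAccumulator(numColumns: Int)
    extends AccumulatorV2[Array[Long], Array[Long]] {

  private var sizes = new Array[Long](numColumns)

  override def isZero: Boolean = sizes.forall(_ == 0L)

  override def copy(): ColumnSizeAccumulator = {
    val acc = new ColumnSizeAccumulator(numColumns)
    acc.sizes = sizes.clone()
    acc
  }

  override def reset(): Unit = sizes = new Array[Long](numColumns)

  // One update per cached batch: that batch's per-column sizes in bytes.
  override def add(batchSizes: Array[Long]): Unit = {
    var i = 0
    while (i < numColumns) {
      sizes(i) += batchSizes(i)
      i += 1
    }
  }

  override def merge(other: AccumulatorV2[Array[Long], Array[Long]]): Unit =
    add(other.value)

  override def value: Array[Long] = sizes
}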

On top of that, the change fixes a second problem dealing with how
statistics of cached relations that share stats with another one
(e.g. a cached projection of a cached relation) are calculated; previously,
the data would be wrong since the accumulator data would be summarized
based on the child output (while the data reflected the parent's output).
Now the calculation is done based on how the child's output maps to the
parent's output, yielding the correct size.
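
A sketch of that mapping (the helper below is hypothetical, assuming attributes are matched by exprId): each attribute of the child's output is looked up in the original (parent) output, and the per-column totals at the matching ordinals are summed.

import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical helper: map the child's output back to the ordinals of the
// originally materialized output, then sum the matching per-column sizes.
def projectedSizeInBytes(
    originalOutput: Seq[Attribute],
    childOutput: Seq[Attribute],
    columnSizes: Array[Long]): Long = {
  val ordinalByExprId = originalOutput.map(_.exprId).zipWithIndex.toMap
  childOutput.map(attr => columnSizes(ordinalByExprId(attr.exprId))).sum
}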

Tested with the new unit test (which makes sure the calculated stats are
correct), and by looking at the relation size in a heap dump.

@vanzin (Contributor, Author) commented Sep 21, 2016

/cc @yhuai

This version fixes the problem with my previous patch (which, it turned out, also existed in a slightly different form in the existing code).

@SparkQA commented Sep 22, 2016

Test build #65743 has finished for PR 15189 at commit 5b3a65a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ColStatsAccumulator(originalOutput: Seq[Attribute])

@yhuai (Contributor) commented Sep 22, 2016

Cool. Thanks! I may not have time today or tomorrow. I will try to take a look at it during the weekend.

* Accumulator for storing column stats. Summarizes the data in the driver to curb the amount of
* memory being used. Only "sizeInBytes" for each column is kept.
*/
class ColStatsAccumulator(originalOutput: Seq[Attribute])

Contributor:

Should we make the class name explicitly say that it is for sizeInBytes?

Contributor Author:

I tried to leave it generic in case other stats need to be added later, but no worries, I can change the name.


// Create a projection of the cached data and make sure the statistics are correct.
val projected = cached.withOutput(Seq(plan.output.last))
assert(projected.statistics.sizeInBytes === expectedAnswer.size * LONG.defaultSize)

Contributor:

I am not sure if I understand the last two parts. After we cache the dataset, I am not sure if we can change the number of output columns (this test) or the data types (the next one).

If we do a project on the cached dataset, we will see a project operator on top of the InMemoryRelation.

I am wondering what kinds of queries can cause this problem?

Contributor Author:

Ok, I looked again at a heap dump with a couple of cached relations and you're right; I had misinterpreted the previous data. I'll remove these tests and simplify the code.

Still, I'd be a little more comfortable if there were an assert in InMemoryRelation.withOutput that the new output is at least the same size as the previous one...
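
A minimal sketch of the kind of guard being suggested (hypothetical; not present in the actual InMemoryRelation code):

import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical check: reject an output list whose arity differs from the
// materialized output, since the per-column stats would no longer line up.
def checkedOutput(currentOutput: Seq[Attribute], newOutput: Seq[Attribute]): Seq[Attribute] = {
  assert(newOutput.size == currentOutput.size,
    s"Expected ${currentOutput.size} attributes but got ${newOutput.size}")
  newOutput
}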

Contributor Author:

Actually... in that case, isn't my previous patch correct? (#15112)

My worry about that patch was multiple cached relations with different outputs sharing the same accumulator. But if that doesn't happen, then that patch is enough.

Contributor Author:

@yhuai given the above, is it ok if I just revert your revert of my previous patch?

Contributor:

Oh, I see. Sorry, I may have missed something. How do we reproduce the problem that led us to revert the previous PR?

Contributor:

Let me check with @liancheng.

Contributor:

I double-checked the code. The output of an InMemoryRelation always represents the materialized dataset, so it should not be a subset of the underlying dataset's columns. When we scan this relation in InMemoryTableScanExec, we push the column selection down to the scan.

So, even if we use withOutput in CacheManager's useCachedData, it is fine to keep using the original stats because we are not changing the dataset. If you look at the implementation of this method:

def useCachedData(plan: LogicalPlan): LogicalPlan = {
  plan transformDown {
    case currentFragment =>
      lookupCachedData(currentFragment)
        .map(_.cachedRepresentation.withOutput(currentFragment.output))
        .getOrElse(currentFragment)
  }
}

lookupCachedData is implemented using sameResult, so we are just applying an equivalent output (attributes in this output list may have cosmetic variations, but they should be equivalent to the original attributes of this dataset).

Although we may have different outputs, they still represent the same dataset, so it seems fine for them to share the same accumulator.

Contributor Author:

Thanks for confirming. So we should be fine with the previous patch.

@vanzin (Contributor, Author) commented Sep 29, 2016

Closing this in favor of #15304.

vanzin closed this Sep 29, 2016
vanzin deleted the SPARK-17549 branch on Nov 30, 2016