
Conversation

@liancheng
Contributor

@liancheng liancheng commented Nov 22, 2016

What changes were proposed in this pull request?

This PR fixes a random OOM issue that occurred while running `ObjectHashAggregateSuite`.

The issue can be reproduced reliably under the following conditions:

  1. The aggregation must be evaluated using `ObjectHashAggregateExec`;
  2. There must be an input column whose data type involves `ArrayType` (an input column of `MapType` may even cause a SIGSEGV);
  3. The sort-based aggregation fallback must be triggered during evaluation.

The root cause is that when falling back to sort-based aggregation, we must sort the already evaluated partial aggregation buffers living in the hash map and feed them to the sort-based aggregator through an external sorter. However, the underlying mutable byte buffer of the `UnsafeRow`s produced by the external sorter's iterator is reused and may be overwritten as the iterator steps forward. After the last entry is consumed, the byte buffer points to a block of uninitialized memory filled with the byte `5a`. Therefore, when an `UnsafeArrayData` is read out of such an `UnsafeRow`, the four bytes `5a5a5a5a` are interpreted as the array size, which triggers an allocation for a ridiculously large array and immediately blows up the JVM with an OOM.

To fix this issue, we only need to add a `.copy()` at the appropriate call site.
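The reused-buffer hazard described above can be pictured with a minimal, self-contained sketch (plain Scala, not Spark code; `ReusingIterator` is a hypothetical stand-in for the external sorter's iterator, and `clone()` plays the role of `UnsafeRow.copy()`):

```scala
// An iterator that reuses one mutable buffer across next() calls, the way the
// external sorter reuses the backing bytes of the UnsafeRows it produces.
class ReusingIterator(data: Seq[Array[Int]]) extends Iterator[Array[Int]] {
  private val buffer = new Array[Int](data.head.length)
  private var i = 0
  def hasNext: Boolean = i < data.length
  def next(): Array[Int] = {
    Array.copy(data(i), 0, buffer, 0, buffer.length) // overwrite in place
    i += 1
    buffer // every call returns the SAME array instance
  }
}

object ReuseDemo extends App {
  val rows = Seq(Array(1, 2), Array(3, 4))

  // Buggy: collecting the returned references keeps pointers into the shared
  // buffer, so earlier "rows" are silently clobbered as iteration proceeds.
  val leaked = new ReusingIterator(rows).toList
  assert(leaked.head sameElements Array(3, 4)) // first row was overwritten

  // Fixed: defensively copy each row before storing it, analogous to the
  // `.copy()` added on the UnsafeRow in this PR.
  val copied = new ReusingIterator(rows).map(_.clone()).toList
  assert(copied.head sameElements Array(1, 2))
}
```

The same pattern explains why the bug only surfaces after the iterator is exhausted: the shared buffer then holds whatever the last step (or uninitialized memory) left behind.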

How was this patch tested?

A new regression test case was added in `ObjectHashAggregateSuite`.

@SparkQA

SparkQA commented Nov 22, 2016

Test build #68982 has started for PR 15976 at commit 4b88eed.

@liancheng
Contributor Author

liancheng commented Nov 22, 2016

The last build failure was caused by a logical conflict with #15703: after merging #15703 there are no longer any aggregate functions that do not support partial aggregation, but the re-enabled test cases still checked for that condition.

```scala
}

doubleSafeCheckRows(actual1, expected, 1e-4)
doubleSafeCheckRows(actual2, expected, 1e-4)
```
Contributor Author

All the changes above in this file resolve a logical conflict with PR #15703: after merging #15703 there are no longer any aggregate functions that do not support partial aggregation, so the tests must be updated to reflect that.

@liancheng
Contributor Author

cc @yhuai @cloud-fan

@dongjoon-hyun
Member

Thank you for fixing this, @liancheng !

@SparkQA

SparkQA commented Nov 22, 2016

Test build #69016 has finished for PR 15976 at commit 6db5af9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor Author

retest this please

@liancheng
Contributor Author

liancheng commented Nov 22, 2016

The last build failure was caused by unrelated YARN tests.

@SparkQA

SparkQA commented Nov 22, 2016

Test build #69027 has finished for PR 15976 at commit 6db5af9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor Author

Also cc @davies and @sameeragarwal.

@liancheng
Contributor Author

liancheng commented Nov 23, 2016

An alternative fix proposed by @yhuai is to convert the underlying `UnsafeRow` into a safe row (a `GenericInternalRow` in this case) using a projection, instead of simply adding a `.copy()`. That way we avoid putting unsafe data into a safe row, which is generally safer. This approach may affect performance further, though.
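The projection-based alternative can be sketched as follows (a hypothetical simplification, not the actual Spark internal API; `SafeRow` and `toSafeRow` are illustrative names): the idea is to eagerly materialize every field into an immutable row, so no reference into the reused buffer can escape.

```scala
// A "safe" row holding already-materialized field values.
final case class SafeRow(values: Vector[Any])

object SafeRows {
  // Stand-in "projection": eagerly copies every field out of the (possibly
  // reused) source row into an immutable safe row.
  def toSafeRow(fields: Seq[Any]): SafeRow = SafeRow(fields.toVector)
}

object SafeRowDemo extends App {
  // Stand-in for an UnsafeRow backed by a reused mutable buffer.
  val buffer = scala.collection.mutable.ArrayBuffer[Any](1, "a")

  val safe = SafeRows.toSafeRow(buffer)
  buffer(0) = 99 // the buffer is later overwritten as the iterator steps forward
  assert(safe.values(0) == 1) // the safe row is unaffected
}
```

The trade-off mentioned above is visible here: the projection touches and boxes every field of every row, whereas `.copy()` is a single flat byte-buffer copy.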

```diff
-processRow(result.aggregationBuffer, inputIterator.getValue)
+// Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
+// overwritten when `inputIterator` steps forward, we need to do a deep copy here.
+processRow(result.aggregationBuffer, inputIterator.getValue.copy())
```
Contributor

So the problem is that during `processRow` we cache the input row somehow?

Contributor

I think it's caused by `MutableProjection`? A `MutableProjection` may keep a pointer into the memory region of an unsafe row. Maybe we can fix this bug with #15082?

Contributor

nvm, #15082 needs some significant refactoring; we should get this fix into 2.1 first.

```scala
// 3. Sort-based aggregation fallback must be triggered during evaluation.
withSQLConf(
  SQLConf.USE_OBJECT_HASH_AGG.key -> "true",
  SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "1"
```
Contributor

Not related to this PR, but the config name looks odd; how about `OBJECT_AGG_FALLBACK_TO_SORT_THRESHOLD`?

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Nov 28, 2016

Test build #69224 has started for PR 15976 at commit 6db5af9.

@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Nov 28, 2016

Test build #69234 has finished for PR 15976 at commit 6db5af9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 2e80990 Nov 29, 2016
@liancheng liancheng deleted the investigate-oom branch November 29, 2016 19:18
@liancheng
Contributor Author

@cloud-fan @dongjoon-hyun Thanks for the review!

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017