
Conversation

@viirya (Member) commented Dec 10, 2019

What changes were proposed in this pull request?

This patch changes the condition used to decide whether BytesToBytesMap should grow its internal array. Specifically, it compares against the capacity of the array instead of its size.

Why are the changes needed?

One Spark job on our cluster hangs forever at BytesToBytesMap.safeLookup. After inspection, the long array size is 536870912.

Currently, BytesToBytesMap.append only grows the internal array if the size of the array is less than MAX_CAPACITY, which is 536870912. So in the case above, the array can never grow, and safeLookup loops forever without finding an empty slot.

This check is wrong because we use two array entries per key, so the array size is twice the capacity. We should compare the current capacity of the array, not its size.
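
To make the fix concrete, the change in BytesToBytesMap.append roughly amounts to the following (a simplified sketch based on the diff in this PR; the try/catch around growAndRehash is omitted):

// Before: compares the raw array size. Since two array entries are used per
// key, the size is 2 * capacity, so growth stops too early and the map can
// fill up completely, leaving safeLookup with no empty slot to find.
if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
  growAndRehash();
}

// After: compare the capacity (size / 2) against MAX_CAPACITY instead.
if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
  growAndRehash();
}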

Does this PR introduce any user-facing change?

No

How was this patch tested?

This issue only happens when loading a large number of values into BytesToBytesMap, so it is hard to cover with a unit test. It was tested manually with an internal Spark job.

Below is a manual test case. Once the number of keys reaches 268435456, lookup hangs forever.

@Test
  public void testCapacity() {
    // Manual test case: needs roughly 25 GB of off-heap memory. Before the fix,
    // lookup hangs forever once the number of keys reaches 268435456 (MAX_CAPACITY / 2).
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
                .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert each key with itself as the value; lookup followed by append is
      // the same pattern that HashAggregate uses.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        final long[] value = new long[]{i};
        map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8).append(
            value,
            Platform.LONG_ARRAY_OFFSET,
            8,
            value,
            Platform.LONG_ARRAY_OFFSET,
            8);
      }
      map.free();
    } finally {
      map.free();
    }
  }

@SparkQA commented Dec 10, 2019

Test build #115086 has finished for PR 26828 at commit 001d3cb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Dec 10, 2019

retest this please.

@SparkQA commented Dec 10, 2019

Test build #115092 has finished for PR 26828 at commit 001d3cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Dec 10, 2019

cc @cloud-fan

@SparkQA commented Dec 10, 2019

Test build #115126 has finished for PR 26828 at commit 5ac203e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// We should compare the current capacity of the array, instead of its size.
if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
  try {
    growAndRehash();
Member Author

Actually, I also think that we should set canGrowArray to false, like:

if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) {
  canGrowArray = false;
}

So when we reach the max capacity of the map, canGrowArray is set to false. We can then fail the next append and let the map spill and fall back to sort-based aggregation in HashAggregate. This prevents a similar infinite loop when we reach max capacity.

cc @cloud-fan @felixcheung

Contributor

If we do this, we won't call growAndRehash here. Is that expected?

@viirya viirya (Member Author) Dec 12, 2019

Oh, I didn't mean to replace the current condition, but to add another check.

Member Author

Let me think about it. If it makes sense, I will submit another PR for it.

Contributor

@viirya I'm encountering the same problem that you describe here. When the map is close to MAX_CAPACITY and needs to grow, numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY is true. This prevents the map from resizing, but currently canGrowArray remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. This ultimately causes the query to fail in the UnsafeKVExternalSorter constructor.

It looks like you didn't submit a PR for this - is there a reason why not? If there's no problem with your suggested fix, I can submit a PR now.

Contributor

Thanks for the quick response! I saw that PR (#26914) but I don't think it solves the problem I'm encountering. That PR stops accepting new keys once we have reached MAX_CAPACITY - 1 keys, but this is too late. By that time, we will have far exceeded the growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting, causing the query to fail.
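
For context, the reuse check in the UnsafeKVExternalSorter constructor looks roughly like the following (a paraphrased sketch, not the exact Spark source; map is the BytesToBytesMap being spilled):

// Paraphrased sketch: the map's long array can only be reused as the sorter's
// pointer array if the number of keys is still within roughly half of the
// array's capacity.
boolean canReuseArray = map.numKeys() <= map.getArray().size() / 2;
if (canReuseArray) {
  // Hand the map's long array to the in-memory sorter, avoiding a new allocation.
} else {
  // Allocate a fresh pointer array for the sorter; this can fail if the task
  // has already consumed all of its available memory.
}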

Member Author

I think the problem I posted above is that when we reach MAX_CAPACITY, an infinite loop happens when calling lookup. The previous PR fixed it. It sounds like you are encountering a different problem?

Member Author

If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting, causing the query to fail.

UnsafeKVExternalSorter checks whether the long array can be reused, doesn't it? If it cannot be reused, a new pointer array will be created, no?

@ankurdave ankurdave (Contributor) Sep 13, 2020

Sounds like you are encountering another problem?

You're right, it's not the same problem - I was mistaken in saying so earlier.

In UnsafeKVExternalSorter, it will check if the long array can be reused or not. Isn't? If it cannot be reused, a new pointer array will be created, no?

Yes, but by this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

Contributor

I filed SPARK-32872 and submitted #29744 to fix this.

longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;

if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
Contributor

Can you remind me of some more details of BytesToBytesMap? What happens if we don't grow? I don't see a loop in this method and am not sure how the job hangs.

Member Author

Ah, sure.

The client of BytesToBytesMap, such as HashAggregate, calls lookup to find a Location to write a value to. The returned Location is then used for append (Location.append). Every time after appending a key/value, we check whether it is time to grow the internal array, and grow it if needed.

lookup delegates looking up keys to safeLookup. Its control flow looks like:

int pos = hash & mask;
int step = 1;
// loops until a matching key or an empty slot is found
while (true) {
  if (/* empty slot found */) {
    ...
  } else {
    // check whether the key stored at this slot matches
    ...
  }
  // advance to the next probe position
  pos = (pos + step) & mask;
  step++;
}

So the job hangs in this loop because it cannot find any empty location, since the internal array is full.

We stop growing the internal array too early because we wrongly check the array size at:

if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
  ...
}

Another point (#26828 (comment)) is that we may want to set canGrowArray to false once we are close to the max capacity, so that we can avoid a similar infinite loop in the future.

Contributor

shouldn't lookup throw OOM if no space can be found?

@viirya viirya (Member Author) Dec 11, 2019

lookup just looks for an empty slot in the internal array for a new key. It does not allocate memory; the array is allocated or grown during the preceding append calls.

Once an empty slot (a Location object) is found, the client of BytesToBytesMap may call append on that Location, and an OOM could be thrown while appending the new key/value.
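
To illustrate, a minimal sketch of the usual client pattern (the keyBase/keyOffset/keyLength and valueBase/valueOffset/valueLength variables are placeholders; the calls mirror the manual test in the PR description):

// lookup only probes the long array for a slot; it allocates nothing itself.
BytesToBytesMap.Location loc = map.lookup(keyBase, keyOffset, keyLength);
if (!loc.isDefined()) {
  // append is where data pages are allocated and the array may grow, so this
  // is where an allocation failure would surface; it returns false if it
  // cannot acquire the memory it needs.
  boolean appended = loc.append(
      keyBase, keyOffset, keyLength, valueBase, valueOffset, valueLength);
}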

@cloud-fan cloud-fan (Contributor) Dec 11, 2019

Let me confirm the problem: append mistakenly thinks there is enough space and doesn't grow the array. This makes the client of BytesToBytesMap keep calling lookup and hang. Is my understanding correct?

Member Author

append mistakenly thinks it cannot grow the array anymore, so it does not grow it. It keeps appending values until the array is full. Then the client calling lookup cannot find an empty slot and gets stuck in an infinite loop.

Contributor

I see, append doesn't grow the array when it should. This makes BytesToBytesMap malformed (the array is not big enough to serve the data region) and causes problems.

- if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
+ // We use two array entries per key, so the array size is twice the capacity.
+ // We should compare the current capacity of the array, instead of its size.
+ if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
@dongjoon-hyun dongjoon-hyun (Member) Dec 11, 2019

@viirya. Can we have explicit test cases for these boundary conditions?
(Sorry, I removed my previous comment.)

Member Author

The max capacity is a big number. Is it OK to have a unit test that allocates such a big array?

Member

I guess we could mock it with a simple growAndRehash (although I didn't try).

@dongjoon-hyun dongjoon-hyun (Member) Dec 11, 2019

Never mind. I don't want to block this PR and you because this looks urgent. I'll try that later by myself.

Member Author

OK, sounds good. I will also test to see whether I can add it. Thanks for the suggestion!

Member Author

Hmm, even if we mock growAndRehash to avoid allocating the array, append still needs to allocate memory pages for the values being inserted. Mocking append makes less sense because that is where this logic lives.

@dongjoon-hyun dongjoon-hyun (Member) Dec 11, 2019

Oh, you already tested that. Got it. Thank you for spending time on that.

@dongjoon-hyun (Member)

cc @dbtsai

@dongjoon-hyun dongjoon-hyun (Member) left a comment

+1, LGTM. Merged to master/2.4.

Since MAX_CAPACITY is defined as "the maximum number of keys that BytesToBytesMap supports" and longArray is defined as "a single array to store the key and value", this PR is correct.

dongjoon-hyun pushed a commit that referenced this pull request Dec 11, 2019
… as expected

Closes #26828 from viirya/fix-bytemap.

Lead-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b4aeaf9)
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun (Member)

cc @JoshRosen , @gatorsmile , @wangyum (since he is a release manager for 3.0.0-preview2).

@cloud-fan (Contributor)

Can we at least provide a manual regression test in the PR description, so that people can try it and evaluate the risk?

@viirya (Member Author) commented Dec 13, 2019

Can we at least provide a manual regression test in the PR description, so that people can try it and evaluate the risk?

Good suggestion. I added a manual test case to the PR description.

@viirya viirya deleted the fix-bytemap branch December 27, 2023 18:38