
Conversation

Contributor

@yucai commented Apr 25, 2018

What changes were proposed in this pull request?

HashAggregate uses the same hash algorithm and seed as ShuffleExchange, which can lead to severe hash collisions when shuffle.partitions = 8192 * n.

Consider the example below:

SET spark.sql.shuffle.partitions=8192;
INSERT OVERWRITE TABLE target_xxx
SELECT
 item_id,
 auct_end_dt
FROM
 source_xxx
GROUP BY
 item_id,
 auct_end_dt;

In the shuffle stage, if the user sets spark.sql.shuffle.partitions = 8192, all tuples routed to the same partition satisfy the following relationship:

hash(tuple x) = hash(tuple y) + n * 8192

Then, in the next HashAggregate stage, all tuples from the same partition must be inserted into a 16K-slot BytesToBytesMap (unsafeRowAggBuffer).

Here, HashAggregate uses the same hash algorithm on the same expressions as the shuffle, with the same seed, and 16K = 8192 * 2, so all tuples in the same partition are hashed to only 2 distinct slots in the BytesToBytesMap. That is a severe hash collision, and as the BytesToBytesMap grows, the collision persists.
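The arithmetic above can be sketched in a short, hypothetical Python simulation. A murmur-style 32-bit finalizer stands in for Spark's Murmur3Hash, and all names are illustrative; the point is only that `h % 8192 == 0` forces `h % 16384` into `{0, 8192}` when both stages share the hash and seed:

```python
NUM_PARTITIONS = 8192              # spark.sql.shuffle.partitions
MAP_CAPACITY = 2 * NUM_PARTITIONS  # 16K-slot BytesToBytesMap

def mix32(key: int, seed: int = 42) -> int:
    # murmur-style 32-bit finalizer mix; a stand-in for Spark's Murmur3Hash
    x = (key ^ seed) & 0xFFFFFFFF
    x ^= x >> 16
    x = (x * 0x85EBCA6B) & 0xFFFFFFFF
    x ^= x >> 13
    x = (x * 0xC2B2AE35) & 0xFFFFFFFF
    x ^= x >> 16
    return x

# keys that the shuffle routes to partition 0
keys = [k for k in range(300_000) if mix32(k) % NUM_PARTITIONS == 0]

# if HashAggregate reuses the same hash and seed, only 2 map slots are
# reachable: h % 8192 == 0 implies h % 16384 is either 0 or 8192
same_seed = {mix32(k) % MAP_CAPACITY for k in keys}
print(sorted(same_seed))           # a subset of [0, 8192]

# a different seed decorrelates the two stages and spreads the keys out
other_seed = {mix32(k, seed=48) % MAP_CAPACITY for k in keys}
print(len(other_seed))
```

Under these assumptions the same-seed run occupies at most two slots regardless of how many keys the partition holds, while the different-seed run spreads the same keys across many slots.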

Before change:
[screenshot: hash_conflict]

After change:
[screenshot: no_hash_conflict]

How was this patch tested?

Unit tests and production cases.

Member

maropu commented Apr 25, 2018

oh... good catch. I think you'd better put the detailed info (written in the JIRA) in the description above?

Member


How about writing a comment on the reason why we set this seed value here?

Contributor Author

yucai commented Apr 25, 2018

@maropu Thanks for the comments, I have updated the description. Could you help take a look?

Member

maropu commented Apr 25, 2018

cc: @hvanhovell

Member

viirya commented Apr 25, 2018

Can you also show the screenshot after this change?


SparkQA commented Apr 25, 2018

Test build #89830 has finished for PR 21149 at commit 5e88468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Apr 25, 2018

Test build #89839 has finished for PR 21149 at commit 0818618.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

maropu commented May 8, 2018

ping @hvanhovell @gatorsmile

@hvanhovell
Contributor

LGTM - merging to master. Thanks!

@asfgit closed this in e17567c on May 8, 2018
Contributor Author

yucai commented May 8, 2018

@maropu @hvanhovell thanks very much!

// SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions
// as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n,
// pick a different seed to avoid this conflict
val hashExpr = Murmur3Hash(groupingExpressions, 48)
Contributor


can we just use UnsafeRow.hashCode here?

Contributor Author


@cloud-fan you mean unsafeRowKeys.hashCode(), right?
I think it is a good idea: the unsafe row has the [null bit set] etc., so the result should be different, and we wouldn't need the odd seed of 48 either. Do you want me to create a follow-up PR?

Contributor


yes please, thanks!


@cloud-fan would this be slower, since we are now moving to an interpreted version for hash-code generation? If not, why didn't we use unsafeRowKeys.hashCode() in the first place?

Contributor


it should be faster, as unsafeRowKeys.hashCode() is just one function call. I don't know why we didn't do it in the first place; the code is pretty old.

cloud-fan pushed a commit that referenced this pull request Feb 19, 2019
…n HashAggregate

## What changes were proposed in this pull request?

This is a followup PR for #21149.

The new approach uses unsafeRow.hashCode() as the hash value in HashAggregate.
The unsafe row includes the [null bit set] etc., so its hash differs from the shuffle hash, and we no longer need a special seed.

## How was this patch tested?

UTs.

Closes #23821 from yucai/unsafe_hash.

Authored-by: yucai <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
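The follow-up's reasoning, that hashing the row's full byte image (null bitset included) decorrelates from the column-wise shuffle hash even without a special seed, can be sketched as follows. This is a toy model, not Spark's actual UnsafeRow layout code: a standard MurmurHash3 x86 32-bit routine is applied once to a single int field (the shuffle-style hash) and once to a simplified row image of an 8-byte null-bitset word followed by an 8-byte field:

```python
def murmur3_32(data: bytes, seed: int = 42) -> int:
    """Standard MurmurHash3 x86 32-bit (the same family Spark uses)."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & 0xFFFFFFFF
    n4 = len(data) & ~3
    for i in range(0, n4, 4):             # 4-byte blocks
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = ((h << 13) | (h >> 19)) & 0xFFFFFFFF
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    k = 0
    for b in reversed(data[n4:]):         # tail bytes
        k = (k << 8) | b
    if len(data) > n4:
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    h ^= len(data)                        # finalizer
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

NUM_PARTITIONS, MAP_CAPACITY = 8192, 16384

def shuffle_hash(v: int) -> int:
    # column-wise hash of a single int field, as the shuffle would compute
    return murmur3_32(v.to_bytes(4, "little"))

def row_hash(v: int) -> int:
    # hash of a toy row byte image: 8-byte null-bitset word + 8-byte field
    return murmur3_32(b"\x00" * 8 + v.to_bytes(8, "little"))

# keys that all land in shuffle partition 0 under the column-wise hash
keys = [k for k in range(300_000) if shuffle_hash(k) % NUM_PARTITIONS == 0]

# reusing the column-wise hash pins them to 2 slots of the 16K map ...
assert {shuffle_hash(k) % MAP_CAPACITY for k in keys} <= {0, NUM_PARTITIONS}

# ... while hashing the row's byte image spreads them out
print(len({row_hash(k) % MAP_CAPACITY for k in keys}))
```

Because the hashed byte stream is different (extra null-bitset word, wider field encoding), the row-image hash is effectively independent of the shuffle hash, which is why no "weird 48" seed is needed.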