Conversation

@yucai (Contributor) commented Feb 18, 2019

What changes were proposed in this pull request?

This is a follow-up PR for #21149.

The new approach uses unsafeRow.hashCode() as the hash value in HashAggregate.
Because the unsafe row contains the null bit set and other layout bytes, its hash differs from the shuffle hash, so we no longer need a special seed.
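
To make the intuition concrete, here is a minimal, Spark-free Scala sketch; the byte layout and helper names are illustrative only, not Spark's actual UnsafeRow format. Because the row-level hash is computed over a serialized layout that also carries a null bit set, the bytes being hashed differ from the plain value bytes, so the two hashes are decorrelated even with the same algorithm and seed.

import scala.util.hashing.MurmurHash3

object RowHashVsValueHashSketch {
  // Value-level hash, standing in for the shuffle's Murmur3Hash(groupingExpressions, 42).
  def valueHash(key: String): Int =
    MurmurHash3.bytesHash(key.getBytes("UTF-8"), 42)

  // Row-level hash, standing in for unsafeRow.hashCode(): the hashed bytes start
  // with an (illustrative) null bit set word, so they differ from the raw value
  // bytes even though the algorithm and seed are the same.
  def rowHash(key: String): Int = {
    val nullBitSet = new Array[Byte](8) // one 8-byte word, all fields non-null
    MurmurHash3.bytesHash(nullBitSet ++ key.getBytes("UTF-8"), 42)
  }

  def main(args: Array[String]): Unit =
    Seq("1", "42", "12345678").foreach { k =>
      println(f"key=$k%-9s valueHash=${valueHash(k)}%12d rowHash=${rowHash(k)}%12d")
    }
}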

How was this patch tested?

UTs.

@yucai (Contributor, Author) commented Feb 18, 2019

Generated code:

// generate grouping key
agg_mutableStateArray_0[0].reset();

agg_mutableStateArray_0[0].write(0, agg_expr_0_0);
int agg_unsafeRowHash_0 = (agg_mutableStateArray_0[0].getRow()).hashCode();
if (true) {
  // try to get the buffer from hash map
  agg_unsafeRowAggBuffer_0 =
    agg_hashMap_0.getAggregationBufferFromUnsafeRow((agg_mutableStateArray_0[0].getRow()), agg_unsafeRowHash_0);
}
// Can't allocate buffer from the hash map. Spill the map and fall back to sort-based
// aggregation after processing all input rows.
if (agg_unsafeRowAggBuffer_0 == null) {
  if (agg_sorter_0 == null) {
    agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
  } else {
    agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
  }

  // the hash map has been spilled, so it should have enough memory now;
  // try to allocate the buffer again.
  agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
    (agg_mutableStateArray_0[0].getRow()), agg_unsafeRowHash_0);
  if (agg_unsafeRowAggBuffer_0 == null) {
    // failed to allocate the first page
    throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
  }
}

@yucai (Contributor, Author) commented Feb 18, 2019

Tests:

spark.conf.set("spark.sql.shuffle.partitions", 8192)
val df = spark.range(16 * 1024 * 1024).selectExpr("cast(id as string) as id")
df.groupBy("id").agg(count("id")).collect
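
As background to this query (many distinct string keys spread over 8192 shuffle partitions), here is a toy, Spark-free Scala sketch of the hash conflict it exposed; the object name, the stand-in hash functions, and the map capacity below are illustrative, not Spark internals.

import scala.util.hashing.MurmurHash3

// Toy demonstration: if the aggregation hash map reuses the shuffle hash, every
// key routed to one shuffle partition collapses into a handful of map buckets.
object HashConflictDemo {
  def main(args: Array[String]): Unit = {
    val numPartitions = 8192    // matches spark.sql.shuffle.partitions in the query above
    val mapBuckets    = 1 << 16 // illustrative hash-map capacity (power of two)

    // Murmur3(seed = 42) stands in for both the shuffle hash and the old aggregate
    // hash; a seed-0 byte hash stands in for unsafeRow.hashCode().
    val shuffleHash: String => Int = MurmurHash3.stringHash(_, 42)
    val rowHash: String => Int     = k => MurmurHash3.bytesHash(k.getBytes("UTF-8"), 0)

    // Keys (from ~2M toy keys) that the shuffle would send to partition 0.
    val keysInPartition0 = (0 until (1 << 21)).iterator
      .map(_.toString)
      .filter(k => Math.floorMod(shuffleHash(k), numPartitions) == 0)
      .toSeq

    def bucketsUsed(h: String => Int): Int =
      keysInPartition0.map(k => Math.floorMod(h(k), mapBuckets)).distinct.size

    println(s"keys in partition 0:                  ${keysInPartition0.size}")
    // Reusing the shuffle hash: at most mapBuckets / numPartitions (= 8 here) buckets can be hit.
    println(s"map buckets hit with the same hash:   ${bucketsUsed(shuffleHash)}")
    // A decorrelated per-row hash spreads the same keys across the map.
    println(s"map buckets hit with a distinct hash: ${bucketsUsed(rowHash)}")
  }
}

With the same hash on both sides, the few hundred keys of the demo's partition 0 land in at most 8 of the 65,536 buckets, producing long probe chains; a decorrelated hash spreads them out, which is what reusing unsafeRow.hashCode() achieves without a special seed.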

Before:

    val hashExpr = Murmur3Hash(groupingExpressions, 42)
    val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)

After:

|// generate grouping key
|${unsafeRowKeyCode.code}
|int $unsafeRowHash = ${unsafeRowKeyCode.value}.hashCode();
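
For readers not familiar with Spark's whole-stage codegen, the |-prefixed lines above are fragments of a Scala multi-line string template that is assembled into the Java source shown earlier in this thread. A minimal standalone sketch of that templating style follows; the fragment values are made up for illustration, modeled on the generated code above.

object CodegenTemplateSketch {
  def main(args: Array[String]): Unit = {
    // Made-up stand-ins for the fragments the real template splices in.
    val unsafeRowKeyCodeCode  = "agg_mutableStateArray_0[0].write(0, agg_expr_0_0);"
    val unsafeRowKeyCodeValue = "(agg_mutableStateArray_0[0].getRow())"
    val unsafeRowHash         = "agg_unsafeRowHash_0"

    val generated =
      s"""
         |// generate grouping key
         |$unsafeRowKeyCodeCode
         |int $unsafeRowHash = ${unsafeRowKeyCodeValue}.hashCode();
       """.stripMargin.trim

    println(generated)
  }
}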
Review comment from a Member on the diff:

nit: unsafeKeyHash?

@maropu (Member) left a comment
Aha, this is a smarter way...

@SparkQA commented Feb 18, 2019

Test build #102455 has finished for PR 23821 at commit 9260293.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Feb 18, 2019

retest this please

@SparkQA commented Feb 18, 2019

Test build #102464 has finished for PR 23821 at commit 9260293.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yucai (Contributor, Author) commented Feb 19, 2019

retest this please

@yucai (Contributor, Author) commented Feb 19, 2019

@cloud-fan, kindly help review.

@cloud-fan (Contributor) commented:
Did you verify that it avoids the hash conflict with the problematic query?

@yucai (Contributor, Author) commented Feb 19, 2019

> Did you verify that it avoids the hash conflict with the problematic query?

I used another query to verify this issue.

See: #23821 (comment)

@cloud-fan (Contributor) commented:
thanks, merging to master!

@cloud-fan closed this in 743b73d on Feb 19, 2019