Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,21 @@ class VectorizedHashMapGenerator(
| private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
| private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
| private int[] buckets;
| private int numBuckets;
| private int maxSteps;
| private int capacity = 1 << 16;
| private double loadFactor = 0.5;
| private int numBuckets = (int) (capacity / loadFactor);
| private int maxSteps = 2;
| private int numRows = 0;
| private org.apache.spark.sql.types.StructType schema = $generatedSchema
| private org.apache.spark.sql.types.StructType aggregateBufferSchema =
| $generatedAggBufferSchema
|
| public $generatedClassName() {
| // TODO: These should be generated based on the schema
| int DEFAULT_CAPACITY = 1 << 16;
| double DEFAULT_LOAD_FACTOR = 0.25;
| int DEFAULT_MAX_STEPS = 2;
| assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
| this.maxSteps = DEFAULT_MAX_STEPS;
| numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
|
| batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
| org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
|
| org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
| // TODO: Possibly generate this projection in TungstenAggregate directly
| aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
| aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
| aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
| for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
| aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
| }
Expand All @@ -130,9 +123,11 @@ class VectorizedHashMapGenerator(
*/
private def generateHashFunction(): String = {
s"""
|// TODO: Improve this hash function
|private long hash($groupingKeySignature) {
| return ${groupingKeys.map(_._2).mkString(" | ")};
| long h = 0;
| ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);")
.mkString("\n")}
| return h;
|}
""".stripMargin
}
Expand Down Expand Up @@ -201,15 +196,20 @@ class VectorizedHashMapGenerator(
| while (step < maxSteps) {
| // Return bucket index if it's either an empty slot or already contains the key
| if (buckets[idx] == -1) {
| ${groupingKeys.zipWithIndex.map(k =>
s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
| ${bufferValues.zipWithIndex.map(k =>
s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
.mkString("\n")}
| buckets[idx] = numRows++;
| batch.setNumRows(numRows);
| aggregateBufferBatch.setNumRows(numRows);
| return aggregateBufferBatch.getRow(buckets[idx]);
| if (numRows < capacity) {
| ${groupingKeys.zipWithIndex.map(k =>
s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
| ${bufferValues.zipWithIndex.map(k =>
s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
.mkString("\n")}
| buckets[idx] = numRows++;
| batch.setNumRows(numRows);
| aggregateBufferBatch.setNumRows(numRows);
| return aggregateBufferBatch.getRow(buckets[idx]);
| } else {
| // No more space
| return null;
| }
| } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
| return aggregateBufferBatch.getRow(buckets[idx]);
| }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
*/
}

ignore("aggregate with keys") {
ignore("aggregate with linear keys") {
val N = 20 << 20

val benchmark = new Benchmark("Aggregate w keys", N)
Expand Down Expand Up @@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
codegen = F 2219 / 2392 9.4 105.8 1.0X
codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X
codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X
codegen = F 2067 / 2166 10.1 98.6 1.0X
codegen = T hashmap = F 1149 / 1321 18.3 54.8 1.8X
codegen = T hashmap = T 388 / 475 54.0 18.5 5.3X
*/
}

ignore("aggregate with randomized keys") {
val N = 20 << 20

val benchmark = new Benchmark("Aggregate w keys", N)
sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")

def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this plan look like?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WholeStageCodegen
:  +- TungstenAggregate(key=[k#3L,k#3L], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[k#3L,k#3L,sum(id)#183L])
:     +- INPUT
+- Exchange hashpartitioning(k#3L, k#3L, 1), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[k#3L,k#3L], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[k#3L,k#3L,sum#185L])
      :     +- Project [id#0L,FLOOR((rand(-9053518532274118725) * 10000.0)) AS k#3L]
      :        +- Range 0, 1, 1, 20971520, [id#0L]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel you're spending a lot of time in rand

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Doesn't registerTempTable materialize the table before registering it? If not, is there a way to do that?


benchmark.addCase(s"codegen = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
f()
}

benchmark.run()

/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
codegen = F 2517 / 2608 8.3 120.0 1.0X
codegen = T hashmap = F 1484 / 1560 14.1 70.8 1.7X
codegen = T hashmap = T 794 / 908 26.4 37.9 3.2X
*/
}

Expand Down