
Conversation

@sameeragarwal
Member

What changes were proposed in this pull request?

This patch speeds up group-by aggregates by around 3-5x by leveraging an in-memory AggregateHashMap (please see #12161), an append-only aggregate hash map that acts as a 'cache' for extremely fast key-value lookups while evaluating aggregates, falling back to the BytesToBytesMap if a given key isn't found.

Architecturally, it is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the key-value pairs. Index lookups in the array rely on linear probing (with a small maximum number of probes) and an inexpensive hash function, which makes them very efficient for the majority of lookups. However, linear probing and a cheap hash function also make the map less robust than the BytesToBytesMap (especially for a large number of keys or certain key distributions), so we fall back to the latter for correctness.
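For illustration, here is a minimal, self-contained sketch of that find-or-insert pattern: a power-of-2 bucket array, bounded linear probing, a cheap hash, and a sentinel return value that tells the caller to fall back to the BytesToBytesMap. The class and field names are illustrative only; the actual generated code is posted in full further down.

```java
// Minimal sketch (illustrative names and sizes, not the generated code) of the
// probing scheme: power-of-2 buckets, bounded linear probing, cheap hash, and a
// sentinel that tells the caller to fall back to the BytesToBytesMap.
import java.util.Arrays;

class AggregateHashMapSketch {
  private static final int CAPACITY = 1 << 16;           // max number of stored rows
  private static final int NUM_BUCKETS = CAPACITY * 4;   // load factor 0.25, power of 2
  private static final int MAX_STEPS = 2;                // bounded linear probing

  private final long[] keys = new long[CAPACITY];        // grouping key per row
  private final long[] sums = new long[CAPACITY];        // aggregate buffer per row
  private final int[] buckets = new int[NUM_BUCKETS];
  private int numRows = 0;

  AggregateHashMapSketch() {
    Arrays.fill(buckets, -1);                            // -1 marks an empty bucket
  }

  /** Returns the row index for this key, or -1 to signal "fall back to the slow map". */
  int findOrInsert(long key) {
    int idx = (int) hash(key) & (NUM_BUCKETS - 1);
    for (int step = 0; step < MAX_STEPS; step++) {
      if (buckets[idx] == -1) {                          // empty slot: insert the key
        if (numRows == CAPACITY) return -1;              // map is full
        keys[numRows] = key;
        buckets[idx] = numRows++;
        return buckets[idx];
      } else if (keys[buckets[idx]] == key) {            // slot already holds this key
        return buckets[idx];
      }
      idx = (idx + 1) & (NUM_BUCKETS - 1);               // probe the next slot
    }
    return -1;                                           // too many collisions
  }

  void addToSum(int row, long value) {
    sums[row] += value;
  }

  private long hash(long key) {
    return key;  // intentionally cheap, mirroring the identity hash in this PR
  }
}
```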

How was this patch tested?

Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
codegen = F                              2124 / 2204          9.9         101.3       1.0X
codegen = T hashmap = F                  1198 / 1364         17.5          57.1       1.8X
codegen = T hashmap = T                   369 /  600         56.8          17.6       5.8X

@sameeragarwal
Member Author

The generated code for a query of the form `sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()` looks like:

```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */ 
/* 005 */ /** Codegened pipeline for:
/* 006 */ * TungstenAggregate(key=[k#227L], functions=[(sum(k#227L),mode=Partial,isDistinct=false)], output=[k#227L,sum#235L])
/* 007 */ +- Project [(...
/* 008 */   */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean agg_initAgg;
/* 012 */   private agg_VectorizedHashMap agg_vectorizedHashMap;
/* 013 */   private java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row> agg_vectorizedHashMapIter;
/* 014 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan;
/* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 017 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetric range_numOutputRows;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue range_metricValue;
/* 020 */   private boolean range_initRange;
/* 021 */   private long range_partitionEnd;
/* 022 */   private long range_number;
/* 023 */   private boolean range_overflow;
/* 024 */   private scala.collection.Iterator range_input;
/* 025 */   private UnsafeRow range_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
/* 028 */   private UnsafeRow project_result;
/* 029 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 030 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 031 */   private UnsafeRow agg_result;
/* 032 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 033 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 034 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
/* 035 */   private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows;
/* 036 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue;
/* 037 */   private UnsafeRow wholestagecodegen_result;
/* 038 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder wholestagecodegen_holder;
/* 039 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter wholestagecodegen_rowWriter;
/* 040 */   
/* 041 */   public GeneratedIterator(Object[] references) {
/* 042 */     this.references = references;
/* 043 */   }
/* 044 */   
/* 045 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 046 */     partitionIndex = index;
/* 047 */     agg_initAgg = false;
/* 048 */     agg_vectorizedHashMap = new agg_VectorizedHashMap();
/* 049 */     
/* 050 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 051 */     
/* 052 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 053 */     range_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) range_numOutputRows.localValue();
/* 054 */     range_initRange = false;
/* 055 */     range_partitionEnd = 0L;
/* 056 */     range_number = 0L;
/* 057 */     range_overflow = false;
/* 058 */     range_input = inputs[0];
/* 059 */     range_result = new UnsafeRow(1);
/* 060 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
/* 061 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
/* 062 */     project_result = new UnsafeRow(1);
/* 063 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 0);
/* 064 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 065 */     agg_result = new UnsafeRow(1);
/* 066 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 067 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 068 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 069 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 070 */     wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue();
/* 071 */     wholestagecodegen_result = new UnsafeRow(2);
/* 072 */     this.wholestagecodegen_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(wholestagecodegen_result, 0);
/* 073 */     this.wholestagecodegen_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(wholestagecodegen_holder, 2);
/* 074 */   }
/* 075 */   
/* 076 */   public class agg_VectorizedHashMap {
/* 077 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
/* 078 */     private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
/* 079 */     private int[] buckets;
/* 080 */     private int numBuckets;
/* 081 */     private int maxSteps;
/* 082 */     private int numRows = 0;
/* 083 */     private org.apache.spark.sql.types.StructType schema =
/* 084 */     new org.apache.spark.sql.types.StructType()
/* 085 */     .add("k", org.apache.spark.sql.types.DataTypes.LongType)
/* 086 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType);
/* 087 */     
/* 088 */     private org.apache.spark.sql.types.StructType aggregateBufferSchema =
/* 089 */     
/* 090 */     new org.apache.spark.sql.types.StructType()
/* 091 */     .add("sum", org.apache.spark.sql.types.DataTypes.LongType);
/* 092 */     
/* 093 */     public agg_VectorizedHashMap() {
/* 094 */       // TODO: These should be generated based on the schema
/* 095 */       int DEFAULT_CAPACITY = 1 << 16;
/* 096 */       double DEFAULT_LOAD_FACTOR = 0.25;
/* 097 */       int DEFAULT_MAX_STEPS = 2;
/* 098 */       assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
/* 099 */       this.maxSteps = DEFAULT_MAX_STEPS;
/* 100 */       numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
/* 101 */       
/* 102 */       batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 103 */         org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
/* 104 */       
/* 105 */       // TODO: Possibly generate this projection in TungstenAggregate directly
/* 106 */       aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
/* 107 */         aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
/* 108 */       for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
/* 109 */         aggregateBufferBatch.setColumn(i, batch.column(i+1));
/* 110 */       }
/* 111 */       
/* 112 */       buckets = new int[numBuckets];
/* 113 */       java.util.Arrays.fill(buckets, -1);
/* 114 */     }
/* 115 */     
/* 116 */     public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(long agg_key) {
/* 117 */       long h = hash(agg_key);
/* 118 */       int step = 0;
/* 119 */       int idx = (int) h & (numBuckets - 1);
/* 120 */       while (step < maxSteps) {
/* 121 */         // Return bucket index if it's either an empty slot or already contains the key
/* 122 */         if (buckets[idx] == -1) {
/* 123 */           batch.column(0).putLong(numRows, agg_key);
/* 124 */           batch.column(1).putNull(numRows);
/* 125 */           buckets[idx] = numRows++;
/* 126 */           batch.setNumRows(numRows);
/* 127 */           aggregateBufferBatch.setNumRows(numRows);
/* 128 */           return aggregateBufferBatch.getRow(buckets[idx]);
/* 129 */         } else if (equals(idx, agg_key)) {
/* 130 */           return aggregateBufferBatch.getRow(buckets[idx]);
/* 131 */         }
/* 132 */         idx = (idx + 1) & (numBuckets - 1);
/* 133 */         step++;
/* 134 */       }
/* 135 */       // Didn't find it
/* 136 */       return null;
/* 137 */     }
/* 138 */     
/* 139 */     private boolean equals(int idx, long agg_key) {
/* 140 */       return batch.column(0).getLong(buckets[idx]) == agg_key;
/* 141 */     }
/* 142 */     
/* 143 */     // TODO: Improve this hash function
/* 144 */     private long hash(long agg_key) {
/* 145 */       return agg_key;
/* 146 */     }
/* 147 */     
/* 148 */     public java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
/* 149 */     rowIterator() {
/* 150 */       return batch.rowIterator();
/* 151 */     }
/* 152 */     
/* 153 */     public void close() {
/* 154 */       batch.close();
/* 155 */     }
/* 156 */     
/* 157 */   }
/* 158 */   
/* 159 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 160 */     agg_hashMap = agg_plan.createHashMap();
/* 161 */     
/* 162 */     /*** PRODUCE: Project [(id#224L & 65535) AS k#227L] */
/* 163 */     
/* 164 */     /*** PRODUCE: Range 0, 1, 1, 20971520, [id#224L] */
/* 165 */     
/* 166 */     // initialize Range
/* 167 */     if (!range_initRange) {
/* 168 */       range_initRange = true;
/* 169 */       initRange(partitionIndex);
/* 170 */     }
/* 171 */     
/* 172 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 173 */       long range_value = range_number;
/* 174 */       range_number += 1L;
/* 175 */       if (range_number < range_value ^ 1L < 0) {
/* 176 */         range_overflow = true;
/* 177 */       }
/* 178 */       
/* 179 */       /*** CONSUME: Project [(id#224L & 65535) AS k#227L] */
/* 180 */       
/* 181 */       /*** CONSUME: TungstenAggregate(key=[k#227L], functions=[(sum(k#227L),mode=Partial,isDistinct=false)], output=[k#227L,sum#235L]) */
/* 182 */       /* (input[0, bigint] & 65535) */
/* 183 */       long project_value = -1L;
/* 184 */       project_value = range_value & 65535L;
/* 185 */       
/* 186 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 187 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;
/* 188 */       
/* 189 */       if (true) {
/* 190 */         if (!false) {
/* 191 */           agg_vectorizedAggBuffer = agg_vectorizedHashMap.findOrInsert(
/* 192 */             project_value);
/* 193 */         }
/* 194 */       }
/* 195 */       
/* 196 */       if (agg_vectorizedAggBuffer == null) {
/* 197 */         // generate grouping key
/* 198 */         agg_rowWriter.write(0, project_value);
/* 199 */         /* hash(input[0, bigint], 42) */
/* 200 */         int agg_value3 = 42;
/* 201 */         
/* 202 */         agg_value3 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(project_value, agg_value3);
/* 203 */         if (true) {
/* 204 */           // try to get the buffer from hash map
/* 205 */           agg_unsafeRowAggBuffer =
/* 206 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value3);
/* 207 */         }
/* 208 */         if (agg_unsafeRowAggBuffer == null) {
/* 209 */           if (agg_sorter == null) {
/* 210 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 211 */           } else {
/* 212 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 213 */           }
/* 214 */           
/* 215 */           // the hash map had be spilled, it should have enough memory now,
/* 216 */           // try  to allocate buffer again.
/* 217 */           agg_unsafeRowAggBuffer =
/* 218 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value3);
/* 219 */           if (agg_unsafeRowAggBuffer == null) {
/* 220 */             // failed to allocate the first page
/* 221 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 222 */           }
/* 223 */         }
/* 224 */       }
/* 225 */       
/* 226 */       if (agg_vectorizedAggBuffer != null) {
/* 227 */         // update vectorized row
/* 228 */         
/* 229 */         // evaluate aggregate function
/* 230 */         /* (coalesce(input[0, bigint], cast(0 as bigint)) + cast(input[1, bigint] as bigint)) */
/* 231 */         /* coalesce(input[0, bigint], cast(0 as bigint)) */
/* 232 */         /* input[0, bigint] */
/* 233 */         boolean agg_isNull6 = agg_vectorizedAggBuffer.isNullAt(0);
/* 234 */         long agg_value7 = agg_isNull6 ? -1L : (agg_vectorizedAggBuffer.getLong(0));
/* 235 */         boolean agg_isNull5 = agg_isNull6;
/* 236 */         long agg_value6 = agg_value7;
/* 237 */         
/* 238 */         if (agg_isNull5) {
/* 239 */           /* cast(0 as bigint) */
/* 240 */           boolean agg_isNull7 = false;
/* 241 */           long agg_value8 = -1L;
/* 242 */           if (!false) {
/* 243 */             agg_value8 = (long) 0;
/* 244 */           }
/* 245 */           if (!agg_isNull7) {
/* 246 */             agg_isNull5 = false;
/* 247 */             agg_value6 = agg_value8;
/* 248 */           }
/* 249 */         }
/* 250 */         /* cast(input[1, bigint] as bigint) */
/* 251 */         boolean agg_isNull9 = false;
/* 252 */         long agg_value10 = -1L;
/* 253 */         if (!false) {
/* 254 */           agg_value10 = project_value;
/* 255 */         }
/* 256 */         long agg_value5 = -1L;
/* 257 */         agg_value5 = agg_value6 + agg_value10;
/* 258 */         // update vectorized row
/* 259 */         agg_vectorizedAggBuffer.setLong(0, agg_value5);
/* 260 */         
/* 261 */       } else {
/* 262 */         // update unsafe row
/* 263 */         
/* 264 */         // evaluate aggregate function
/* 265 */         /* (coalesce(input[0, bigint], cast(0 as bigint)) + cast(input[1, bigint] as bigint)) */
/* 266 */         /* coalesce(input[0, bigint], cast(0 as bigint)) */
/* 267 */         /* input[0, bigint] */
/* 268 */         boolean agg_isNull13 = agg_unsafeRowAggBuffer.isNullAt(0);
/* 269 */         long agg_value14 = agg_isNull13 ? -1L : (agg_unsafeRowAggBuffer.getLong(0));
/* 270 */         boolean agg_isNull12 = agg_isNull13;
/* 271 */         long agg_value13 = agg_value14;
/* 272 */         
/* 273 */         if (agg_isNull12) {
/* 274 */           /* cast(0 as bigint) */
/* 275 */           boolean agg_isNull14 = false;
/* 276 */           long agg_value15 = -1L;
/* 277 */           if (!false) {
/* 278 */             agg_value15 = (long) 0;
/* 279 */           }
/* 280 */           if (!agg_isNull14) {
/* 281 */             agg_isNull12 = false;
/* 282 */             agg_value13 = agg_value15;
/* 283 */           }
/* 284 */         }
/* 285 */         /* cast(input[1, bigint] as bigint) */
/* 286 */         boolean agg_isNull16 = false;
/* 287 */         long agg_value17 = -1L;
/* 288 */         if (!false) {
/* 289 */           agg_value17 = project_value;
/* 290 */         }
/* 291 */         long agg_value12 = -1L;
/* 292 */         agg_value12 = agg_value13 + agg_value17;
/* 293 */         // update unsafe row buffer
/* 294 */         agg_unsafeRowAggBuffer.setLong(0, agg_value12);
/* 295 */         
/* 296 */       }
/* 297 */       
/* 298 */       if (shouldStop()) return;
/* 299 */     }
/* 300 */     
/* 301 */     agg_vectorizedHashMapIter = agg_vectorizedHashMap.rowIterator();
/* 302 */     
/* 303 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter);
/* 304 */   }
/* 305 */   
/* 306 */   private void initRange(int idx) {
/* 307 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 308 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 309 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(20971520L);
/* 310 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 311 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 312 */     
/* 313 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 314 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 315 */       range_number = Long.MAX_VALUE;
/* 316 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 317 */       range_number = Long.MIN_VALUE;
/* 318 */     } else {
/* 319 */       range_number = st.longValue();
/* 320 */     }
/* 321 */     
/* 322 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 323 */     .multiply(step).add(start);
/* 324 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 325 */       range_partitionEnd = Long.MAX_VALUE;
/* 326 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 327 */       range_partitionEnd = Long.MIN_VALUE;
/* 328 */     } else {
/* 329 */       range_partitionEnd = end.longValue();
/* 330 */     }
/* 331 */     
/* 332 */     range_metricValue.add((range_partitionEnd - range_number) / 1L);
/* 333 */   }
/* 334 */   
/* 335 */   protected void processNext() throws java.io.IOException {
/* 336 */     /*** PRODUCE: TungstenAggregate(key=[k#227L], functions=[(sum(k#227L),mode=Partial,isDistinct=false)], output=[k#227L,sum#235L]) */
/* 337 */     
/* 338 */     if (!agg_initAgg) {
/* 339 */       agg_initAgg = true;
/* 340 */       agg_doAggregateWithKeys();
/* 341 */     }
/* 342 */     
/* 343 */     // output the result
/* 344 */     
/* 345 */     while (agg_vectorizedHashMapIter.hasNext()) {
/* 346 */       wholestagecodegen_metricValue.add(1);
/* 347 */       org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row wholestagecodegen_vectorizedHashMapRow =
/* 348 */       (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
/* 349 */       agg_vectorizedHashMapIter.next();
/* 350 */       
/* 351 */       wholestagecodegen_rowWriter.zeroOutNullBytes();
/* 352 */       
/* 353 */       /* input[0, bigint] */
/* 354 */       long wholestagecodegen_value = wholestagecodegen_vectorizedHashMapRow.getLong(0);
/* 355 */       wholestagecodegen_rowWriter.write(0, wholestagecodegen_value);
/* 356 */       
/* 357 */       /* input[1, bigint] */
/* 358 */       boolean wholestagecodegen_isNull1 = wholestagecodegen_vectorizedHashMapRow.isNullAt(1);
/* 359 */       long wholestagecodegen_value1 = wholestagecodegen_isNull1 ? -1L : (wholestagecodegen_vectorizedHashMapRow.getLong(1));
/* 360 */       if (wholestagecodegen_isNull1) {
/* 361 */         wholestagecodegen_rowWriter.setNullAt(1);
/* 362 */       } else {
/* 363 */         wholestagecodegen_rowWriter.write(1, wholestagecodegen_value1);
/* 364 */       }
/* 365 */       
/* 366 */       /*** CONSUME: WholeStageCodegen */
/* 367 */       
/* 368 */       append(wholestagecodegen_result);
/* 369 */       
/* 370 */       if (shouldStop()) return;
/* 371 */     }
/* 372 */     
/* 373 */     agg_vectorizedHashMap.close();
/* 374 */     
/* 375 */     while (agg_mapIter.next()) {
/* 376 */       wholestagecodegen_metricValue.add(1);
/* 377 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 378 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 379 */       
/* 380 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);
/* 381 */       
/* 382 */       /*** CONSUME: WholeStageCodegen */
/* 383 */       
/* 384 */       append(agg_resultRow);
/* 385 */       
/* 386 */       if (shouldStop()) return;
/* 387 */     }
/* 388 */     
/* 389 */     agg_mapIter.close();
/* 390 */     if (agg_sorter == null) {
/* 391 */       agg_hashMap.free();
/* 392 */     }
/* 393 */   }
/* 394 */ }
```

@sameeragarwal
Member Author

cc @nongli

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55677 has finished for PR 12345 at commit 4ee5687.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55678 has finished for PR 12345 at commit c2fc385.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
|// TODO: Improve this hash function
|private long hash($groupingKeySignature) {
| return ${groupingKeys.map(_._2).mkString(" ^ ")};
| return ${groupingKeys.map(_._2).mkString(" | ")};
```
Contributor

Any reason not to implement the `h = h * 37 + v` hash function?

Member Author

No particular reason; I was planning to do this as part of a separate small PR (along with the benchmarks). Please let me know if you'd prefer it here instead.
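For context, a hypothetical sketch of what such a generated hash could look like for two long grouping keys (this is the reviewer's suggested `h = h * 37 + v` scheme, not what this PR currently emits):

```java
// Hypothetical generated hash using the suggested h = h * 37 + v scheme for two
// long grouping keys; the PR as-is keeps the identity hash and defers this change.
private long hash(long agg_key, long agg_key1) {
  long h = 0;
  h = h * 37 + agg_key;
  h = h * 37 + agg_key1;
  return h;
}
```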

@nongli
Contributor

nongli commented Apr 13, 2016

/* 120 */ batch.column(1).putLong(numRows, 0);
I don't think this is right. You should initialize the agg exprs to NULL.

@nongli
Contributor

nongli commented Apr 13, 2016

I don't think this works if we have NULL keys. This is kind of annoying, let's think about that tomorrow.
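For what it's worth, one common way to handle a null grouping key in a map like this (purely a hypothetical extension of the earlier sketch, not something this PR implements) is to track the null key outside the bucket array:

```java
// Hypothetical extension of the earlier sketch: reserve one appended row for the
// null grouping key so probing never has to represent "null" as a long value.
private boolean hasNullKeyRow = false;
private int nullKeyRow = -1;

int findOrInsertNullKey() {
  if (!hasNullKeyRow) {
    if (numRows == CAPACITY) return -1;  // fall back, same as a full map
    nullKeyRow = numRows++;              // append a dedicated row for the null key
    hasNullKeyRow = true;
  }
  return nullKeyRow;
}
```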

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55735 has finished for PR 12345 at commit fc6b8cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55750 has finished for PR 12345 at commit ececd57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55753 has finished for PR 12345 at commit 0ca0db1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Contributor

nongli commented Apr 14, 2016

@sameeragarwal have you updated the generated code?

@sameeragarwal
Member Author

@nongli just did!

```
| org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
| org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
| aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
| aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
```
Contributor

Let's leave a TODO to fix this. There should be a nicer way to get a projection of a batch instead of this.

@sameeragarwal changed the title from "[SPARK-14447][SQL] Speed up TungstenAggregate w/ keys using AggregateHashMap" to "[SPARK-14447][SQL] Speed up TungstenAggregate w/ keys using VectorizedHashMap" on Apr 14, 2016
@sameeragarwal
Member Author

@nongli thanks, comments addressed.

@SparkQA

SparkQA commented Apr 14, 2016

Test build #55797 has finished for PR 12345 at commit 041c001.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
val isAggregateHashMapEnabled: Boolean = false
val isAggregateHashMapSupported: Boolean =
// We currently only enable vectorized hashmap for long key/value types
isVectorizedHashMapEnabled = isVectorizedHashMapEnabled &&
```
Contributor

I think it's weird that you have the check split between here and line 268. Maybe combine all these checks at line 268 and make isVectorizedHashMapEnabled a val.

@SparkQA

SparkQA commented Apr 15, 2016

Test build #55869 has finished for PR 12345 at commit ec66a54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Contributor

nongli commented Apr 15, 2016

LGTM

@SparkQA

SparkQA commented Apr 15, 2016

Test build #55886 has finished for PR 12345 at commit 9b5ee1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in b5c60bc on Apr 15, 2016
asfgit pushed a commit that referenced this pull request on Apr 15, 2016
## What changes were proposed in this pull request?

This PR uses a better hashing algorithm while probing the AggregateHashMap:

```java
long h = 0
h = (h ^ (0x9e3779b9)) + key_1 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_2 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_3 + (h << 6) + (h >>> 2);
...
h = (h ^ (0x9e3779b9)) + key_n + (h << 6) + (h >>> 2);
return h
```

Depends on: #12345
## How was this patch tested?

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    codegen = F                              2417 / 2457          8.7         115.2       1.0X
    codegen = T hashmap = F                  1554 / 1581         13.5          74.1       1.6X
    codegen = T hashmap = T                   877 /  929         23.9          41.8       2.8X

Author: Sameer Agarwal <[email protected]>

Closes #12379 from sameeragarwal/hash.
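Written out as a compilable method, the mixing above looks roughly like the following sketch (the generated code unrolls it per grouping key rather than looping over an array):

```java
// Sketch of the hash mixing from the commit message above, written as a single
// method over a variable number of long keys; the constants follow the pseudocode.
private static long hash(long... keys) {
  long h = 0L;
  for (long key : keys) {
    h = (h ^ 0x9e3779b9L) + key + (h << 6) + (h >>> 2);
  }
  return h;
}
```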
lw-lin pushed a commit to lw-lin/spark that referenced this pull request Apr 20, 2016