
Conversation

@viirya
Member

@viirya viirya commented Nov 10, 2016

What changes were proposed in this pull request?

As per the discussion at #15807, we need to change the approach to subexpression elimination.

In the current approach, common subexpressions are evaluated up front regardless of whether they are actually used later. E.g., in the following generated code, subexpr2 is evaluated even when only the if branch is executed.

if (isNull(subexpr)) {
  ...
} else {
  AssertNotNull(subexpr)  // subexpr2
  ....
  SomeExpr(AssertNotNull(subexpr)) // SomeExpr(subexpr2)
}

Besides a possible performance regression, expressions like AssertNotNull can throw exceptions, so wrongly evaluating subexpr2 can throw an exception unexpectedly.
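To make this concrete, here is a minimal Java sketch of what the current eager scheme effectively does; assertNotNull, isNull and someExpr are hypothetical helpers standing in for the generated expression code, not actual generated methods. The common subexpression is hoisted and evaluated before the branch is taken:

// Current (eager) scheme, illustrated with hypothetical helpers:
Object subexpr2 = assertNotNull(subexpr);   // evaluated unconditionally; throws if subexpr is null
if (isNull(subexpr)) {
  // this branch never needs subexpr2, yet it has already been evaluated
} else {
  someExpr(subexpr2);
}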

With this patch, common subexpressions are no longer evaluated until they are used. We create a function for each common subexpression that evaluates it and stores the result in a member variable, along with an initialization status variable that records whether the subexpression has been evaluated.

Thus, when an expression that uses the subexpression is about to be evaluated, we check whether the subexpression is initialized: if yes, we directly return the stored result; if no, we call the function to evaluate it first.
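A minimal sketch of the new pattern, simplified from the generated code shown below (field and method names here are illustrative, not the exact generated identifiers):

// One cached result, one null flag, and one initialization flag per common subexpression.
private Object subExprValue;
private boolean subExprIsNull;
private boolean subExprIsInitialized;   // reset to false before processing each input row

private void evalSubExpr(Object input) {
  // Evaluate the common subexpression at most once per row and cache the result.
  subExprValue = computeCommonSubexpression(input);  // hypothetical helper
  subExprIsNull = (subExprValue == null);
  subExprIsInitialized = true;
}

private Object subExprForValue(Object input) {
  if (!subExprIsInitialized) {
    evalSubExpr(input);
  }
  return subExprValue;
}

private boolean subExprForIsNull(Object input) {
  if (!subExprIsInitialized) {
    evalSubExpr(input);
  }
  return subExprIsNull;
}

Expressions that reference the common subexpression call subExprForValue / subExprForIsNull instead of reading a precomputed local variable, so the subexpression is only evaluated if some executed code path actually needs it. The initialization flag is reset for every row, as in the agg_evalSubExpr1IsInitialized = false lines in the generated code below.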

How was this patch tested?

Jenkins tests.


@viirya
Member Author

viirya commented Nov 10, 2016

val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupByKey(_._1).agg(typed.sum(_._2), typed.sum(_._2)).collect

Part of the generated code:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private double agg_bufValue;
/* 011 */   private double agg_classChildVar;
/* 012 */   private boolean agg_classChildVarIsNull;
/* 013 */   private boolean agg_bufIsNull1;
/* 014 */   private double agg_bufValue1;
/* 015 */   private double agg_classChildVar1;
/* 016 */   private boolean agg_classChildVarIsNull1;
/* 017 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
/* 018 */   private agg_FastHashMap agg_fastHashMap;
/* 019 */   private org.apache.spark.unsafe.KVIterator agg_fastHashMapIter;
/* 020 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 021 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 022 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
/* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
/* 025 */   private scala.collection.Iterator inputadapter_input;
/* 026 */   private UnsafeRow agg_result1;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 028 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 029 */   private int agg_value8;
/* 030 */   private boolean[] agg_argIsNulls;
/* 031 */   private java.lang.String agg_argValue;
/* 032 */   private int agg_argValue1;
/* 033 */   private boolean agg_evalSubExprIsNull;
/* 034 */   private boolean agg_evalSubExprIsInitialized;
/* 035 */   private scala.Tuple2 agg_evalSubExprValue;
/* 036 */   private double agg_classChildVar2;
/* 037 */   private boolean agg_classChildVarIsNull2;
/* 038 */   private double agg_classChildVar3;
/* 039 */   private boolean agg_classChildVarIsNull3;
/* 040 */   private boolean[] agg_argIsNulls1;
/* 041 */   private java.lang.String agg_argValue2;
/* 042 */   private int agg_argValue3;
/* 043 */   private boolean agg_evalSubExpr1IsNull;
/* 044 */   private boolean agg_evalSubExpr1IsInitialized;
/* 045 */   private scala.Tuple2 agg_evalSubExpr1Value;
/* 046 */   private double agg_classChildVar4;
/* 047 */   private boolean agg_classChildVarIsNull4;
/* 048 */   private double agg_classChildVar5;
/* 049 */   private boolean agg_classChildVarIsNull5;
/* 050 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
/* 051 */   private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_numOutputRows;
/* 052 */   private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_aggTime;
/* 053 */
/* 054 */   public GeneratedIterator(Object[] references) {
/* 055 */     this.references = references;
/* 056 */   }
/* 057 */
/* 058 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 059 */     partitionIndex = index;
/* 060 */     this.inputs = inputs;
/* 061 */     wholestagecodegen_init_0();
/* 062 */     wholestagecodegen_init_1();
/* 063 */
/* 064 */   }
/* 065 */
/* 066 */   private void wholestagecodegen_init_0() {
/* 067 */     agg_initAgg = false;
/* 068 */
/* 069 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 070 */     agg_fastHashMap = new agg_FastHashMap(agg_plan.getTaskMemoryManager(), agg_plan.getEmptyAggregationBuffer());
/* 071 */
/* 072 */     this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 073 */     this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 074 */     inputadapter_input = inputs[0];
/* 075 */     agg_result1 = new UnsafeRow(1);
/* 076 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 32);
/* 077 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 078 */
/* 079 */     agg_argIsNulls = new boolean[2];
/* 080 */
/* 081 */     agg_evalSubExprIsNull = false;
/* 082 */     agg_evalSubExprIsInitialized = false;
/* 083 */     agg_evalSubExprValue = null;
/* 084 */
/* 085 */     agg_argIsNulls1 = new boolean[2];
/* 086 */
/* 087 */     agg_evalSubExpr1IsNull = false;
/* 088 */     agg_evalSubExpr1IsInitialized = false;
/* 089 */     agg_evalSubExpr1Value = null;
/* 090 */
/* 091 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 092 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 093 */
/* 094 */   }
/* 095 */
/* 096 */   private scala.Tuple2 agg_evalSubExpr1ForValue(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 097 */     if (!agg_evalSubExpr1IsInitialized) {
/* 098 */       agg_evalSubExpr1(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 099 */     }
/* 100 */     return agg_evalSubExpr1Value;
/* 101 */   }
/* 102 */
/* 103 */   private void agg_evalSubExpr1(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 104 */     boolean agg_isNull21 = inputadapter_isNull;
/* 105 */     java.lang.String agg_value23 = null;
/* 106 */     if (!agg_isNull21) {
/* 107 */       Object agg_funcResult3 = null;
/* 108 */       agg_funcResult3 = inputadapter_value.toString();
/* 109 */       if (agg_funcResult3 == null) {
/* 110 */         agg_isNull21 = true;
/* 111 */       } else {
/* 112 */         agg_value23 = (java.lang.String) agg_funcResult3;
/* 113 */       }
/* 114 */
/* 115 */     }
/* 116 */     agg_isNull21 = agg_value23 == null;
/* 117 */     agg_argIsNulls1[0] = agg_isNull21;
/* 118 */     agg_argValue2 = agg_value23;
/* 119 */
/* 120 */     agg_argIsNulls1[1] = false;
/* 121 */     agg_argValue3 = inputadapter_value1;
/* 122 */
/* 123 */     final scala.Tuple2 agg_value22 = false ? null : new scala.Tuple2(agg_argValue2, agg_argValue3);
/* 124 */     agg_evalSubExpr1IsNull = false;
/* 125 */     agg_evalSubExpr1Value = agg_value22;
/* 126 */     agg_evalSubExpr1IsInitialized = true;
/* 127 */   }
/* 128 */
/* 129 */   public class agg_FastHashMap {
/* 130 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 131 */     private int[] buckets;
/* 132 */     private int capacity = 1 << 16;
/* 133 */     private double loadFactor = 0.5;
/* 134 */     private int numBuckets = (int) (capacity / loadFactor);
/* 135 */     private int maxSteps = 2;
/* 136 */     private int numRows = 0;
/* 137 */     private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("value", org.apache.spark.sql.types.DataTypes.StringType);
/* 138 */     private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("value", org.apache.spark.sql.types.DataTypes.DoubleType)
/* 139 */     .add("value", org.apache.spark.sql.types.DataTypes.DoubleType);
/* 140 */     private Object emptyVBase;
/* 141 */     private long emptyVOff;
/* 142 */     private int emptyVLen;
/* 143 */     private boolean isBatchFull = false;
/* 144 */
/* 145 */     public agg_FastHashMap(
/* 146 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 147 */       InternalRow emptyAggregationBuffer) {
/* 148 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 149 */       .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
/* 150 */
/* 151 */       final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
/* 152 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 153 */
/* 154 */       emptyVBase = emptyBuffer;
/* 155 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 156 */       emptyVLen = emptyBuffer.length;
/* 157 */
/* 158 */       buckets = new int[numBuckets];
/* 159 */       java.util.Arrays.fill(buckets, -1);
/* 160 */     }
/* 161 */
/* 162 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(UTF8String agg_key) {
/* 163 */       long h = hash(agg_key);
/* 164 */       int step = 0;
/* 165 */       int idx = (int) h & (numBuckets - 1);
/* 166 */       while (step < maxSteps) {
/* 167 */         // Return bucket index if it's either an empty slot or already contains the key
/* 168 */         if (buckets[idx] == -1) {
/* 169 */           if (numRows < capacity && !isBatchFull) {
/* 170 */             // creating the unsafe for new entry
/* 171 */             UnsafeRow agg_result = new UnsafeRow(1);
/* 172 */             org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
/* 173 */             = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 174 */               32);
/* 175 */             org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
/* 176 */             = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 177 */               agg_holder,
/* 178 */               1);
/* 179 */             agg_holder.reset(); //TODO: investigate if reset or zeroout are actually needed
/* 180 */             agg_rowWriter.zeroOutNullBytes();
/* 181 */             agg_rowWriter.write(0, agg_key);
/* 182 */             agg_result.setTotalSize(agg_holder.totalSize());
/* 183 */             Object kbase = agg_result.getBaseObject();
/* 184 */             long koff = agg_result.getBaseOffset();
/* 185 */             int klen = agg_result.getSizeInBytes();
/* 186 */
/* 187 */             UnsafeRow vRow
/* 188 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 189 */             if (vRow == null) {
/* 190 */               isBatchFull = true;
/* 191 */             } else {
/* 192 */               buckets[idx] = numRows++;
/* 193 */             }
/* 194 */             return vRow;
/* 195 */           } else {
/* 196 */             // No more space
/* 197 */             return null;
/* 198 */           }
/* 199 */         } else if (equals(idx, agg_key)) {
/* 200 */           return batch.getValueRow(buckets[idx]);
/* 201 */         }
/* 202 */         idx = (idx + 1) & (numBuckets - 1);
/* 203 */         step++;
/* 204 */       }
/* 205 */       // Didn't find it
/* 206 */       return null;
/* 207 */     }
/* 208 */
/* 209 */     private boolean equals(int idx, UTF8String agg_key) {
/* 210 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 211 */       return (row.getUTF8String(0).equals(agg_key));
/* 212 */     }
/* 213 */
/* 214 */     private long hash(UTF8String agg_key) {
/* 215 */       long agg_hash = 0;
/* 216 */
/* 217 */       int agg_result = 0;
/* 218 */       byte[] agg_bytes = agg_key.getBytes();
/* 219 */       for (int i = 0; i < agg_bytes.length; i++) {
/* 220 */         int agg_hash1 = agg_bytes[i];
/* 221 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
/* 222 */       }
/* 223 */
/* 224 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);
/* 225 */
/* 226 */       return agg_hash;
/* 227 */     }
/* 228 */
/* 229 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 230 */       return batch.rowIterator();
/* 231 */     }
/* 232 */
/* 233 */     public void close() {
/* 234 */       batch.close();
/* 235 */     }
/* 236 */
/* 237 */   }
/* 238 */
/* 239 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 240 */     agg_hashMap = agg_plan.createHashMap();
/* 241 */
/* 242 */     while (inputadapter_input.hasNext()) {
/* 243 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 244 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 245 */       UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
/* 246 */       int inputadapter_value1 = inputadapter_row.getInt(1);
/* 247 */       boolean inputadapter_isNull2 = inputadapter_row.isNullAt(2);
/* 248 */       UTF8String inputadapter_value2 = inputadapter_isNull2 ? null : (inputadapter_row.getUTF8String(2));
/* 249 */
/* 250 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 251 */
/* 252 */       UnsafeRow agg_fastAggBuffer = null;
/* 253 */
/* 254 */       if (true) {
/* 255 */         if (!inputadapter_isNull2) {
/* 256 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 257 */             inputadapter_value2);
/* 258 */         }
/* 259 */       }
/* 260 */
/* 261 */       if (agg_fastAggBuffer == null) {
/* 262 */         // generate grouping key
/* 263 */         agg_holder.reset();
/* 264 */
/* 265 */         agg_rowWriter.zeroOutNullBytes();
/* 266 */
/* 267 */         if (inputadapter_isNull2) {
/* 268 */           agg_rowWriter.setNullAt(0);
/* 269 */         } else {
/* 270 */           agg_rowWriter.write(0, inputadapter_value2);
/* 271 */         }
/* 272 */         agg_result1.setTotalSize(agg_holder.totalSize());
/* 273 */         agg_value8 = 42;
/* 274 */
/* 275 */         if (!inputadapter_isNull2) {
/* 276 */           agg_value8 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value2.getBaseObject(), inputadapter_value2.getBaseOffset(), inputadapter_value2.numBytes(), agg_value8);
/* 277 */         }
/* 278 */         if (true) {
/* 279 */           // try to get the buffer from hash map
/* 280 */           agg_unsafeRowAggBuffer =
/* 281 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
/* 282 */         }
/* 283 */         if (agg_unsafeRowAggBuffer == null) {
/* 284 */           if (agg_sorter == null) {
/* 285 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 286 */           } else {
/* 287 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 288 */           }
/* 289 */
/* 290 */           // the hash map had be spilled, it should have enough memory now,
/* 291 */           // try  to allocate buffer again.
/* 292 */           agg_unsafeRowAggBuffer =
/* 293 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value8);
/* 294 */           if (agg_unsafeRowAggBuffer == null) {
/* 295 */             // failed to allocate the first page
/* 296 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 297 */           }
/* 298 */         }
/* 299 */       }
/* 300 */
/* 301 */       if (agg_fastAggBuffer != null) {
/* 302 */         // update fast row
/* 303 */
/* 304 */         // reset the initialization status for common sub-expressions
/* 305 */         agg_evalSubExpr1IsInitialized = false;
/* 306 */         // evaluate aggregate function
/* 307 */         Object agg_obj2 = ((Expression) references[5]).eval(null);
/* 308 */         org.apache.spark.sql.expressions.Aggregator agg_value28 = (org.apache.spark.sql.expressions.Aggregator) agg_obj2;
/* 309 */
/* 310 */         boolean agg_isNull27 = agg_fastAggBuffer.isNullAt(0);
/* 311 */         double agg_value29 = agg_isNull27 ? -1.0 : (agg_fastAggBuffer.getDouble(0));
/* 312 */
/* 313 */         boolean agg_isNull25 = false || agg_isNull27 || agg_evalSubExpr1ForIsNull(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 314 */         double agg_value27 = -1.0;
/* 315 */         if (!agg_isNull25) {
/* 316 */           Object agg_funcResult4 = null;
/* 317 */           agg_funcResult4 = agg_value28.reduce(agg_value29, agg_evalSubExpr1ForValue(inputadapter_value, inputadapter_isNull, inputadapter_value1));
/* 318 */           if (agg_funcResult4 == null) {
/* 319 */             agg_isNull25 = true;
/* 320 */           } else {
/* 321 */             agg_value27 = (Double) agg_funcResult4;
/* 322 */           }
/* 323 */
/* 324 */         }agg_classChildVar4 = agg_value27;
/* 325 */         agg_classChildVarIsNull4 = agg_isNull25;
/* 326 */         Object agg_obj3 = ((Expression) references[6]).eval(null);
/* 327 */         org.apache.spark.sql.expressions.Aggregator agg_value32 = (org.apache.spark.sql.expressions.Aggregator) agg_obj3;
/* 328 */
/* 329 */         boolean agg_isNull31 = agg_fastAggBuffer.isNullAt(1);
/* 330 */         double agg_value33 = agg_isNull31 ? -1.0 : (agg_fastAggBuffer.getDouble(1));
/* 331 */
/* 332 */         boolean agg_isNull29 = false || agg_isNull31 || agg_evalSubExpr1ForIsNull(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 333 */         double agg_value31 = -1.0;
/* 334 */         if (!agg_isNull29) {
/* 335 */           Object agg_funcResult5 = null;
/* 336 */           agg_funcResult5 = agg_value32.reduce(agg_value33, agg_evalSubExpr1ForValue(inputadapter_value, inputadapter_isNull, inputadapter_value1));
/* 337 */           if (agg_funcResult5 == null) {
/* 338 */             agg_isNull29 = true;
/* 339 */           } else {
/* 340 */             agg_value31 = (Double) agg_funcResult5;
/* 341 */           }
/* 342 */
/* 343 */         }agg_classChildVar5 = agg_value31;
/* 344 */         agg_classChildVarIsNull5 = agg_isNull29;
/* 345 */         // update fast row
/* 346 */         if (!agg_classChildVarIsNull4) {
/* 347 */           agg_fastAggBuffer.setDouble(0, agg_classChildVar4);
/* 348 */         } else {
/* 349 */           agg_fastAggBuffer.setNullAt(0);
/* 350 */         }
/* 351 */
/* 352 */         if (!agg_classChildVarIsNull5) {
/* 353 */           agg_fastAggBuffer.setDouble(1, agg_classChildVar5);
/* 354 */         } else {
/* 355 */           agg_fastAggBuffer.setNullAt(1);
/* 356 */         }
/* 357 */
/* 358 */       } else {
/* 359 */         // update unsafe row
/* 360 */
/* 361 */         // reset the initialization status for common sub-expressions
/* 362 */         agg_evalSubExprIsInitialized = false;
/* 363 */         // evaluate aggregate function
/* 364 */         Object agg_obj = ((Expression) references[3]).eval(null);
/* 365 */         org.apache.spark.sql.expressions.Aggregator agg_value16 = (org.apache.spark.sql.expressions.Aggregator) agg_obj;
/* 366 */
/* 367 */         boolean agg_isNull15 = agg_unsafeRowAggBuffer.isNullAt(0);
/* 368 */         double agg_value17 = agg_isNull15 ? -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
/* 369 */
/* 370 */         boolean agg_isNull13 = false || agg_isNull15 || agg_evalSubExprForIsNull(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 371 */         double agg_value15 = -1.0;
/* 372 */         if (!agg_isNull13) {
/* 373 */           Object agg_funcResult1 = null;
/* 374 */           agg_funcResult1 = agg_value16.reduce(agg_value17, agg_evalSubExprForValue(inputadapter_value, inputadapter_isNull, inputadapter_value1));
/* 375 */           if (agg_funcResult1 == null) {
/* 376 */             agg_isNull13 = true;
/* 377 */           } else {
/* 378 */             agg_value15 = (Double) agg_funcResult1;
/* 379 */           }
/* 380 */
/* 381 */         }agg_classChildVar2 = agg_value15;
/* 382 */         agg_classChildVarIsNull2 = agg_isNull13;
/* 383 */         Object agg_obj1 = ((Expression) references[4]).eval(null);
/* 384 */         org.apache.spark.sql.expressions.Aggregator agg_value20 = (org.apache.spark.sql.expressions.Aggregator) agg_obj1;
/* 385 */
/* 386 */         boolean agg_isNull19 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 387 */         double agg_value21 = agg_isNull19 ? -1.0 : (agg_unsafeRowAggBuffer.getDouble(1));
/* 388 */
/* 389 */         boolean agg_isNull17 = false || agg_isNull19 || agg_evalSubExprForIsNull(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 390 */         double agg_value19 = -1.0;
/* 391 */         if (!agg_isNull17) {
/* 392 */           Object agg_funcResult2 = null;
/* 393 */           agg_funcResult2 = agg_value20.reduce(agg_value21, agg_evalSubExprForValue(inputadapter_value, inputadapter_isNull, inputadapter_value1));
/* 394 */           if (agg_funcResult2 == null) {
/* 395 */             agg_isNull17 = true;
/* 396 */           } else {
/* 397 */             agg_value19 = (Double) agg_funcResult2;
/* 398 */           }
/* 399 */
/* 400 */         }agg_classChildVar3 = agg_value19;
/* 401 */         agg_classChildVarIsNull3 = agg_isNull17;
/* 402 */         // update unsafe row buffer
/* 403 */         if (!agg_classChildVarIsNull2) {
/* 404 */           agg_unsafeRowAggBuffer.setDouble(0, agg_classChildVar2);
/* 405 */         } else {
/* 406 */           agg_unsafeRowAggBuffer.setNullAt(0);
/* 407 */         }
/* 408 */
/* 409 */         if (!agg_classChildVarIsNull3) {
/* 410 */           agg_unsafeRowAggBuffer.setDouble(1, agg_classChildVar3);
/* 411 */         } else {
/* 412 */           agg_unsafeRowAggBuffer.setNullAt(1);
/* 413 */         }
/* 414 */
/* 415 */       }
/* 416 */       if (shouldStop()) return;
/* 417 */     }
/* 418 */
/* 419 */     agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 420 */
/* 421 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
/* 422 */   }
/* 423 */
/* 424 */   private boolean agg_evalSubExprForIsNull(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 425 */     if (!agg_evalSubExprIsInitialized) {
/* 426 */       agg_evalSubExpr(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 427 */     }
/* 428 */     return agg_evalSubExprIsNull;
/* 429 */   }
/* 430 */
/* 431 */   private void wholestagecodegen_init_1() {
/* 432 */     this.wholestagecodegen_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[8];
/* 433 */
/* 434 */   }
/* 435 */
/* 436 */   private void agg_evalSubExpr(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 437 */     boolean agg_isNull9 = inputadapter_isNull;
/* 438 */     java.lang.String agg_value11 = null;
/* 439 */     if (!agg_isNull9) {
/* 440 */       Object agg_funcResult = null;
/* 441 */       agg_funcResult = inputadapter_value.toString();
/* 442 */       if (agg_funcResult == null) {
/* 443 */         agg_isNull9 = true;
/* 444 */       } else {
/* 445 */         agg_value11 = (java.lang.String) agg_funcResult;
/* 446 */       }
/* 447 */
/* 448 */     }
/* 449 */     agg_isNull9 = agg_value11 == null;
/* 450 */     agg_argIsNulls[0] = agg_isNull9;
/* 451 */     agg_argValue = agg_value11;
/* 452 */
/* 453 */     agg_argIsNulls[1] = false;
/* 454 */     agg_argValue1 = inputadapter_value1;
/* 455 */
/* 456 */     final scala.Tuple2 agg_value10 = false ? null : new scala.Tuple2(agg_argValue, agg_argValue1);
/* 457 */     agg_evalSubExprIsNull = false;
/* 458 */     agg_evalSubExprValue = agg_value10;
/* 459 */     agg_evalSubExprIsInitialized = true;
/* 460 */   }
/* 461 */
/* 462 */   private boolean agg_evalSubExpr1ForIsNull(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 463 */     if (!agg_evalSubExpr1IsInitialized) {
/* 464 */       agg_evalSubExpr1(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 465 */     }
/* 466 */     return agg_evalSubExpr1IsNull;
/* 467 */   }
/* 468 */
/* 469 */   private scala.Tuple2 agg_evalSubExprForValue(UTF8String inputadapter_value, boolean inputadapter_isNull, int inputadapter_value1) {
/* 470 */     if (!agg_evalSubExprIsInitialized) {
/* 471 */       agg_evalSubExpr(inputadapter_value, inputadapter_isNull, inputadapter_value1);
/* 472 */     }
/* 473 */     return agg_evalSubExprValue;
/* 474 */   }
/* 475 */
/* 476 */   protected void processNext() throws java.io.IOException {
/* 477 */     if (!agg_initAgg) {
/* 478 */       agg_initAgg = true;
/* 479 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 480 */       agg_doAggregateWithKeys();
/* 481 */       wholestagecodegen_aggTime.add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
/* 482 */     }
/* 483 */
/* 484 */     // output the result
/* 485 */
/* 486 */     while (agg_fastHashMapIter.next()) {
/* 487 */       wholestagecodegen_numOutputRows.add(1);
/* 488 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_fastHashMapIter.getKey();
/* 489 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_fastHashMapIter.getValue();
/* 490 */
/* 491 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);
/* 492 */
/* 493 */       append(agg_resultRow);
/* 494 */
/* 495 */       if (shouldStop()) return;
/* 496 */     }
/* 497 */     agg_fastHashMap.close();
/* 498 */
/* 499 */     while (agg_mapIter.next()) {
/* 500 */       wholestagecodegen_numOutputRows.add(1);
/* 501 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 502 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 503 */
/* 504 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);
/* 505 */
/* 506 */       append(agg_resultRow);
/* 507 */
/* 508 */       if (shouldStop()) return;
/* 509 */     }
/* 510 */
/* 511 */     agg_mapIter.close();
/* 512 */     if (agg_sorter == null) {
/* 513 */       agg_hashMap.free();
/* 514 */     }
/* 515 */   }
/* 516 */ }

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68445 has finished for PR 15837 at commit c0153a6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the improve-subexpr-elimination branch from c0153a6 to 63accf8 Compare November 10, 2016 07:13
@viirya viirya changed the title [SQL] Evaluate common subexpression like lazy variable with a function approach [SPARK-18395][SQL] Evaluate common subexpression like lazy variable with a function approach Nov 10, 2016
@SparkQA

SparkQA commented Nov 10, 2016

Test build #68450 has finished for PR 15837 at commit 63accf8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68452 has finished for PR 15837 at commit 3274e91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68455 has finished for PR 15837 at commit 3ae70e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Nov 16, 2016

ping @cloud-fan @hvanhovell @kiszk

@viirya
Member Author

viirya commented Nov 19, 2016

ping @cloud-fan @hvanhovell @kiszk Can you help review this? Thanks.

@viirya
Member Author

viirya commented Nov 21, 2016

re-ping @cloud-fan @kiszk @hvanhovell

@SparkQA

SparkQA commented Nov 22, 2016

Test build #68967 has finished for PR 15837 at commit f95100b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class AesCipher
  • public class AesConfigMessage implements Encodable
  • public class ByteArrayReadableChannel implements ReadableByteChannel
  • public final class JavaStructuredKafkaWordCount
  • abstract class PerPartitionConfig extends Serializable
  • class ClusteringSummary(JavaWrapper):
  • class GaussianMixtureSummary(ClusteringSummary):
  • class BisectingKMeansSummary(ClusteringSummary):
  • trait CollectionGenerator extends Generator
  • case class Stack(children: Seq[Expression]) extends Generator
  • abstract class ExplodeBase extends UnaryExpression with CollectionGenerator with Serializable
  • case class Explode(child: Expression) extends ExplodeBase
  • case class PosExplode(child: Expression) extends ExplodeBase
  • case class Inline(child: Expression) extends UnaryExpression with CollectionGenerator
  • trait InvokeLike extends Expression with NonSQLExpression
  • case class EventTimeWatermark(
  • class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
  • final class ParquetLogRedirector implements Serializable
  • sealed trait ViewType
  • case class OutputSpec(
  • class MaxLong(protected var currentValue: Long = 0)
  • case class EventTimeWatermarkExec(
  • class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging
  • case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None)
  • sealed trait StoreUpdate
  • case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
  • case class SparkListenerDriverAccumUpdates(

@kiszk
Member

kiszk commented Nov 27, 2016

Sorry for being late while I was travelling.
Can we introduce a configurable threshold parameter to avoid applying this elimination to very simple expressions (e.g. Literal(1))?

@cloud-fan
Contributor

Sorry for the delay, but I may not have time to review it before the 2.1 release; can you hold it off until the 2.1 release? Thanks!

@viirya
Member Author

viirya commented Nov 28, 2016

@cloud-fan Sure, no problem.

@viirya
Member Author

viirya commented Nov 28, 2016

@kiszk Do you mean to avoid subexpression elimination?

@kiszk
Member

kiszk commented Nov 29, 2016

IMHO, it would be good to stop applying subexpression elimination to a very simple expression (e.g. Literal(1)). This is because this subexpression elimination incurs some cost at runtime (e.g. executing a conditional branch).

@viirya
Member Author

viirya commented Jan 12, 2017

@kiszk I am hesitant to set a threshold for that. One reason is that it would increase the complexity of the subexpression elimination implementation while the benefit may be low or uncertain. Another is that it is unclear how to choose a good threshold.

@viirya
Member Author

viirya commented Jan 12, 2017

ping @cloud-fan Since 2.1 is released now, do you have time to review this? Thanks.

@SparkQA

SparkQA commented Jan 12, 2017

Test build #71240 has finished for PR 15837 at commit 9197cd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

I'm worried about performance: now we have an extra function call and an if branch to retrieve the result of common subexpressions. Maybe we should just disable subexpression elimination for branch expressions like if and case when.

asfgit pushed a commit that referenced this pull request Jan 23, 2017
…tional expressions

## What changes were proposed in this pull request?

As I pointed out in #15807 (comment), the current subexpression elimination framework has a problem: it always evaluates all common subexpressions at the beginning, even if they are inside conditional expressions and may not be accessed.

Ideally we should implement it like Scala's lazy val, so we only evaluate it when it gets accessed at least once. #15837 tries this approach, but it seems too complicated and may introduce a performance regression.

This PR simply stops common subexpression elimination for conditional expressions, with some cleanup.

## How was this patch tested?

regression test

Author: Wenchen Fan <[email protected]>

Closes #16659 from cloud-fan/codegen.
@viirya
Member Author

viirya commented Jan 24, 2017

Closing this as the alternative #16659 has been merged.

@viirya viirya closed this Jan 24, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
@viirya viirya deleted the improve-subexpr-elimination branch December 27, 2023 18:20