@kiszk
Member

kiszk commented Dec 10, 2017

What changes were proposed in this pull request?

This PR localizes the lifetime of the mutable states that hold isNull and value for aggregation results in the code generated by HashAggregateExec.

These states are passed to successor operations through the consume() method. This may violate the assumption made in #19865 when operations that use these variables are split. In the "With this PR" example below, agg_localBufValue and agg_localBufisNull are passed to a successor operation (projection). The lifetime of the mutable states agg_bufValue and agg_bufIsNull ends at Line 120.

This PR is based on @cloud-fan's suggestion.

Without this PR

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator inputadapter_input;
...
/* 039 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 040 */     // initialize aggregation buffer
/* 041 */     final long agg_value = -1L;
/* 042 */     agg_bufIsNull = true;
/* 043 */     agg_bufValue = agg_value;
/* 044 */
/* 045 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 047 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
...
/* 100 */       } while (false);
/* 101 */       final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */       // update aggregation buffer
/* 103 */       agg_bufIsNull = agg_isNull3;
/* 104 */       agg_bufValue = agg_value3;
/* 105 */       if (shouldStop()) return;
/* 106 */     }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   protected void processNext() throws java.io.IOException {
/* 111 */     while (!agg_initAgg) {
/* 112 */       agg_initAgg = true;
/* 113 */       long agg_beforeAgg = System.nanoTime();
/* 114 */       agg_doAggregateWithoutKey();
/* 115 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 116 */
/* 117 */       // output the result
/* 118 */
/* 119 */       agg_numOutputRows.add(1);
/* 120 */       agg_rowWriter.zeroOutNullBytes();
/* 121 */
/* 122 */       if (agg_bufIsNull) {
/* 123 */         agg_rowWriter.setNullAt(0);
/* 124 */       } else {
/* 125 */         agg_rowWriter.write(0, agg_bufValue);
/* 126 */       }
/* 127 */       append(agg_result);
/* 128 */     }
/* 129 */   }

With this PR

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator inputadapter_input;
...
/* 039 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 040 */     // initialize aggregation buffer
/* 041 */     final long agg_value = -1L;
/* 042 */     agg_bufIsNull = true;
/* 043 */     agg_bufValue = agg_value;
/* 044 */
/* 045 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 047 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
...
/* 100 */       } while (false);
/* 101 */       final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */       // update aggregation buffer
/* 103 */       agg_bufIsNull = agg_isNull3;
/* 104 */       agg_bufValue = agg_value3;
/* 105 */       if (shouldStop()) return;
/* 106 */     }
/* 107 */
/* 108 */   }
/* 109 */
/* 110 */   protected void processNext() throws java.io.IOException {
/* 111 */     while (!agg_initAgg) {
/* 112 */       agg_initAgg = true;
/* 113 */       long agg_beforeAgg = System.nanoTime();
/* 114 */       agg_doAggregateWithoutKey();
/* 115 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 116 */
/* 117 */       // output the result
/* 118 */
/* 119 */       boolean agg_localBufisNull = agg_bufIsNull;
/* 120 */       long agg_localBufValue = agg_bufValue;
/* 121 */
/* 122 */       agg_numOutputRows.add(1);
/* 123 */       agg_rowWriter.zeroOutNullBytes();
/* 124 */
/* 125 */       if (agg_localBufisNull) {
/* 126 */         agg_rowWriter.setNullAt(0);
/* 127 */       } else {
/* 128 */         agg_rowWriter.write(0, agg_localBufValue);
/* 129 */       }
/* 130 */       append(agg_result);
/* 131 */     }
/* 132 */   }
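
The localization shown above can be reduced to a small standalone sketch. This is not Spark's generated code: the class `LocalizedAggBuffer` and its methods are hypothetical stand-ins, and `consume` here is just a plain method that plays the role of the successor operation. The only point is that the fields are copied into locals once, and everything downstream sees the locals.

```java
// Hypothetical illustration; names are not taken from Spark.
final class LocalizedAggBuffer {
    // Mutable state that lives for the whole iterator, like agg_bufIsNull / agg_bufValue.
    private boolean bufIsNull = true;
    private long bufValue = -1L;

    // Adds one input value to the running sum.
    void update(long v) {
        bufValue = bufIsNull ? v : bufValue + v;
        bufIsNull = false;
    }

    // Stands in for the successor operation: it only receives plain values.
    private static String consume(boolean isNull, long value) {
        return isNull ? "null" : Long.toString(value);
    }

    // Copies the fields into locals and hands only the locals downstream.
    String output() {
        final boolean localBufIsNull = bufIsNull;
        final long localBufValue = bufValue;
        return consume(localBufIsNull, localBufValue);
    }

    public static void main(String[] args) {
        LocalizedAggBuffer agg = new LocalizedAggBuffer();
        agg.update(1L);
        agg.update(2L);
        agg.update(3L);
        System.out.println(agg.output());
    }
}
```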

How was this patch tested?

Existing test suites

@SparkQA

SparkQA commented Dec 10, 2017

Test build #84691 has finished for PR 19938 at commit ac65dd2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Dec 10, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Dec 10, 2017

Test build #84694 has finished for PR 19938 at commit ac65dd2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Dec 10, 2017

IIUC, the problem only happens when we wrongly pass global variables into split functions and change their values. Will we change the result variables from aggregation? I think parent operators just use the variables when evaluating expressions and won't change the values, will they?

@kiszk
Member Author

kiszk commented Dec 10, 2017

Yes, the problem would happen only when we pass global variables into split functions.
Can we guarantee 100% that no operation in consume() here passes global variables into split functions?

@viirya
Member

viirya commented Dec 10, 2017

They may be passed into split functions, but it is hard to imagine a case where we would change their values in those functions. In Spark SQL, the output of a child operator is just used as input to evaluate new output in the parent operator. We don't use the output as mutable state.

The potentially problematic case is when we create a global variable in an operator/expression codegen and use it to carry mutable state (e.g., the condition-met status in CaseWhen) during the evaluation of the op/expr. Then we may pass it into split functions and modify it there.

If we don't change the values, it seems to me this change just creates redundant local variables.

This doesn't cause any harm at all, so I have no strong opinion on this.
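
A minimal sketch of the failure mode described in this comment, as I understand it. The class `SplitFunctionPitfall` and its methods are hypothetical and are not Spark's generated code. Java passes primitives by value, so a split function that receives a global state as a parameter and assigns to it only changes its own copy.

```java
// Hypothetical illustration; names are not taken from Spark.
final class SplitFunctionPitfall {
    // Plays the role of a global mutable state in generated code.
    private boolean matched = false;

    // A "split" helper that receives the state as a parameter.
    // Assigning to the parameter only changes the local copy.
    private void evalSplit(boolean matchedParam, int input) {
        if (input > 0) {
            matchedParam = true; // does NOT update this.matched
        }
    }

    // A helper that writes the field directly.
    private void evalDirect(int input) {
        if (input > 0) {
            matched = true;
        }
    }

    // The update made inside evalSplit is lost.
    boolean runWithParameter(int input) {
        matched = false;
        evalSplit(matched, input);
        return matched;
    }

    // The update made inside evalDirect is visible.
    boolean runWithField(int input) {
        matched = false;
        evalDirect(input);
        return matched;
    }

    public static void main(String[] args) {
        SplitFunctionPitfall p = new SplitFunctionPitfall();
        System.out.println(p.runWithParameter(5)); // false
        System.out.println(p.runWithField(5));     // true
    }
}
```

If the split function only reads the parameter, as is argued here for aggregation results, the two variants behave the same.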

@kiszk
Member Author

kiszk commented Dec 11, 2017

Thank you for the great thoughts. Let me think about it.

@cloud-fan
Contributor

I have the same feeling as @viirya: expressions/operators usually won't mutate input from children. One concern is how to guarantee this programmatically, but it might be OK as I don't think this would happen. We can also remove the temp global variables introduced by the recent commits from @kiszk.

@cloud-fan
Contributor

Actually, there is one real problem: after we fold many global variables into an array, the variable name may become something like arr[1], which can't be used as a parameter name.

Localizing the global variables in the current expression/operator is one solution; another is generating parameter names instead of reusing the input variable name.
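
A rough sketch of the second option, under assumptions: the class `FreshParamNames`, the `param_N` naming scheme, and the fixed `long` parameter type are all made up for illustration and do not reflect how splitExpression is actually implemented. The idea is that the declaration uses freshly generated names, while the call site passes the original input expressions unchanged, so an input such as arr[1] is never used as a parameter name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; names are not taken from Spark.
final class FreshParamNames {
    // Builds the method header with one fresh parameter name per input.
    // Every parameter is typed long purely to keep the sketch short.
    static String declaration(String funcName, List<String> inputs) {
        List<String> params = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            params.add("long param_" + i);
        }
        return "private void " + funcName + "(" + String.join(", ", params) + ")";
    }

    // Builds the call site, passing the original input expressions as arguments.
    static String call(String funcName, List<String> inputs) {
        return funcName + "(" + String.join(", ", inputs) + ");";
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("arr[1]", "agg_value");
        System.out.println(declaration("apply_0", inputs));
        System.out.println(call("apply_0", inputs));
    }
}
```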

@viirya
Member

viirya commented Dec 11, 2017

Good point.

@kiszk
Member Author

kiszk commented Dec 11, 2017

This makes sense to me.

Currently, based on the previous discussion, we are fixing code generation and inserting an assertion in #19865 to ensure no global variables are passed.
Of course, as @cloud-fan said, another option is to generate a new parameter name for each global variable.

Which one is the better solution?

@maropu
Member

maropu commented Dec 12, 2017

Is this the only place where we need this localization (I mean, do other operators not need this logic)? I'm also neutral about this PR, though I feel it would be better to make this more general to avoid the same situation in other existing (and new) operators.

@cloud-fan
Contributor

I prefer generating new parameter names in splitExpression over localizing global variables. There is no contract that an Expression must output Java variables; we may inline some values. For example, we already output a false or true literal for isNull, and I roughly remember we do the same thing for value, e.g. a + 1 instead of a new Java variable c computed as c = a + 1.
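
To make the point concrete, here is a small sketch that classifies an expression's output code. The class `OutputCodeCheck` and the method `isPlainVariable` are hypothetical helpers, not part of Spark. It shows that outputs such as false, a + 1, or arr[1] cannot be reused as parameter names, which is why a fresh name is needed.

```java
// Hypothetical illustration; names are not taken from Spark.
final class OutputCodeCheck {
    // Returns true if code is a single Java identifier that is not a
    // boolean or null literal. Other reserved words (e.g. "int") are not
    // rejected here, to keep the sketch short.
    static boolean isPlainVariable(String code) {
        if (code.isEmpty() || !Character.isJavaIdentifierStart(code.charAt(0))) {
            return false;
        }
        for (int i = 1; i < code.length(); i++) {
            if (!Character.isJavaIdentifierPart(code.charAt(i))) {
                return false;
            }
        }
        return !(code.equals("true") || code.equals("false") || code.equals("null"));
    }

    public static void main(String[] args) {
        for (String code : new String[] {"agg_value", "false", "a + 1", "arr[1]"}) {
            System.out.println(code + " -> " + isPlainVariable(code));
        }
    }
}
```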

@kiszk
Member Author

kiszk commented Dec 12, 2017

Sure, in #19865, I will generate new parameter names in splitExpression instead of inserting an assertion.
