
Conversation

viirya commented Mar 3, 2016

JIRA: https://issues.apache.org/jira/browse/SPARK-13636

What changes were proposed in this pull request?

As shown in the whole-stage codegen version of the Sort operator, when Sort is on top of Exchange (or another operator that produces UnsafeRow), we create variables from the UnsafeRow, then create another UnsafeRow from these variables. We should avoid this unnecessary unpacking and repacking of variables from UnsafeRows.

How was this patch tested?

All existing whole-stage codegen tests should pass.
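
To illustrate the idea, here is a minimal, self-contained Scala sketch of the consume path (not the actual Spark source; real codegen goes through CodegenContext, and sort_sorter, sort_rowWriter, sort_result, and inputadapter_row are names taken from the generated code shown below):

object SortConsumeSketch {
  case class ExprCode(code: String, isNull: String, value: String)

  // `row` names an UnsafeRow variable when the upstream operator (e.g. an
  // InputAdapter over Exchange) already produces UnsafeRows; it is null when
  // only per-field variables are available.
  def doConsume(input: Seq[ExprCode], row: String): String = {
    val sorterVariable = "sort_sorter"
    if (row != null) {
      // Direct path added by this patch: insert the existing UnsafeRow as-is.
      s"$sorterVariable.insertRow((UnsafeRow) $row.copy());"
    } else {
      // Old path: unpack each field variable and repack it through an
      // UnsafeRowWriter before inserting the rebuilt row.
      val writeFields = input.zipWithIndex.map { case (e, i) =>
        s"""if (${e.isNull}) {
           |  sort_rowWriter.setNullAt($i);
           |} else {
           |  sort_rowWriter.write($i, ${e.value});
           |}""".stripMargin
      }.mkString("\n")
      s"""sort_rowWriter.zeroOutNullBytes();
         |$writeFields
         |$sorterVariable.insertRow(sort_result);""".stripMargin
    }
  }

  def main(args: Array[String]): Unit = {
    val field = ExprCode("", "inputadapter_isNull", "inputadapter_value")
    println(doConsume(Seq(field), row = null))               // unpack/repack path
    println(doConsume(Seq(field), row = "inputadapter_row")) // direct path
  }
}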

viirya commented Mar 3, 2016

Before this patch, the generated code for the Sort operator in the plan val df = sqlContext.range(3, 0, -1).sort(col("id")):

public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

/** Codegened pipeline for:
 * Sort [id#22L ASC], true, 0
 +- INPUT
 */
class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean sort_needToSort;
  private org.apache.spark.sql.execution.Sort sort_plan;
  private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
  private org.apache.spark.executor.TaskMetrics sort_metrics;
  private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
  private scala.collection.Iterator inputadapter_input;
  private UnsafeRow sort_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sort_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sort_rowWriter;
  private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
  private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
  private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
  private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(scala.collection.Iterator inputs[]) {
    sort_needToSort = true;
    this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
    sort_sorter = sort_plan.createSorter();
    sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();

    inputadapter_input = inputs[0];
    sort_result = new UnsafeRow(1);
    this.sort_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sort_result, 0);
    this.sort_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sort_holder, 1);
    this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
    sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
    this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
    sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
  }

  private void sort_addToSorter() throws java.io.IOException {
    while (inputadapter_input.hasNext()) {
      InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* input[0, bigint] */
      boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
      // Convert the input attributes to an UnsafeRow and add it to the sorter

      sort_rowWriter.zeroOutNullBytes();

      if (inputadapter_isNull) {
        sort_rowWriter.setNullAt(0);
      } else {
        sort_rowWriter.write(0, inputadapter_value);
      }

      sort_sorter.insertRow(sort_result);
      if (shouldStop()) {
        return;
      }
    }

  }

  protected void processNext() throws java.io.IOException {
    if (sort_needToSort) {
      sort_addToSorter();
      Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      sort_sortedIter = sort_sorter.sort();
      sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
      sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      sort_needToSort = false;
    }

    while (sort_sortedIter.hasNext()) {
      UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
      append(sort_outputRow.copy());
      if (shouldStop()) return;
    }
  }
}

viirya commented Mar 3, 2016

After this patch, the generated code:

public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

/** Codegened pipeline for:
 * Sort [id#22L ASC], true, 0
 +- INPUT
 */
class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean sort_needToSort;
  private org.apache.spark.sql.execution.Sort sort_plan;
  private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
  private org.apache.spark.executor.TaskMetrics sort_metrics;
  private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
  private scala.collection.Iterator inputadapter_input;
  private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
  private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
  private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
  private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(scala.collection.Iterator inputs[]) {
    sort_needToSort = true;
    this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
    sort_sorter = sort_plan.createSorter();
    sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();

    inputadapter_input = inputs[0];
    this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
    sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
    this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
    sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
  }

  private void sort_addToSorter() throws java.io.IOException {
    while (inputadapter_input.hasNext()) {
      InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();

      sort_sorter.insertRow((UnsafeRow)inputadapter_row.copy());
      if (shouldStop()) {
        return;
      }
    }

  }

  protected void processNext() throws java.io.IOException {
    if (sort_needToSort) {
      sort_addToSorter();
      Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      sort_sortedIter = sort_sorter.sort();
      sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
      sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      sort_needToSort = false;
    }

    while (sort_sortedIter.hasNext()) {
      UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
      append(sort_outputRow.copy());
      if (shouldStop()) return;
    }
  }
}

viirya commented Mar 3, 2016

You can see that, in the sort_addToSorter method, the code generated with this patch no longer needs to unpack fields from the UnsafeRow and re-pack them into another UnsafeRow in order to insert it into the row sorter.

viirya changed the title from [SPARK-13636][SQL] Direct consume UnsafeRow in wholestage codegen plans to [SPARK-13636][SQL] Directly consume UnsafeRow in wholestage codegen plans on Mar 3, 2016
SparkQA commented Mar 3, 2016

Test build #52367 has finished for PR 11484 at commit 6941eb1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya commented Mar 3, 2016

retest this please.

SparkQA commented Mar 3, 2016

Test build #52377 has finished for PR 11484 at commit 6941eb1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya commented Mar 4, 2016

cc @davies @rxin @nongli

Review thread on the diff:

protected var parent: CodegenSupport = null

/**
 * Whether this SparkPlan accepts UnsafeRow as input in consumeChild.

A Contributor commented on these lines:

consumeChild -> doConsume
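
For context, a hedged sketch of the trait shape this thread is discussing (simplified; the exact name of the Boolean flag is not visible in this scrape, so canConsumeUnsafeRow below is a stand-in):

import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}

trait CodegenSupport {
  protected var parent: CodegenSupport = null

  /** Whether this SparkPlan accepts UnsafeRow as input in doConsume. */
  def canConsumeUnsafeRow: Boolean = false // hypothetical name, for illustration

  /**
   * Emits Java source that consumes one input row. When the upstream plan
   * already produces an UnsafeRow and this operator accepts it, `row` holds
   * that row's variable name and the per-field `input` variables can be
   * skipped entirely.
   */
  def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String
}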

davies commented Mar 4, 2016

@viirya Can we wait for #11274? Then we could avoid some complexity.

viirya commented Mar 4, 2016

@davies Yea. That will be good.

kiszk commented Mar 4, 2016

Would it be better to add "in sort" to the title of this PR?

viirya commented Mar 4, 2016

@kiszk This is not just for the Sort operator; I just took Sort as an example.

kiszk commented Mar 4, 2016

@viirya Thank you for your explanation. I understand now that this PR covers Sort and other operators in whole-stage code generation.

viirya added 4 commits March 9, 2016 14:37
…saferow

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
SparkQA commented Mar 9, 2016

Test build #52733 has finished for PR 11484 at commit dea644a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 9, 2016

Test build #52732 has finished for PR 11484 at commit 6400eb2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya commented Mar 9, 2016

@davies I've addressed your comments. I also made corresponding changes for #11274. Please see if this change is good now. Thanks!

Review thread on the diff (Sort):

  }

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
    if (row != null) {
      s"$sorterVariable.insertRow((UnsafeRow)$row.copy());"

A Contributor commented on these lines:

Do we need the copy here? I think the sorter will copy it by itself.

viirya (Member, Author) replied:

Yes. I checked it. Will remove this copy call.
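
Since UnsafeExternalRowSorter copies incoming rows into its own memory on insert (as confirmed above), the emitted call presumably becomes the following once the redundant copy is dropped (a sketch based on the snippet in this thread, not the final diff):

// Insert the upstream UnsafeRow directly; the sorter makes its own copy.
s"$sorterVariable.insertRow((UnsafeRow)$row);"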

SparkQA commented Mar 10, 2016

Test build #52810 has finished for PR 11484 at commit 6f0ae35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya commented Mar 10, 2016

@davies Comments are addressed. Please let me know if you have further comments. Thanks!

davies commented Mar 10, 2016

LGTM, merging into master, thanks!

asfgit closed this in d24801a on Mar 10, 2016
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#11484 from viirya/direct-consume-unsaferow.
viirya deleted the direct-consume-unsaferow branch on December 27, 2023