[SPARK-13123][SQL] Implement whole stage codegen for sort #11359
Conversation
This does the simplest thing of just assembling a row on consume and driving the underlying external sorter object.
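For context, here is a minimal sketch of what that consume path could look like in the codegen template (illustrative only, not the exact code in this patch; planTerm and rowCode are hypothetical names for the reference to the Sort plan and for the generated UnsafeRow expression):

// Assumed to be in scope inside Sort's codegen methods (hypothetical names):
//   ctx: CodegenContext
//   planTerm: String   -- generated reference to the Sort plan (e.g. via the references array)
//   rowCode: ExprCode  -- code that assembles an UnsafeRow from the input columns

// Register the external sorter as mutable state on the generated class.
val sorterTerm = ctx.freshName("sorter")
ctx.addMutableState("org.apache.spark.sql.execution.UnsafeExternalRowSorter",
  sorterTerm, s"$sorterTerm = $planTerm.createSorter();")

// On consume: build the UnsafeRow and hand it to the sorter. This mirrors
// sort_addToSorter() in the generated code pasted further down.
val consumeCode =
  s"""
     | // Convert the input attributes to an UnsafeRow and add it to the sorter
     | ${rowCode.code}
     | $sorterTerm.insertRow((UnsafeRow) ${rowCode.value});
   """.stripMargin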
|
Test build #51922 has finished for PR 11359 at commit
|
|
@sameeragarwal you should update the pr description to actually include what this patch does (in addition to noting that it was built on an earlier pr). For codegen prs, it would be great to paste in the generated code.
|
Thanks @rxin, added! |
|
Test build #51953 has finished for PR 11359 at commit
|
|
Maybe paste the generated code in the comment section so it doesn't get merged as part of the commit. Otherwise the commit description is super long. Thanks. |
|
Generated code:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */  * Sort [id#0L ASC], true, 0
/* 007 */  +- INPUT
/* 008 */  */
/* 009 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private UnsafeRow sort_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder sort_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter sort_rowWriter;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 021 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 022 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 023 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 024 */
/* 025 */   public GeneratedIterator(Object[] references) {
/* 026 */     this.references = references;
/* 027 */   }
/* 028 */
/* 029 */   public void init(scala.collection.Iterator inputs[]) {
/* 030 */     sort_needToSort = true;
/* 031 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 032 */     sort_sorter = sort_plan.createSorter();
/* 033 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 034 */
/* 035 */     inputadapter_input = inputs[0];
/* 036 */     sort_result = new UnsafeRow(1);
/* 037 */     this.sort_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(sort_result, 0);
/* 038 */     this.sort_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(sort_holder, 1);
/* 039 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 040 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 041 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 042 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 043 */   }
/* 044 */
/* 045 */   private void sort_addToSorter() throws java.io.IOException {
/* 046 */     while (inputadapter_input.hasNext()) {
/* 047 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 048 */       /* input[0, bigint] */
/* 049 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 050 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
/* 051 */       // Convert the input attributes to an UnsafeRow and add it to the sorter
/* 052 */
/* 053 */       sort_rowWriter.zeroOutNullBytes();
/* 054 */
/* 055 */       if (inputadapter_isNull) {
/* 056 */         sort_rowWriter.setNullAt(0);
/* 057 */       } else {
/* 058 */         sort_rowWriter.write(0, inputadapter_value);
/* 059 */       }
/* 060 */
/* 061 */       sort_sorter.insertRow(sort_result);
/* 062 */       if (shouldStop()) {
/* 063 */         return;
/* 064 */       }
/* 065 */     }
/* 066 */
/* 067 */   }
/* 068 */
/* 069 */   protected void processNext() throws java.io.IOException {
/* 070 */     if (sort_needToSort) {
/* 071 */       sort_addToSorter();
/* 072 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 073 */       sort_sortedIter = sort_sorter.sort();
/* 074 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 075 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 076 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 077 */       sort_needToSort = false;
/* 078 */     }
/* 079 */
/* 080 */     while (sort_sortedIter.hasNext()) {
/* 081 */       UnsafeRow sort_outputRow = (UnsafeRow) sort_sortedIter.next();
/* 082 */       append(sort_outputRow.copy());
/* 083 */       if (shouldStop()) return;
/* 084 */     }
/* 085 */   }
/* 086 */ }
|
Test build #51978 has finished for PR 11359 at commit
|
|
test this please |
|
Test build #51982 has finished for PR 11359 at commit
|
|
Test build #52109 has finished for PR 11359 at commit
|
|
@nongli this should be ready for your pass. |
| val outputRow = ctx.freshName("outputRow") | ||
| val dataSize = metricTerm(ctx, "dataSize") | ||
| val spillSize = metricTerm(ctx, "spillSize") | ||
| val spillSizeBefore = ctx.freshName("spillSizeBefore") |
This can just be a local var. Just remove the ".addMutableState" below and fix line 141.
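For what it's worth, a minimal sketch of the suggested change (abbreviated; sorterVariable, sortedIterator, and metrics are assumed names for the surrounding template's terms, and the actual code differs):

// Before (sketch): spillSizeBefore was registered as per-class mutable state:
//   ctx.addMutableState("long", spillSizeBefore, "")
// After (sketch): declare it as a plain local inside the generated sort block,
// so no mutable-state entry is needed.
val sortBlock =
  s"""
     | long $spillSizeBefore = $metrics.memoryBytesSpilled();
     | $sortedIterator = $sorterVariable.sort();
     | $dataSize.add($sorterVariable.getPeakMemoryUsage());
     | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
   """.stripMargin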
Thanks, fixed.
|
LGTM |
|
Test build #52192 has finished for PR 11359 at commit
|
|
OK I am merging it. |
|
|
s"""
   | // Convert the input attributes to an UnsafeRow and add it to the sorter
   | ${code.code}
This may cause a performance regression: when Sort is on top of Exchange (or another operator that produces UnsafeRow), we will create variables from the UnsafeRow and then create another UnsafeRow from those variables.
See #11008 (comment)
@yhuai Should we revert this patch or fix this in a follow-up PR?
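To make the concern concrete, here is a rough sketch of what a follow-up in the consume path could look like (hypothetical; row, rowCode, and sorterTerm are illustrative names, and this is not what the merged patch does): when the child already hands over an UnsafeRow, insert it into the sorter directly instead of unpacking its columns and re-assembling a new UnsafeRow.

// Hypothetical sketch of avoiding the extra copy described above.
// `row` is assumed to be the (possibly null) UnsafeRow expression from the child;
// `rowCode` assembles an UnsafeRow from the individual column variables.
val insertCode =
  if (row != null) {
    // Child already produced an UnsafeRow; insert it into the sorter as-is.
    s"$sorterTerm.insertRow((UnsafeRow) ${row.value});"
  } else {
    // Fall back to assembling an UnsafeRow from the individual columns.
    s"""
       | ${rowCode.code}
       | $sorterTerm.insertRow((UnsafeRow) ${rowCode.value});
     """.stripMargin
  }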
What changes were proposed in this pull request?

This PR adds support for implementing whole stage codegen for sort. It builds heavily on @nongli's PR #11008 (which actually implements the feature), and adds the following changes on top:

- [x] Generated code updates peak execution memory metrics
- [x] Unit tests in WholeStageCodegenSuite and SQLMetricsSuite

How was this patch tested?

New unit tests in WholeStageCodegenSuite and SQLMetricsSuite. Further, all existing sort tests should pass.

Author: Sameer Agarwal <[email protected]>
Author: Nong Li <[email protected]>

Closes #11359 from sameeragarwal/sort-codegen.
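For reference, a hypothetical sketch of the kind of WholeStageCodegenSuite test described above (class and field names such as WholeStageCodegen.child are assumptions and the actual test may differ):

import org.apache.spark.sql.execution.{Sort, WholeStageCodegen}
import org.apache.spark.sql.functions.col

test("sort is planned inside WholeStageCodegen") {
  val df = sqlContext.range(0, 100).sort(col("id").desc)
  val plan = df.queryExecution.executedPlan
  // The Sort operator should be wrapped by a WholeStageCodegen node.
  assert(plan.find {
    case w: WholeStageCodegen => w.child.isInstanceOf[Sort]
    case _ => false
  }.isDefined)
  // And the sorted result should still be correct.
  assert(df.head().getLong(0) == 99L)
}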