
Conversation

@kiszk
Member

@kiszk kiszk commented May 23, 2017

What changes were proposed in this pull request?

This PR generates Java code to:

  1. Build each in-memory table cache using ColumnarBatch with ColumnVector instead of using CachedBatch with Array[Byte].
  2. Get a value for a column from a ColumnVector without using an iterator (a minimal hand-written sketch of this access pattern follows this list)
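
For intuition, the following is a minimal hand-written sketch (not code from this PR) of the access pattern the generated code follows: primitives are read straight from each ColumnVector by row index, with no per-row iterator and no copy into an InternalRow. The helper name sumBatch and the two-column layout are illustrative assumptions; ColumnarBatch and ColumnVector are the existing classes in org.apache.spark.sql.execution.vectorized.

import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Illustrative only: scan a cached batch whose column 0 holds ints ("i") and
// column 1 holds doubles ("d"), reading primitives directly by row index.
def sumBatch(batch: ColumnarBatch): Double = {
  val intCol = batch.column(0)      // ColumnVector for "i"
  val doubleCol = batch.column(1)   // ColumnVector for "d"
  var sum = 0.0
  var rowIdx = 0
  while (rowIdx < batch.numRows()) {
    sum += intCol.getInt(rowIdx) + doubleCol.getDouble(rowIdx)  // no InternalRow copy
    rowIdx += 1
  }
  sum
}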

As the first step, for ease of review, I supported only integer and double data types with whole-stage codegen. Another PR will address the execution path without whole-stage codegen.

This PR implements the following:

  1. Keep an in-memory table cache using ColumnarBatch with ColumnVector. To support both the new and the conventional cache data structures, this PR declares CachedBatch as a trait, with CachedColumnarBatch and CachedBatchBytes as concrete implementations (see the sketch after this list).
  2. Generate Java code to build an in-memory table cache.
  3. Generate Java code to directly get a value from a ColumnVector.
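
The following is a rough sketch of the cache data structures in item 1, assuming field names such as columnarBatch, buffers, and stats; the exact declarations are in the diff.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Illustrative sketch; the real signatures are in the PR diff.
trait CachedBatch {
  def getNumRows: Int
}

// New path: keeps the columnar representation directly.
case class CachedColumnarBatch(columnarBatch: ColumnarBatch, stats: InternalRow)
    extends CachedBatch {
  override def getNumRows: Int = columnarBatch.numRows()
}

// Conventional path: keeps each column as compressed byte arrays.
case class CachedBatchBytes(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
    extends CachedBatch {
  override def getNumRows: Int = numRows
}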

This PR improves runtime performance by

  1. building the in-memory table cache while eliminating many virtual calls and a complicated data path, and
  2. eliminating the data copy from column-oriented storage to InternalRow in the SpecificColumnarIterator.

Options
ColumnVectors for all primitive data types in a ColumnarBatch can be compressed. Currently, there are two ways to enable compression (a usage sketch follows this list):

  1. Set the property spark.sql.inMemoryColumnarStorage.compressed to true (the default), or
  2. Call DataFrame.persist(st), where st is MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, MEMORY_AND_DISK_SER, or MEMORY_AND_DISK_SER_2.
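
A minimal usage sketch of the two options, assuming a running SparkSession named spark; the DataFrame built here is only illustrative.

import org.apache.spark.storage.StorageLevel

// Option 1: via the SQL configuration (true is already the default).
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")

// Option 2: via a serialized storage level passed to persist.
val df = spark.range(10).selectExpr("CAST(id AS int) AS i", "CAST(id AS double) AS d")
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.count()  // materializes the in-memory table cache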

An example program:

val df = sparkContext.parallelize((1 to 10), 1).map(i => (i, i.toDouble)).toDF("i", "d").cache
df.filter("i < 8 and 4.0 < d").show

Generated code for building an in-memory table cache:

/* 001 */ import scala.collection.Iterator;
/* 002 */ import org.apache.spark.sql.types.DataType;
/* 003 */ import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
/* 004 */ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
/* 005 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
/* 006 */ import org.apache.spark.sql.execution.vectorized.ColumnVector;
/* 007 */
/* 008 */ public SpecificColumnarIterator generate(Object[] references) {
/* 009 */   return new SpecificColumnarIterator(references);
/* 010 */ }
/* 011 */
/* 012 */ class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
/* 013 */   private ColumnVector[] colInstances;
/* 014 */   private UnsafeRow unsafeRow = new UnsafeRow(0);
/* 015 */   private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
/* 016 */   private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 0);
/* 017 */   private MutableUnsafeRow mutableRow = null;
/* 018 */
/* 019 */   private int rowIdx = 0;
/* 020 */   private int numRowsInBatch = 0;
/* 021 */
/* 022 */   private scala.collection.Iterator input = null;
/* 023 */   private DataType[] columnTypes = null;
/* 024 */   private int[] columnIndexes = null;
/* 025 */
/* 026 */
/* 027 */
/* 028 */   public SpecificColumnarIterator(Object[] references) {
/* 029 */
/* 030 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 031 */   }
/* 032 */
/* 033 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
/* 034 */     this.input = input;
/* 035 */     this.columnTypes = columnTypes;
/* 036 */     this.columnIndexes = columnIndexes;
/* 037 */   }
/* 038 */
/* 039 */
/* 040 */
/* 041 */   public boolean hasNext() {
/* 042 */     if (rowIdx < numRowsInBatch) {
/* 043 */       return true;
/* 044 */     }
/* 045 */     if (!input.hasNext()) {
/* 046 */       return false;
/* 047 */     }
/* 048 */
/* 049 */     org.apache.spark.sql.execution.columnar.CachedColumnarBatch cachedBatch =
/* 050 */     (org.apache.spark.sql.execution.columnar.CachedColumnarBatch) input.next();
/* 051 */     org.apache.spark.sql.execution.vectorized.ColumnarBatch batch = cachedBatch.columnarBatch();
/* 052 */     rowIdx = 0;
/* 053 */     numRowsInBatch = cachedBatch.getNumRows();
/* 054 */     colInstances = new ColumnVector[columnIndexes.length];
/* 055 */     for (int i = 0; i < columnIndexes.length; i ++) {
/* 056 */       colInstances[i] = batch.column(columnIndexes[i]);
/* 057 */     }
/* 058 */
/* 059 */     return hasNext();
/* 060 */   }
/* 061 */
/* 062 */   public InternalRow next() {
/* 063 */     bufferHolder.reset();
/* 064 */     rowWriter.zeroOutNullBytes();
/* 065 */
/* 066 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 067 */     rowIdx += 1;
/* 068 */     return unsafeRow;
/* 069 */   }
/* 070 */ }

Generated code by whole-stage codegen (lines 75-78 contain the major changes):

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator inmemorytablescan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_scanTime;
/* 014 */   private long inmemorytablescan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch inmemorytablescan_batch;
/* 016 */   private int inmemorytablescan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance0;
/* 018 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance1;
/* 019 */   private UnsafeRow inmemorytablescan_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inmemorytablescan_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inmemorytablescan_rowWriter;
/* 022 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 023 */   private UnsafeRow filter_result;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 026 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 027 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 028 */   private UnsafeRow agg_result;
/* 029 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 030 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 031 */
/* 032 */   public GeneratedIterator(Object[] references) {
/* 033 */     this.references = references;
/* 034 */   }
/* 035 */
/* 036 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 037 */     partitionIndex = index;
/* 038 */     this.inputs = inputs;
/* 039 */     wholestagecodegen_init_0();
/* 040 */     wholestagecodegen_init_1();
/* 041 */
/* 042 */   }
/* 043 */
/* 044 */   private void wholestagecodegen_init_0() {
/* 045 */     agg_initAgg = false;
/* 046 */
/* 047 */     inmemorytablescan_input = inputs[0];
/* 048 */     this.inmemorytablescan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 049 */     this.inmemorytablescan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 050 */     inmemorytablescan_scanTime1 = 0;
/* 051 */     inmemorytablescan_batch = null;
/* 052 */     inmemorytablescan_batchIdx = 0;
/* 053 */     inmemorytablescan_colInstance0 = null;
/* 054 */     inmemorytablescan_colInstance1 = null;
/* 055 */     inmemorytablescan_result = new UnsafeRow(2);
/* 056 */     this.inmemorytablescan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inmemorytablescan_result, 0);
/* 057 */     this.inmemorytablescan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inmemorytablescan_holder, 2);
/* 058 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 059 */     filter_result = new UnsafeRow(2);
/* 060 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 061 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 062 */
/* 063 */   }
/* 064 */
/* 065 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 066 */     // initialize aggregation buffer
/* 067 */     agg_bufIsNull = false;
/* 068 */     agg_bufValue = 0L;
/* 069 */
/* 070 */     if (inmemorytablescan_batch == null) {
/* 071 */       inmemorytablescan_nextBatch();
/* 072 */     }
/* 073 */     while (inmemorytablescan_batch != null) {
/* 074 */       int inmemorytablescan_numRows = inmemorytablescan_batch.numRows();
/* 075 */       int inmemorytablescan_localEnd = inmemorytablescan_numRows - inmemorytablescan_batchIdx;
/* 076 */       for (int inmemorytablescan_localIdx = 0; inmemorytablescan_localIdx < inmemorytablescan_localEnd; inmemorytablescan_localIdx++) {
/* 077 */         int inmemorytablescan_rowIdx = inmemorytablescan_batchIdx + inmemorytablescan_localIdx;
/* 078 */         int inmemorytablescan_value = inmemorytablescan_colInstance0.getInt(inmemorytablescan_rowIdx);
/* 079 */
/* 080 */         boolean filter_isNull = false;
/* 081 */
/* 082 */         boolean filter_value = false;
/* 083 */         filter_value = inmemorytablescan_value > 4;
/* 084 */         if (!filter_value) continue;
/* 085 */         double inmemorytablescan_value1 = inmemorytablescan_colInstance1.getDouble(inmemorytablescan_rowIdx);
/* 086 */
/* 087 */         boolean filter_isNull3 = false;
/* 088 */
/* 089 */         boolean filter_value3 = false;
/* 090 */         filter_value3 = org.apache.spark.util.Utils.nanSafeCompareDoubles(inmemorytablescan_value1, 10.0D) > 0;
/* 091 */         if (!filter_value3) continue;
/* 092 */
/* 093 */         filter_numOutputRows.add(1);
/* 094 */
/* 095 */         // do aggregate
/* 096 */         // common sub-expressions
/* 097 */
/* 098 */         // evaluate aggregate function
/* 099 */         boolean agg_isNull1 = false;
/* 100 */
/* 101 */         long agg_value1 = -1L;
/* 102 */         agg_value1 = agg_bufValue + 1L;
/* 103 */         // update aggregation buffer
/* 104 */         agg_bufIsNull = false;
/* 105 */         agg_bufValue = agg_value1;
/* 106 */         // shouldStop check is eliminated
/* 107 */       }
/* 108 */       inmemorytablescan_batchIdx = inmemorytablescan_numRows;
/* 109 */       inmemorytablescan_batch = null;
/* 110 */       inmemorytablescan_nextBatch();
/* 111 */     }
/* 112 */     inmemorytablescan_scanTime.add(inmemorytablescan_scanTime1 / (1000 * 1000));
/* 113 */     inmemorytablescan_scanTime1 = 0;
/* 114 */
/* 115 */   }
/* 116 */
/* 117 */   private void inmemorytablescan_nextBatch() throws java.io.IOException {
/* 118 */     long getBatchStart = System.nanoTime();
/* 119 */     if (inmemorytablescan_input.hasNext()) {
/* 120 */       inmemorytablescan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)inmemorytablescan_input.next();
/* 121 */       inmemorytablescan_numOutputRows.add(inmemorytablescan_batch.numRows());
/* 122 */       inmemorytablescan_batchIdx = 0;
/* 123 */       inmemorytablescan_colInstance0 = inmemorytablescan_batch.column(0);
/* 124 */       inmemorytablescan_colInstance1 = inmemorytablescan_batch.column(1);
/* 125 */
/* 126 */     }
/* 127 */     inmemorytablescan_scanTime1 += System.nanoTime() - getBatchStart;
/* 128 */   }
/* 129 */
/* 130 */   private void wholestagecodegen_init_1() {
/* 131 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 132 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
/* 133 */     agg_result = new UnsafeRow(1);
/* 134 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 135 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 136 */
/* 137 */   }
/* 138 */
/* 139 */   protected void processNext() throws java.io.IOException {
/* 140 */     while (!agg_initAgg) {
/* 141 */       agg_initAgg = true;
/* 142 */       long agg_beforeAgg = System.nanoTime();
/* 143 */       agg_doAggregateWithoutKey();
/* 144 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 145 */
/* 146 */       // output the result
/* 147 */
/* 148 */       agg_numOutputRows.add(1);
/* 149 */       agg_rowWriter.zeroOutNullBytes();
/* 150 */
/* 151 */       if (agg_bufIsNull) {
/* 152 */         agg_rowWriter.setNullAt(0);
/* 153 */       } else {
/* 154 */         agg_rowWriter.write(0, agg_bufValue);
/* 155 */       }
/* 156 */       append(agg_result);
/* 157 */     }
/* 158 */   }
/* 159 */ }

How was this patch tested?

Added test suites for wider columns.

@SparkQA

SparkQA commented May 23, 2017

Test build #77222 has finished for PR 18066 at commit 6ed3d3f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class GenerateColumnAccessor(useColumnarBatch: Boolean)
  • class GenerateColumnarBatch(
  • class GeneratedColumnarBatchIterator extends $

@SparkQA

SparkQA commented May 23, 2017

Test build #77235 has finished for PR 18066 at commit 513acfb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented May 23, 2017

@hvanhovell @sameeragarwal Would you please review this?

@kiszk
Member Author

kiszk commented May 29, 2017

ping @hvanhovell

@kiszk
Member Author

kiszk commented Jun 3, 2017

ping @hvanhovell @sameeragarwal

@SparkQA

SparkQA commented Jun 3, 2017

Test build #77702 has finished for PR 18066 at commit c183032.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

storageLevel == MEMORY_AND_DISK_SER || storageLevel == MEMORY_AND_DISK_SER_2)
}

private val typeToName = Map[AbstractDataType, String](
Member

Hi, @kiszk.
Is there any reason for having only two types, int and double?
The PR looks more general to me.

Member Author

As I described in the description, this is for ease of review.

As the first step, for ease of review, I supported only integer and double data types with whole-stage codegen. Another PR will address the execution path without whole-stage codegen.


private[columnar] val useColumnarBatches: Boolean = {
// In the initial implementation, for ease of review
// support only integer and double and # of fields is less than wholeStageMaxNumFields
Member

Oh, I see. Here is the comment about the reason.

@kiszk
Member Author

kiszk commented Jun 8, 2017

ping @hvanhovell @sameeragarwal

@kiszk
Member Author

kiszk commented Jul 27, 2017

#18747 is another PR for this JIRA entry.

@kiszk kiszk closed this Jul 27, 2017