
Conversation

@kiszk
Member

@kiszk kiszk commented Mar 25, 2016

What changes were proposed in this pull request?

This PR generates Java code to get a float/double value of each column from CachedBatch when DataFrame.cache() is called. This is done in whole-stage code generation.

When DataFrame.cache() is called, data is stored as column-oriented storage (a columnar cache) in CachedBatch. This PR avoids the conversion from column-oriented storage to row-oriented storage.

This PR handles only float and double values stored in a column without compression. Another PR will handle the other primitive types, which may be stored in a column in a compressed format. This keeps the PR small for ease of review.

This PR consists of two parts.

  1. Pass data in CachedBatch to the generated code by using the decompress() method. CachedBatch consists of multiple ByteBuffer arrays; each ByteBuffer is passed to the generated code as-is.
  2. Generate code for both row-oriented storage and column-oriented storage; the columnar path is used only if
    • an InMemoryColumnarTableScan exists in the plan sub-tree (decided at runtime by checking whether the given iterator is a ColumnarIterator), and
    • no sort or join exists in the plan sub-tree.

This PR generates Java code for the columnar cache only if the types of all columns accessed by the operations are primitive.
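
As an illustration, a minimal Scala sketch of such a guard might look like the following. This is not the PR's actual code; the helper name is an assumption, and the check covers only the two types this PR handles so far.

import org.apache.spark.sql.types._

// Hypothetical helper: true only when every column has a type the
// columnar code generator can handle (float and double in this PR).
def supportsColumnarCodeGen(schema: StructType): Boolean =
  schema.fields.forall { f =>
    f.dataType match {
      case FloatType | DoubleType => true
      case _                      => false
    }
  }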

This PR improves the performance of an aggregate sum by 3.8x - 5.2x. The benchmark is available here.
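
For reference, a minimal spark-shell sketch of this kind of measurement (assumed setup; the actual benchmark linked above may differ):

import org.apache.spark.sql.functions.sum

// Build a single-partition DataFrame of doubles and materialize the cache.
val df = sc.parallelize(0 until (1 << 24), 1).map(_.toDouble).toDF("value")
df.cache().count()

// Time the aggregate sum against the cached (columnar) data.
val start = System.nanoTime()
df.agg(sum("value")).show()
println(s"Elapsed: ${(System.nanoTime() - start) / 1e6} ms")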

Performance results:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 2.6.32-504.el6.x86_64
Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz

Running benchmark: Double Sum with PassThrough cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Double Sum with PassThrough cache:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       334 /  342         47.1          21.2       1.0X
ColumnVector codegen                       88 /   99        179.1           5.6       3.8X

Running benchmark: Float Sum with PassThrough cache
  Running case: InternalRow codegen
  Running case: ColumnVector codegen

Float Sum with PassThrough cache:   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
InternalRow codegen                       473 /  485         66.5          15.0       1.0X
ColumnVector codegen                       91 /   95        345.6           2.9       5.2X

Motivating example:

val df = sc.parallelize(0 to 9, 1).map(i => i.toFloat).toDF()
df.cache().filter("value <= 5.1").show()

Generated code:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Filter (value#1 <= 5.1)
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private scala.collection.Iterator inputadapter_input;
/* 012 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue;
/* 014 */   private UnsafeRow filter_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 017 */   private scala.collection.Iterator inputadapter_input1;
/* 018 */   private int columnar_batchIdx;
/* 019 */   private int columnar_numRows;
/* 020 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inputadapter_col0;
/* 021 */   private UnsafeRow inputadapter_result;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inputadapter_holder;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inputadapter_rowWriter;
/* 024 */   private org.apache.spark.sql.execution.metric.LongSQLMetric filter_numOutputRows1;
/* 025 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue filter_metricValue1;
/* 026 */   private UnsafeRow filter_result1;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder1;
/* 028 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter1;
/* 029 */   private org.apache.spark.sql.execution.columnar.ColumnarIterator columnar_itr;
/* 030 */
/* 031 */   public GeneratedIterator(Object[] references) {
/* 032 */     this.references = references;
/* 033 */   }
/* 034 */
/* 035 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 036 */     partitionIndex = index;
/* 037 */     inputadapter_input = inputs[0];
/* 038 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[0];
/* 039 */     filter_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows.localValue();
/* 040 */     filter_result = new UnsafeRow(1);
/* 041 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 042 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 1);
/* 043 */     inputadapter_input1 = inputs[0];
/* 044 */     columnar_batchIdx = 0;
/* 045 */     columnar_numRows = 0;
/* 046 */     inputadapter_col0 = null;
/* 047 */     inputadapter_result = new UnsafeRow(1);
/* 048 */     this.inputadapter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inputadapter_result, 0);
/* 049 */     this.inputadapter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inputadapter_holder, 1);
/* 050 */     this.filter_numOutputRows1 = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 051 */     filter_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) filter_numOutputRows1.localValue();
/* 052 */     filter_result1 = new UnsafeRow(1);
/* 053 */     this.filter_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result1, 0);
/* 054 */     this.filter_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder1, 1);
/* 055 */     columnar_itr = null;
/* 056 */   }
/* 057 */
/* 058 */   private void processBatch() throws java.io.IOException {
/* 059 */     /*** PRODUCE: Filter (value#1 <= 5.1) */
/* 060 */
/* 061 */     /*** PRODUCE: INPUT */
/* 062 */
/* 063 */     while (true) {
/* 064 */       if (columnar_batchIdx == 0) {
/* 065 */         columnar_numRows = columnar_itr.initForColumnar();
/* 066 */         if (columnar_numRows < 0) {
/* 067 */           cleanup();
/* 068 */           break;
/* 069 */         }
/* 070 */         inputadapter_col0 = columnar_itr.getColumn(0);
/* 071 */       }
/* 072 */
/* 073 */       while (columnar_batchIdx < columnar_numRows) {
/* 074 */         int inputadapter_rowIdx = columnar_batchIdx++;
/* 075 */         /*** CONSUME: Filter (value#1 <= 5.1) */
/* 076 */
/* 077 */         /* columnVector[inputadapter_col0, inputadapter_rowIdx, float] */
/* 078 */         float inputadapter_value1 = inputadapter_col0.getFloat(inputadapter_rowIdx);
/* 079 */
/* 080 */         /* (input[0, float] <= 5.1) */
/* 081 */         boolean filter_value4 = false;
/* 082 */         filter_value4 = inputadapter_value1 <= 5.1;
/* 083 */         if (!filter_value4) continue;
/* 084 */
/* 085 */         filter_metricValue1.add(1);
/* 086 */
/* 087 */         /*** CONSUME: WholeStageCodegen */
/* 088 */
/* 089 */         filter_rowWriter1.write(0, inputadapter_value1);
/* 090 */         append(filter_result1);
/* 091 */         if (shouldStop()) return;
/* 092 */       }
/* 093 */       columnar_batchIdx = 0;
/* 094 */     }
/* 095 */   }
/* 096 */
/* 097 */   private void processRow() throws java.io.IOException {
/* 098 */     /*** PRODUCE: Filter (value#1 <= 5.1) */
/* 099 */
/* 100 */     /*** PRODUCE: INPUT */
/* 101 */
/* 102 */     while (inputadapter_input.hasNext()) {
/* 103 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 104 */       /*** CONSUME: Filter (value#1 <= 5.1) */
/* 105 */
/* 106 */       /* input[0, float] */
/* 107 */       float inputadapter_value = inputadapter_row.getFloat(0);
/* 108 */
/* 109 */       /* (input[0, float] <= 5.1) */
/* 110 */       boolean filter_value = false;
/* 111 */       filter_value = inputadapter_value <= 5.1;
/* 112 */       if (!filter_value) continue;
/* 113 */
/* 114 */       filter_metricValue.add(1);
/* 115 */
/* 116 */       /*** CONSUME: WholeStageCodegen */
/* 117 */
/* 118 */       filter_rowWriter.write(0, inputadapter_value);
/* 119 */       append(filter_result);
/* 120 */       if (shouldStop()) return;
/* 121 */     }
/* 122 */   }
/* 123 */
/* 124 */   private void cleanup() {
/* 125 */     inputadapter_col0 = null;
/* 126 */
/* 127 */     columnar_itr = null;
/* 128 */   }
/* 129 */
/* 130 */   protected void processNext() throws java.io.IOException {
/* 131 */     if ((columnar_batchIdx != 0) ||
/* 132 */       (inputadapter_input1 instanceof org.apache.spark.sql.execution.columnar.ColumnarIterator &&
/* 133 */         (columnar_itr = (org.apache.spark.sql.execution.columnar.ColumnarIterator)inputadapter_input1).isSupportColumnarCodeGen())) {
/* 134 */       processBatch();
/* 135 */     } else {
/* 136 */       processRow();
/* 137 */     }
/* 138 */   }
/* 139 */ }

How was this patch tested?

Ran existing test suites and added new test suites for operations on a DataFrame generated by df.cache().

@SparkQA

SparkQA commented Mar 25, 2016

Test build #54165 has finished for PR 11956 at commit f61c685.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 25, 2016

Test build #54168 has finished for PR 11956 at commit a43f4a2.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 25, 2016

Test build #54178 has finished for PR 11956 at commit c26cead.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2016

Test build #54583 has finished for PR 11956 at commit eb6088b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2016

Test build #54604 has finished for PR 11956 at commit 35a352a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 2, 2016

Test build #54756 has finished for PR 11956 at commit d81482d.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 10, 2016

Test build #55487 has finished for PR 11956 at commit 705fe6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55637 has finished for PR 11956 at commit ef47b8c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55639 has finished for PR 11956 at commit 19662ee.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2016

Test build #55643 has finished for PR 11956 at commit 96dbb79.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55725 has finished for PR 11956 at commit e7e34cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2016

Test build #55733 has finished for PR 11956 at commit 76155d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 14, 2016

Test build #55837 has finished for PR 11956 at commit b78a1a1.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 16, 2016

Test build #56016 has finished for PR 11956 at commit c0e6bc6.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Apr 17, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Apr 17, 2016

Test build #56036 has finished for PR 11956 at commit c0e6bc6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk kiszk changed the title [SPARK-14098][SQL][WIP] Generate Java code that gets a value in each column of CachedBatch when DataFrame.cache() is called [SPARK-14098][SQL] Generate Java code that gets a value in each column of CachedBatch when DataFrame.cache() is called Apr 17, 2016
@SparkQA

SparkQA commented Apr 17, 2016

Test build #56058 has finished for PR 11956 at commit fe2043c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2016

Test build #56093 has finished for PR 11956 at commit 6cfa2ff.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56203 has finished for PR 11956 at commit 23b6ae1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56236 has finished for PR 11956 at commit 5075cd2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56237 has finished for PR 11956 at commit c47a492.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameCacheSuite extends QueryTest with SharedSQLContext

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56255 has finished for PR 11956 at commit 8212e20.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameCacheSuite extends QueryTest with SharedSQLContext

@kiszk
Member Author

kiszk commented Apr 20, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Jul 7, 2016

Test build #61919 has finished for PR 11956 at commit 54aabef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 9, 2016

Test build #62019 has finished for PR 11956 at commit fea9f34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameCacheBenchmark

@kiszk
Member Author

kiszk commented Jul 27, 2016

Hi @davies, could you please take a look at this, now that Spark 2.0.0 has been successfully released?

@kiszk
Member Author

kiszk commented Jul 27, 2016

@rxin, could you please review this?

@kiszk
Member Author

kiszk commented Aug 2, 2016

@davies, could you please take a look at this, now that Spark 2.0.0 has been successfully released?
cc: @rxin

@kiszk
Member Author

kiszk commented Aug 8, 2016

@davies, I hope that you have some bandwidth to review PRs. Could you please review this, too?

@kiszk
Member Author

kiszk commented Aug 15, 2016

@davies could you please review this?

@davies
Contributor

davies commented Aug 15, 2016

@kiszk I'm sorry that I do not have the bandwidth to review this. https://github.com/apache/spark/pull/13899/files sounds like an easier approach (I have not looked into the details). What do you think of these two?

@kiszk
Member Author

kiszk commented Aug 15, 2016

@davies, thank you for your comment. I hope that you will have more bandwidth soon, since Spark 2.0 has been released. This PR does the same thing as that one.

In particular, the generated code for reading a column is almost the same in both. I have listed how the two approaches handle three features.

PR #13899:

  1. Generates code to read columnar storage for the cache
  2. Generates code to create columnar storage for the cache
  3. Uses the new CachedBatchByte for columnar storage for the cache

my PR:

  1. Generates code to read columnar storage for the cache
  2. Uses the existing static code to create columnar storage for the cache
  3. Uses the existing CachedBatch, whose Array[Byte] is wrapped by ByteBufferColumnVector, for columnar storage for the cache

I would like to simplify my PR by using the ideas in that PR. For example, I could drop the new file ByteBufferColumnVector.java by implementing the cache with ColumnarBatch, and drop PassThroughSuite.scala by generating the code.

I have the following questions:

  1. Do we generate code to create columnar storage for the cache?
  2. Do we use the existing CachedBatch or ColumnarBatch for the cache?
  3. In this implementation, how will cache content in a ColumnarBatch be serialized when it must be flushed to disk?
  4. Which test cases failed in that PR? The links to the test results are no longer valid.
  5. Will we support compression schemes in the future if we use ColumnarBatch?

What do you think?

@kiszk
Member Author

kiszk commented Aug 18, 2016

@davies We would appreciate it if you could comment on my questions, so that we can agree on how to implement this part of the columnar support.

@kiszk
Member Author

kiszk commented Aug 22, 2016

@davies Would it be possible to share your opinion on these design questions with us?

@a-roberts
Contributor

@hvanhovell @marmbrus @srowen I see this PR has been open since the 25th of March; it provides the substantial performance improvements mentioned above without introducing functional regressions. As leading SQL/community members, what do you think?

@srowen
Member

srowen commented Aug 22, 2016

I'm not qualified to comment, as I tend to ignore SQL unless it's a simple, easy-to-evaluate change.

@kiszk
Member Author

kiszk commented Aug 29, 2016

@davies We know you are busy, but could you please share your opinion on these design questions with the community?

@davies
Contributor

davies commented Aug 29, 2016

@kiszk The current implementation uses ByteBuffer and smart compression algorithms; building the in-memory cache is so slow that it is nearly useless, so we'd like to improve the performance of the build phase as well. This PR is built on the current representation, which may be thrown away in the future, so I'd rather not rush to merge this PR or spend too much time reviewing the details.

PR #13899 could be in the right direction, but we need to double-check that with more benchmarks. For better memory efficiency, we could use the MEMORY_AND_DISK_SER storage level and compress the underlying array with LZ4 when serializing the ColumnVector.
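
A hedged sketch of that direction using standard Spark APIs (df is any DataFrame; whether the ColumnVector path would honor these settings is exactly what needs benchmarking):

import org.apache.spark.storage.StorageLevel

// Serialized in-memory cache; with spark.rdd.compress=true the serialized
// blocks are compressed using spark.io.compression.codec (lz4 by default
// in Spark 2.x).
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()  // materialize the cache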

@davies
Contributor

davies commented Aug 29, 2016

cc @rxin

@kiszk
Member Author

kiszk commented Aug 29, 2016

@davies Thank you for sharing your valuable thoughts. I understand the future direction, and I will implement it in this PR or another one. While the future roadmap may consume more memory, performance will improve, and compression will be applied only when a ColumnVector is serialized to disk. Am I correct?

What benchmark programs do you want to use to double-check this roadmap?

@davies
Contributor

davies commented Aug 29, 2016

We could compress them in memory with MEMORY_AND_DISK_SER; this could be controlled by a flag.

@kiszk
Member Author

kiszk commented Aug 30, 2016

Thank you for the clarification; that is a good summary for me.
We could add a property such as spark.sql.cache.compression.codec to specify the compression method.
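
For example, such a property, which is hypothetical and does not exist in Spark at this point, might be set like this (spark being a SparkSession):

// Hypothetical, not-yet-existing property from the comment above.
spark.conf.set("spark.sql.cache.compression.codec", "lz4")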

@kiszk
Member Author

kiszk commented Aug 30, 2016

@davies, could you please rerun Jenkins for PR #13899? I would like to know which tests fail with the current implementation.

@kiszk
Member Author

kiszk commented Sep 2, 2016

I looked at PR #13899. I understand there are two design points for now. Hopefully, no more :)

  1. CachedBatch must be serializable.
  2. According to my experiments, we should keep the class for a CachedBatch simple.

For 1, the current ColumnarBatch is not serializable.
For 2, at this line, .next() returns a CachedBatch, and then += involves a call to SizeEstimator.estimate(). If this method takes a long time for a complicated CachedBatch class, performance degrades; one example is auto_join?? in this test.
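
To make the cost concern concrete, here is a small sketch of the estimate call; SizeEstimator is Spark's actual utility, while the array is just a stand-in for a CachedBatch:

import org.apache.spark.util.SizeEstimator

// SizeEstimator.estimate walks the object graph reflectively, so a
// CachedBatch class with a complicated field layout makes each += into
// the size accumulator noticeably more expensive.
val batch: AnyRef = Array.fill(1024 * 1024)(0.0f)  // stand-in object
println(s"estimated size: ${SizeEstimator.estimate(batch)} bytes")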

@kiszk
Member Author

kiszk commented Sep 21, 2016

The current PR #13899 does not support the case where an element is null.
For now, I am working on the following:

  1. Support all types (it seems hard to support UnsafeArrayData in ColumnVector efficiently)
  2. Make the structures serializable
  3. Support null cases (see the sketch after this list)
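
For point 3, a null-aware read could look like the sketch below. It uses the isNullAt/getFloat accessors of the vectorized ColumnVector; whether the generated code will handle nulls this way is still open.

import org.apache.spark.sql.execution.vectorized.ColumnVector

// Hedged sketch: skip null slots while summing float values in a batch.
def sumNonNull(col: ColumnVector, numRows: Int): Double = {
  var total = 0.0
  var i = 0
  while (i < numRows) {
    if (!col.isNullAt(i)) total += col.getFloat(i)
    i += 1
  }
  total
}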

@SparkQA

SparkQA commented Jan 10, 2017

Test build #71153 has finished for PR 11956 at commit fea9f34.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameCacheBenchmark

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73611 has finished for PR 11956 at commit fea9f34.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class DataFrameCacheBenchmark

@gatorsmile
Member

@kiszk Is this still an issue? I know you are working on the related part now.

@kiszk
Member Author

kiszk commented Oct 28, 2017

Thank you for pointing it out. #18747 implemented this feature.

@kiszk kiszk closed this Oct 28, 2017