
Conversation

@kiszk (Member) commented Jul 27, 2017

What changes were proposed in this pull request?

This PR generates Java code that reads a column value directly from a ColumnVector, without going through an iterator (e.g. lines 54-69 in the generated code example below), for table caches (e.g. dataframe.cache). It improves runtime performance by eliminating the data copy from column-oriented storage into an InternalRow that the SpecificColumnarIterator performs for primitive types. A follow-up PR will support primitive-type arrays.

Benchmark result: 2.9x speedup

OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Int Sum with IntDelta cache:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InternalRow codegen                           1708 / 1894         49.1          20.4       1.0X

Int Sum with IntDelta cache:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ColumnVector codegen                           590 /  693        142.2           7.0       1.0X

Benchmark program

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.util.Benchmark

  def intSumBenchmark(sqlContext: SQLContext, values: Int): Unit = {
    import sqlContext.implicits._
    val benchmarkPT = new Benchmark("Int Sum with IntDelta cache", values, 20)
    val dfPassThrough = sqlContext.sparkContext.parallelize(0 until values, 1).toDF().cache()
    dfPassThrough.count()       // force the cache to be materialized
    val str = "ColumnVector"
    benchmarkPT.addCase(s"$str codegen") { iter =>
      dfPassThrough.filter("value % 16 = 0").count
    }
    benchmarkPT.run()
    dfPassThrough.unpersist(true)
  }

  intSumBenchmark(sqlContext, 1024 * 1024 * 80)

Motivating example

val dsInt = spark.range(3).cache
dsInt.count // force to build cache
dsInt.filter(_ > 0).collect

Generated code

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inmemorytablescan_input;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_numOutputRows;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_scanTime;
/* 011 */   private long inmemorytablescan_scanTime1;
/* 012 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch inmemorytablescan_batch;
/* 013 */   private int inmemorytablescan_batchIdx;
/* 014 */   private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector inmemorytablescan_colInstance0;
/* 015 */   private UnsafeRow inmemorytablescan_result;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inmemorytablescan_holder;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inmemorytablescan_rowWriter;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 019 */   private UnsafeRow filter_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     inmemorytablescan_input = inputs[0];
/* 031 */     inmemorytablescan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 032 */     inmemorytablescan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 033 */     inmemorytablescan_scanTime1 = 0;
/* 034 */     inmemorytablescan_batch = null;
/* 035 */     inmemorytablescan_batchIdx = 0;
/* 036 */     inmemorytablescan_colInstance0 = null;
/* 037 */     inmemorytablescan_result = new UnsafeRow(1);
/* 038 */     inmemorytablescan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inmemorytablescan_result, 0);
/* 039 */     inmemorytablescan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inmemorytablescan_holder, 1);
/* 040 */     filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 041 */     filter_result = new UnsafeRow(1);
/* 042 */     filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 043 */     filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 1);
/* 044 */
/* 045 */   }
/* 046 */
/* 047 */   protected void processNext() throws java.io.IOException {
/* 048 */     if (inmemorytablescan_batch == null) {
/* 049 */       inmemorytablescan_nextBatch();
/* 050 */     }
/* 051 */     while (inmemorytablescan_batch != null) {
/* 052 */       int inmemorytablescan_numRows = inmemorytablescan_batch.numRows();
/* 053 */       int inmemorytablescan_localEnd = inmemorytablescan_numRows - inmemorytablescan_batchIdx;
/* 054 */       for (int inmemorytablescan_localIdx = 0; inmemorytablescan_localIdx < inmemorytablescan_localEnd; inmemorytablescan_localIdx++) {
/* 055 */         int inmemorytablescan_rowIdx = inmemorytablescan_batchIdx + inmemorytablescan_localIdx;
/* 056 */         int inmemorytablescan_value = inmemorytablescan_colInstance0.getInt(inmemorytablescan_rowIdx);
/* 057 */
/* 058 */         boolean filter_isNull = false;
/* 059 */
/* 060 */         boolean filter_value = false;
/* 061 */         filter_value = inmemorytablescan_value > 1;
/* 062 */         if (!filter_value) continue;
/* 063 */
/* 064 */         filter_numOutputRows.add(1);
/* 065 */
/* 066 */         filter_rowWriter.write(0, inmemorytablescan_value);
/* 067 */         append(filter_result);
/* 068 */         if (shouldStop()) { inmemorytablescan_batchIdx = inmemorytablescan_rowIdx + 1; return; }
/* 069 */       }
/* 070 */       inmemorytablescan_batchIdx = inmemorytablescan_numRows;
/* 071 */       inmemorytablescan_batch = null;
/* 072 */       inmemorytablescan_nextBatch();
/* 073 */     }
/* 074 */     inmemorytablescan_scanTime.add(inmemorytablescan_scanTime1 / (1000 * 1000));
/* 075 */     inmemorytablescan_scanTime1 = 0;
/* 076 */   }
/* 077 */
/* 078 */   private void inmemorytablescan_nextBatch() throws java.io.IOException {
/* 079 */     long getBatchStart = System.nanoTime();
/* 080 */     if (inmemorytablescan_input.hasNext()) {
/* 081 */       org.apache.spark.sql.execution.columnar.CachedBatch inmemorytablescan_cachedBatch = (org.apache.spark.sql.execution.columnar.CachedBatch)inmemorytablescan_input.next();
/* 082 */       inmemorytablescan_batch = org.apache.spark.sql.execution.columnar.InMemoryRelation$.MODULE$.createColumn(inmemorytablescan_cachedBatch);
/* 083 */
/* 084 */       inmemorytablescan_numOutputRows.add(inmemorytablescan_batch.numRows());
/* 085 */       inmemorytablescan_batchIdx = 0;
/* 086 */       inmemorytablescan_colInstance0 = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) inmemorytablescan_batch.column(0); org.apache.spark.sql.execution.columnar.ColumnAccessor$.MODULE$.decompress(inmemorytablescan_cachedBatch.buffers()[0], (org.apache.spark.sql.execution.vectorized.WritableColumnVector) inmemorytablescan_colInstance0, org.apache.spark.sql.types.DataTypes.IntegerType, inmemorytablescan_cachedBatch.numRows());
/* 087 */
/* 088 */     }
/* 089 */     inmemorytablescan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */ }

How was this patch tested?

Added test cases to DataFrameTungstenSuite and WholeStageCodegenSuite.

@SparkQA commented Jul 27, 2017

Test build #79990 has finished for PR 18747 at commit 1a15fa0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 27, 2017

Test build #79992 has finished for PR 18747 at commit d534d2c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member, Author) commented Jul 27, 2017

Jenkins, retest this please

@SparkQA commented Jul 27, 2017

Test build #79993 has finished for PR 18747 at commit d534d2c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 6, 2017

Test build #80312 has finished for PR 18747 at commit d84b5dd.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 21, 2017

Test build #82043 has finished for PR 18747 at commit 3a6edc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 23, 2017

Test build #82121 has finished for PR 18747 at commit c77b129.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 24, 2017

Test build #82127 has finished for PR 18747 at commit c476e87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 26, 2017

Test build #82182 has finished for PR 18747 at commit 2508593.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedBatch(

@kiszk (Member, Author) commented Sep 26, 2017

Jenkins, retest this please

@SparkQA commented Sep 26, 2017

Test build #82187 has finished for PR 18747 at commit 2508593.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedBatch(

@SparkQA commented Oct 4, 2017

Test build #82451 has finished for PR 18747 at commit 0b04205.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedBatch(

@kiszk changed the title from "[WIP][SPARK-20822][SQL] Generate code to directly get value from ColumnVector" to "[SPARK-20822][SQL] Generate code to directly get value from ColumnVector" on Oct 4, 2017
@SparkQA commented Oct 4, 2017

Test build #82453 has finished for PR 18747 at commit 116ba6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedBatch(

@kiszk (Member, Author) commented Oct 8, 2017

@cloud-fan could you please review this one first among my PRs?

@kiszk changed the title from "[SPARK-20822][SQL] Generate code to directly get value from ColumnVector" to "[SPARK-20822][SQL] Generate code to directly get value from ColumnVector for table cache" on Oct 10, 2017
@SparkQA commented Oct 12, 2017

Test build #82691 has finished for PR 18747 at commit 8134e73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CachedBatch(

@kiszk (Member, Author) commented Oct 12, 2017

@cloud-fan could you please review this since I resolved a conflict?

val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnVectorClzs = vectorTypes.getOrElse(
Seq.fill(colVars.size)(classOf[ColumnVector].getName))
val columnAccessorClz = "org.apache.spark.sql.execution.columnar.ColumnAccessor"
Contributor:

This table-cache-specific code should go to InMemoryTableScanExec instead of here.

Member Author:

Yeah, done

val buffers = relation.cachedColumnBuffers
// HACK ALERT: This is actually an RDD[CachedBatch].
// We're taking advantage of Scala's type erasure here to pass these batches along.
Seq(buffers.asInstanceOf[RDD[InternalRow]])
Contributor:

ColumnarBatchScan assumes the input RDD is RDD[ColumnarBatch]; you are breaking this assumption here.

I think we should convert CachedBatch to ColumnarBatch first, you can codegen a class to do it if necessary.

Member Author (@kiszk, Oct 13, 2017):

Yes, I am breaking that assumption (passing RDD[CachedBatch]) since we have to create the ColumnarBatch when it is read.
Should we convert CachedBatch to ColumnarBatch here in inputRDDs()?

Contributor:

yes please

Member Author:

@gatorsmile sure, done
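
For reference, the conversion in inputRDDs() ends up shaped roughly like this (a sketch; createAndDecompressColumn is the helper discussed later in this review, and the cast relies on the same type-erasure trick as the HACK above):

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    // Decompress each CachedBatch into a ColumnarBatch before handing it
    // to ColumnarBatchScan, instead of passing the raw cached buffers.
    val buffers = relation.cachedColumnBuffers
    Seq(buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]])
  }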

columnarBatch.setNumRows(rowCount)

for (i <- 0 until attributes.length) {
val index = if (columnIndices.length == 0) i else columnIndices(i)
Contributor:

is this if necessary?

Member Author:

I have seen cases where columnIndices = Array[Int](0).

Contributor:

do you mean attributes is empty?

Member Author:

Hmm, while running several test suites in my local environment, I could not find a case where columnIndices = Array[Int](0). Let me commit without this if.

Member Author (@kiszk, Oct 17, 2017):

Now all tests succeed without this if. Thank you for your comment.

columnarBatch.column(i).asInstanceOf[WritableColumnVector],
columnarBatchSchema.fields(i).dataType, rowCount)
}
return columnarBatch
Contributor:

nit: remove return

@SparkQA commented Oct 17, 2017

Test build #82845 has finished for PR 18747 at commit c356ebe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member, Author) commented Oct 18, 2017

ping @cloud-fan

case _ => true
}).isEmpty &&
!WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty
Contributor (@cloud-fan, Oct 18, 2017):

This check is unnecessary; this is a LeafExecNode, so it has no children.

Member Author:

Sure


private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
Contributor:

Can we reuse the OnHeapColumnVector for the cached batches? It's a little inefficient to create one column vector for each cached batch.

Member Author (@kiszk, Oct 18, 2017):

I agree that we could improve efficiency if we reused the OnHeapColumnVector.

However, I think it is not easy to reuse an OnHeapColumnVector across different cached batches. IIUC there is no point at which we know that a cached batch will no longer be referenced, so by creating a new OnHeapColumnVector for each cached batch we delegate lifetime management to the GC. If we reused the OnHeapColumnVector (i.e. kept a reference to it), the GC could not dispose of it even once the generated code no longer uses it, which means the uncompressed (huge) data would stay alive for a long time. If we knew the point where a cached batch is no longer referenced, we could null out the data in the OnHeapColumnVector.

Thus, I currently create an OnHeapColumnVector for each cached batch. What do you think?

Contributor:

Can't we use TaskContext.addTaskCompletionListener?

Member Author (@kiszk, Oct 18, 2017):

Good idea. We could null out a data field (e.g. OnHeapColumnVector.intData) by registering a clear() method with TaskContext.addTaskCompletionListener.

In that case, I realized we would have to reallocate the large array backing a data field such as OnHeapColumnVector.intData each time. Do we still need to worry about the allocation cost of the OnHeapColumnVector itself, which is relatively small compared to that large array?

If it still makes sense, I will try to implement the clear() method and the reallocation of the large array, which may introduce a new API on OnHeapColumnVector.
What do you think?

Contributor:

OK, maybe leave it as a follow-up for now.

Member Author:

I see. Let us make a follow-up PR in the future.
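
A rough sketch of what that follow-up could look like, assuming a hypothetical clear() method on OnHeapColumnVector that nulls out the data arrays (no such method exists at this point):

  import org.apache.spark.TaskContext

  // Inside createAndDecompressColumn, after allocating the vectors:
  // release the decompressed data when the task completes, instead of
  // keeping it reachable through a reused vector until GC.
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener { _ =>
    columnVectors.foreach(_.clear())   // clear() is hypothetical
  })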

@kiszk (Member, Author) commented Oct 19, 2017

Jenkins, retest this please

@cloud-fan (Contributor):

retest this please

@SparkQA commented Oct 19, 2017

Test build #82912 has finished for PR 18747 at commit fd8cdbd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member, Author) commented Oct 22, 2017

ping @cloud-fan

override val supportCodegen: Boolean = {
// In the initial implementation, for ease of review
// support only primitive data types and # of fields is less than wholeStageMaxNumFields
val schema = StructType.fromAttributes(relation.output)
Member:

Isn't this just relation.schema?

Member Author:

good catch

}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
if (supportCodegen) {
Member:

If supportCodegen is false, I think we never call inputRDDs.

Member Author:

Thanks. I will insert an assertion.
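
The assertion could be as simple as (a sketch):

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    // This method is only reached when whole-stage codegen is enabled.
    assert(supportCodegen)
    val buffers = relation.cachedColumnBuffers
    Seq(buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]])
  }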


private def createAndDecompressColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
val rowCount = cachedColumnarBatch.numRows
val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
Member:

Is there any reason we use OnHeapColumnVector instead of OffHeapColumnVector?

Member Author (@kiszk, Oct 23, 2017):

Both can be used. I followed the default configuration since it is not easy to get tungstenMemoryMode here.
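
If the memory mode were available here, the choice could be made explicit; a sketch under that assumption (the offHeapEnabled flag is illustrative, not something this PR reads):

  val columnVectors = if (offHeapEnabled) {
    OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
  } else {
    OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
  }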

@SparkQA commented Oct 23, 2017

Test build #82993 has finished for PR 18747 at commit a12d8da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

WholeStageCodegenExec.isTooManyFields(conf, plan.schema)
val hasTooManyInputFields =
plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
plan.children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isDefined
Contributor:

find(...).isDefined -> exists?
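
That is (a sketch of the suggested form):

  plan.children.exists(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema))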

override val supportCodegen: Boolean = {
// In the initial implementation, for ease of review
// support only primitive data types and # of fields is less than wholeStageMaxNumFields
relation.schema.fields.find(f => f.dataType match {
Contributor:

rewrite find(...) -> forall?
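
In sketch form, with the primitive-type whitelist (elided in the quoted diff) filled in as an assumption:

  relation.schema.fields.forall(f => f.dataType match {
    case BooleanType | ByteType | ShortType | IntegerType |
         LongType | FloatType | DoubleType => true
    case _ => false
  })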

@cloud-fan (Contributor):

LGTM except 2 minor comments. Can you benchmark some complex queries instead of a full scan? I was expecting to see a larger speedup via the columnar reader.

@SparkQA commented Oct 24, 2017

Test build #83003 has finished for PR 18747 at commit db61b41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@asfgit closed this in c30d5cf on Oct 24, 2017
@kiszk (Member, Author) commented Oct 24, 2017

Thank you for merging this. I will run another benchmark.

@kiszk (Member, Author) commented Oct 28, 2017

@cloud-fan I updated the benchmark result (2.9x) in the description.
