Conversation

@kiszk
Member

@kiszk kiszk commented Oct 29, 2017

What changes were proposed in this pull request?

This PR generates Java code to directly get a value from a primitive-type array in ColumnVector without using an iterator for the table cache (e.g. dataframe.cache). It improves runtime performance by eliminating the data copy from column-oriented storage to InternalRow in the SpecificColumnarIterator for primitive types. This is a follow-up to #18747.

The idea of this implementation is to add ColumnVector.UnsafeArray, which keeps UnsafeArrayData for an array, in addition to ColumnVector.Array, which keeps a ColumnVector backed by a Java primitive array.
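As a rough illustration of why eliminating the copy matters, the following sketch contrasts copying an element into an intermediate row buffer with reading the columnar primitive array in place. All class and method names here are hypothetical, not Spark's actual API:

```java
// Hypothetical sketch: the row-based path copies each element into an
// intermediate buffer (a stand-in for InternalRow), while the columnar
// path reads the backing primitive int[] directly, with no copy.
public class DirectAccessSketch {
    // Columnar storage: one contiguous primitive array per column.
    static final int[] columnData = {10, 20, 30, 40, 50};

    // Row-based access: the element is copied into a holder first.
    static int viaRowCopy(int ordinal) {
        int[] row = new int[1];          // stand-in for an InternalRow copy
        row[0] = columnData[ordinal];    // the data copy happens here
        return row[0];
    }

    // Columnar access: read the primitive array in place.
    static int viaColumnVector(int ordinal) {
        return columnData[ordinal];
    }

    public static void main(String[] args) {
        System.out.println(viaRowCopy(2));       // prints 30
        System.out.println(viaColumnVector(2));  // prints 30
    }
}
```

Both paths return the same value; the difference is only the intermediate copy, which is what the generated code in this PR avoids.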

Benchmark result: 21.4x speedup (best time 1368 ms vs. 64 ms)

OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Filter for int primitive array with cache: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
InternalRow codegen                           1368 / 1887         23.0          43.5       1.0X

Filter for int primitive array with cache: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ColumnVector codegen                            64 /   90        488.1           2.0       1.0X

Benchmark program

  def intArrayBenchmark(sqlContext: SQLContext, values: Int, iters: Int = 20): Unit = {
    import sqlContext.implicits._
    val ROWS = 1024  // assumed row count; this constant was undefined in the original snippet
    val benchmarkPT = new Benchmark("Filter for int primitive array with cache", values, iters)
    val df = sqlContext.sparkContext.parallelize(0 to ROWS, 1)
                       .map(i => Array.range(i, values)).toDF("a").cache
    df.count  // force materialization of df.cache
    val str = "ColumnVector"
    var c: Long = 0
    benchmarkPT.addCase(s"$str codegen") { iter =>
      c += df.filter(s"a[${values/2}] % 10 = 0").count
    }
    benchmarkPT.run()
    df.unpersist(true)
    System.gc()
  }
  intArrayBenchmark(sqlContext, 1024 * 1024 * 20)

How was this patch tested?

Added test cases into ColumnVectorSuite, DataFrameTungstenSuite, and WholeStageCodegenSuite

@SparkQA

SparkQA commented Oct 29, 2017

Test build #83179 has finished for PR 19601 at commit 80b9e31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static final class UnsafeArray extends ArrayData

@kiszk
Member Author

kiszk commented Oct 29, 2017

@ueshin @cloud-fan could you please review this?

Contributor


We should not change the return type. ColumnVector will be public eventually, and ArrayData is not a public type.

Member Author

@kiszk kiszk Oct 29, 2017


I see.
One question: ColumnVector.Array has some public fields, such as length. I think it would be good to use an accessor such as numElements or getLength instead. What do you think?

@cloud-fan
Contributor

My feeling is that we should change the cache format of array types to make it compatible with ColumnVector; then we don't need a conversion from cached data to columnar batch.

@kiszk
Member Author

kiszk commented Oct 29, 2017

The current ColumnVector uses a primitive-type array (e.g. int[] or double[]) based on the data type of each column. On the other hand, cached data uses byte[] for all data types.
Do we change the format (Array[Array[Byte]]) in CachedBatch for a primitive array?
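As a hypothetical illustration of the conversion cost under discussion, here is a sketch of decoding a byte[]-backed cache entry into a primitive int[]. The encoding, class, and method names are invented for this example and do not reflect Spark's actual CachedBatch format:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Illustrative only: cached data stored as byte[] must be decoded back
// into a primitive int[] before it can be placed into a primitive-typed
// column vector. The decode loop is the extra copy the PR wants to avoid.
public class ByteDecodeSketch {
    // Encode an int[] into a byte[]-based cache representation.
    static byte[] encode(int[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4)
                                   .order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) buf.putInt(v);
        return buf.array();
    }

    // Decode the byte[] back into a primitive array: an extra full copy.
    static int[] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        int[] out = new int[bytes.length / 4];
        for (int i = 0; i < out.length; i++) out[i] = buf.getInt();
        return out;
    }

    public static void main(String[] args) {
        int[] original = {1, 2, 3};
        System.out.println(Arrays.equals(original, decode(encode(original)))); // prints true
    }
}
```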

@cloud-fan
Contributor

So for primitive types, we encode and compress them to binary. When reading cached data, they are decoded to a primitive array and can be put into OnHeapColumnVector directly.

For a primitive-type array, we treat it as binary. So when decoding it, we get a byte[] and need more effort to convert it to the primitive type and put it into OnHeapColumnVector.

Can we change how we encode array types, like Arrow did?
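The Arrow-style list encoding being suggested can be sketched as follows: all element values are stored contiguously in one values buffer, and an offsets buffer marks where each row's array begins and ends. The buffer names and layout details here are illustrative, not Arrow's exact specification:

```java
import java.util.Arrays;

// Sketch of an Arrow-style list layout. Every row's array elements live
// in one flat values buffer; offsets[i]..offsets[i+1] delimits row i.
public class ArrowListLayoutSketch {
    // Two rows: [10, 20, 30] and [40, 50]
    static final int[] values  = {10, 20, 30, 40, 50};
    static final int[] offsets = {0, 3, 5};  // row i spans [offsets[i], offsets[i+1])

    // Materialize one row's array (only needed when a copy is required).
    static int[] getArray(int row) {
        return Arrays.copyOfRange(values, offsets[row], offsets[row + 1]);
    }

    // Direct element access: no per-row decode step at all.
    static int getElement(int row, int ordinal) {
        return values[offsets[row] + ordinal];
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getArray(1))); // prints [40, 50]
        System.out.println(getElement(0, 2));             // prints 30
    }
}
```

Because the values sit in one contiguous primitive buffer, this layout also encodes and compresses well, which is the motivation raised later in this thread for changing the write path.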

@kiszk
Member Author

kiszk commented Oct 29, 2017

There are two approaches to support a primitive array that is treated as binary. One is to add a new ColumnVector.Array, as I did. The other is to add a new WritableColumnVector, like the ArrowColumnVector that @ueshin added. Either is fine with me.

I can add a new ColumnVector for primitive arrays (e.g. UnsafeColumnVector), like Arrow did. Is that OK with you? Adding the new class can avoid data conversion, as Arrow does.

Using OffHeapColumnVector is good for getters that assume binary data. However, its allocation assumes the Platform API, so we may need some extensions.

@cloud-fan
Contributor

I'd like to also improve the write path. I think the current way to cache array types is not efficient; an Arrow-like format that puts all elements (including nested arrays) together is better for encoding and compression.
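The nested case mentioned here could, under an Arrow-like layout, be sketched with two levels of offset buffers over a single flat values buffer, so that every element of every nested array sits together. This is an illustrative simplification, not Arrow's exact format:

```java
import java.util.Arrays;

// Sketch: an array-of-arrays column stored with two offset levels.
// outerOffsets selects which inner arrays belong to a row;
// innerOffsets delimits each inner array inside the flat values buffer.
public class NestedListLayoutSketch {
    // Row 0 holds [[1, 2], [3]]; row 1 holds [[4, 5, 6]]
    static final int[] values       = {1, 2, 3, 4, 5, 6};
    static final int[] innerOffsets = {0, 2, 3, 6};  // inner array i spans [innerOffsets[i], innerOffsets[i+1])
    static final int[] outerOffsets = {0, 2, 3};     // row r owns inner arrays [outerOffsets[r], outerOffsets[r+1])

    static int[] getInnerArray(int row, int ordinal) {
        int inner = outerOffsets[row] + ordinal;
        return Arrays.copyOfRange(values, innerOffsets[inner], innerOffsets[inner + 1]);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(getInnerArray(0, 1))); // prints [3]
        System.out.println(Arrays.toString(getInnerArray(1, 0))); // prints [4, 5, 6]
    }
}
```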

@kiszk
Member Author

kiszk commented Oct 29, 2017

I agree with you that we need to improve the write path. As you suggested before, it will be addressed in a follow-up PR after improving the frequently-executed read path. I think there are two parts: 1) change the data format and 2) generate specialized code for each table cache.

For improving the read path, which approach is better: adding a new ColumnVector.Array or adding a new WritableColumnVector?

@cloud-fan
Contributor

Both ways work; just pick the simpler one. I'm concerned about how to access nested arrays, so you can try both approaches and see which one solves the problem more easily.

@kiszk
Member Author

kiszk commented Oct 29, 2017

For now, this implementation is limited to non-nested arrays for ease of review.

I will try to support nested arrays.

@kiszk
Member Author

kiszk commented Oct 29, 2017

After thinking about the choice for a while, I concluded that it is better to add the new WritableColumnVector (i.e. UnsafeColumnVector) and keep the current ColumnVector.Array.
I think adding a new class gives us some flexibility and a good abstraction between the public ColumnVector class and the other internal classes.
I will try to implement this approach.

@kiszk
Member Author

kiszk commented Oct 30, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83208 has finished for PR 19601 at commit c78d462.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class UnsafeColumnVector extends WritableColumnVector

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83210 has finished for PR 19601 at commit c78d462.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class UnsafeColumnVector extends WritableColumnVector

@SparkQA

SparkQA commented Oct 30, 2017

Test build #83223 has started for PR 19601 at commit b971506.

@kiszk
Member Author

kiszk commented Oct 31, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Oct 31, 2017

Test build #83246 has finished for PR 19601 at commit b971506.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Oct 31, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Oct 31, 2017

Test build #83250 has finished for PR 19601 at commit b971506.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Nov 2, 2017

@cloud-fan could you please review this PR?
In my prototype, I succeeded in supporting a nested array for the table cache by changing only UnsafeColumnVector.java.

For ease of review, I would like to ask you to review this PR for the simple case (a non-nested primitive array) first.
cc: @ueshin

@cloud-fan
Contributor

Can we hold this for a while? I'm thinking about a ColumnVector refactoring and how to deal with nested data uniformly.

@kiszk
Member Author

kiszk commented Nov 2, 2017

My prototype handles nested arrays by changing UnsafeArray.getArray and its callee methods, and does not require changes to ColumnVector.
If the refactoring for Spark 2.4 takes more than several weeks, I can commit my prototype supporting nested arrays to be merged into Spark 2.3.
What do you think?

@SparkQA

SparkQA commented Nov 5, 2017

Test build #83460 has finished for PR 19601 at commit 2270304.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 5, 2017

Test build #83463 has started for PR 19601 at commit 4666974.

@kiszk
Member Author

kiszk commented Nov 5, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Nov 5, 2017

Test build #83465 has finished for PR 19601 at commit 4666974.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Nov 6, 2017

@cloud-fan could you please review this again, since this version avoids overriding ColumnVector.getArray as you suggested? I also confirmed that nested arrays can be supported without overriding ColumnVector.getArray.
cc @ueshin

@SparkQA

SparkQA commented Nov 10, 2017

Test build #83689 has finished for PR 19601 at commit eac3d30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Nov 16, 2017

There are some parts that rely on the format of UnsafeArrayData; a bit-by-bit copy of UnsafeArrayData is performed. Can we handle this copy with the new format for an unsafe array?

@cloud-fan
Contributor

We'd need to change the UnsafeArrayData format too, to avoid data copying when building the cache. BTW, I think it's OK to release this columnar cache reader without efficient complex-type support, so we don't need to rush.

@kiszk
Member Author

kiszk commented Nov 16, 2017

I see. Let's revisit this design later.

I would appreciate it if you would review this columnar cache reader with a simple primitive-type (non-nested) array.

@SparkQA

SparkQA commented Nov 21, 2017

Test build #84080 has finished for PR 19601 at commit 63d9d57.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 21, 2017

Test build #84082 has finished for PR 19601 at commit 9b6b890.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Nov 22, 2017

@cloud-fan could you please review this?

@kiszk kiszk closed this Nov 27, 2017
@kiszk kiszk reopened this Nov 27, 2017
@kiszk
Member Author

kiszk commented Nov 27, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Nov 27, 2017

Test build #84215 has finished for PR 19601 at commit 9b6b890.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 28, 2017

Test build #84253 has finished for PR 19601 at commit 20d2ba2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Oct 3, 2018

Hi, @kiszk . Is this still valid for 3.0.0?

@dongjoon-hyun
Member

Hi, @kiszk . Can we close this for now? You can make another PR later if you want.

@kiszk
Member Author

kiszk commented Oct 27, 2018

Sure, let me close this.

@kiszk kiszk closed this Oct 27, 2018
@dongjoon-hyun
Member

Thanks!
