kiszk (Member) commented Jun 29, 2017

What changes were proposed in this pull request?

This PR adds a new class, OnHeapCachedBatch, derived from the ColumnVector class, which can hold compressed data by using CompressibleColumnAccessor.

As a first step of this implementation, this JIRA supports primitive data types and strings. Array and other data types will be supported in another PR.

The current implementation stores compressed data by using the putByteArray() method and then reads it back through a getter (e.g. getInt()). Setters (e.g. putInt()) will be supported in another PR.

The current implementation routes each getter through an UnsafeRow, which is slow. Another PR will make it fast by eliminating the UnsafeRow in favor of specialized per-type getters in ColumnAccessor; the read path is sketched below.
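A minimal, self-contained sketch of that slow read path, with hypothetical names throughout (the real PR builds on Spark's ColumnVector and CompressibleColumnAccessor; the byte-per-row decoding here is only a placeholder):

```java
// Hypothetical sketch, not the PR's code: compressed bytes go in once via a
// byte-array setter, and each getter decodes forward through an intermediate
// one-field buffer, mirroring the UnsafeRow-based path described above.
final class CompressedIntColumnSketch {
    private byte[] compressed;     // compressed column data, stored once
    private int nextRowId = 0;     // sequential decode cursor
    private int scratch;           // stand-in for the intermediate UnsafeRow field

    // analogous to putByteArray(): store the already-compressed bytes
    void putByteArray(byte[] data) {
        this.compressed = data;
    }

    // analogous to getInt(): decode rows up to rowId, then read the buffer
    int getInt(int rowId) {
        while (nextRowId <= rowId) {
            // placeholder per-row decompression (real code uses a CompressionScheme)
            scratch = compressed[nextRowId];
            nextRowId++;
        }
        return scratch;
    }
}
```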

How was this patch tested?

Added test suites

SparkQA commented Jun 29, 2017

Test build #78925 has finished for PR 18468 at commit 00f70f5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class OnHeapCachedBatch extends ColumnVector implements java.io.Serializable

SparkQA commented Jun 30, 2017

Test build #78944 has finished for PR 18468 at commit 514400c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Jun 30, 2017

@cloud-fan Could you review this?
As we discussed at Spark Summit, I prepared a new ColumnVector for compressed columns using the current compression schemes. Any comments are appreciated.

kiszk (Member, Author) commented Jul 3, 2017

cc: @hvanhovell

kiszk (Member, Author) commented Jul 4, 2017

ping @cloud-fan

Contributor:

hmm, I don't think this can be a new memory mode...

kiszk (Author), Jul 4, 2017:

The current implementation relies on the memory mode to decide which kind of ColumnVector to allocate.
If we do not add a new memory mode, I think we have to introduce additional conditional branches in the getters/setters.

Would it be better to add a new argument to specify a type (e.g. Compressible)?

What do you think?

Contributor:

We can make ColumnVector.allocate accept a VectorType (which does not exist yet) instead of a MemoryMode.
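A rough sketch of this suggestion; VectorType did not exist in Spark at this point, and every name below is illustrative rather than a real Spark API:

```java
// Hypothetical sketch: allocation keyed by a vector kind rather than by
// MemoryMode, so a compressed variant does not need a new memory mode.
enum VectorType { ON_HEAP, OFF_HEAP, COMPRESSED }

interface VectorSketch {
    int getInt(int rowId);
}

final class VectorFactorySketch {
    static VectorSketch allocate(int capacity, VectorType type) {
        switch (type) {
            case ON_HEAP:
                int[] data = new int[capacity];               // plain on-heap storage
                return rowId -> data[rowId];
            case OFF_HEAP:
                java.nio.ByteBuffer buf =
                    java.nio.ByteBuffer.allocateDirect(capacity * 4); // direct memory
                return rowId -> buf.getInt(rowId * 4);
            case COMPRESSED:
                // a compressed wrapper would decode on access (discussed below)
                throw new UnsupportedOperationException("sketched later in this thread");
            default:
                throw new IllegalArgumentException("unknown vector type: " + type);
        }
    }
}
```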

kiszk (Author):

Could you elaborate on this idea?
Would VectorType take a value such as NonCompress or Compressible for now?

Contributor:

It looks weird that we put the value into a row and then read it back from the row. Can we return the value directly? E.g. columnAccessor.extractTo should be able to take a ColumnVector as input and set the value on it.
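A small sketch of the direct path being suggested; the types and the extractTo signature here are hypothetical (Spark's actual ColumnAccessor.extractTo at the time wrote into a row):

```java
// Hypothetical sketch: the accessor writes the decoded value straight into
// the destination vector instead of bouncing it through an UnsafeRow.
final class IntVectorSketch {
    private final int[] data;
    IntVectorSketch(int capacity) { data = new int[capacity]; }
    void putInt(int rowId, int value) { data[rowId] = value; }
    int getInt(int rowId) { return data[rowId]; }
}

final class IntAccessorSketch {
    private final int[] decoded;   // stand-in for decoded compressed input
    private int cursor = 0;
    IntAccessorSketch(int[] decoded) { this.decoded = decoded; }

    boolean hasNext() { return cursor < decoded.length; }

    // proposed shape: extract the next value directly into `dest` at `rowId`
    void extractTo(IntVectorSketch dest, int rowId) {
        dest.putInt(rowId, decoded[cursor++]);
    }
}
```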

kiszk (Author), Jul 4, 2017:

I agree with you. We can optimize these accesses by enhancing the existing APIs.
Should we address these extensions in this PR? My original plan was to address such an optimization in another PR.

What do you think?

Contributor:

This PR is building infrastructure that is not being used yet, so I think we don't need to rush.

kiszk (Author), Jul 4, 2017:

I tried to make this a set of pull requests for ease of review.
However, I will add the optimization to return the value directly without UnsafeRow.

SparkQA commented Jul 7, 2017

Test build #79342 has finished for PR 18468 at commit 3f8e024.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 8, 2017

Test build #79359 has finished for PR 18468 at commit f657fa8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 8, 2017

Test build #79366 has finished for PR 18468 at commit 101e4b7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 8, 2017

Test build #79367 has finished for PR 18468 at commit 2c9e63e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Jul 8, 2017

@cloud-fan Is this what you proposed for VectorType?

SparkQA commented Jul 11, 2017

Test build #79532 has finished for PR 18468 at commit be0a6a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Jul 12, 2017

@cloud-fan could you please review this?

kiszk (Member, Author) commented Jul 14, 2017

ping @cloud-fan

kiszk (Member, Author) commented Jul 18, 2017

ping @ueshin @cloud-fan

cloud-fan (Contributor):

I think this PR doesn't have a good abstraction of the problem. For the table cache, our goal is not to make the compressed data a ColumnVector, but to have an efficient way to convert the compressed data (a byte array) to a ColumnVector. I think the most efficient way is to do no conversion at all and instead have a wrapper, i.e. a class CachedBatchColumnVector(data: Array[Byte]) which implements the various getXXX methods by doing decompression. Then we don't need to introduce the VectorType concept or change ColumnVector.

@kiszk what do you think?
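A minimal sketch of the wrapper idea above; the decoding body is a placeholder, not Spark's actual CompressionScheme logic, and the class name carries a Sketch suffix to mark it as illustrative:

```java
// Hypothetical sketch of the proposed wrapper: hold the compressed bytes from
// the table cache and implement the getters by decompressing on demand, so no
// up-front conversion to an uncompressed ColumnVector is needed.
final class CachedBatchColumnVectorSketch {
    private final byte[] data;   // compressed column bytes from the cached batch
    private int nextRowId = 0;   // decode cursor: reads must move forward
    private int current;         // last decoded value

    CachedBatchColumnVectorSketch(byte[] data) { this.data = data; }

    int getInt(int rowId) {
        // decompress forward until rowId is reached (placeholder: one byte per row)
        while (nextRowId <= rowId) {
            current = data[nextRowId++];
        }
        return current;
    }
}
```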

kiszk (Member, Author) commented Jul 18, 2017

@cloud-fan Thank you for your comments. Based on this discussion, I introduced VectorType.
I have just seen @ueshin's ArrowColumnVector implementation.
I will update CachedBatchColumnVector based on your comments and @ueshin's implementation.

kiszk changed the title from "[SPARK-20873][SQL] Enhance ColumnVector to support compressed representation" to "[SPARK-20873][SQL] Creat CachedBatchColumnVector to abstract existing compressed column" on Jul 18, 2017
cloud-fan (Contributor):

ArrowColumnVector is also a wrapper (for an Arrow vector), and it doesn't introduce any vector type concept.

SparkQA commented Jul 18, 2017

Test build #79703 has finished for PR 18468 at commit 0aa1b78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor:

Do we support reading values in random order, e.g. getBoolean(2), getBoolean(1), getBoolean(2)?

kiszk (Author):

We do not support reading values in random order, because the implementations of CompressionScheme (e.g. IntDelta) support only sequential access.

Contributor:

Then we should throw an exception in this case instead of returning a wrong result.

kiszk (Author):

I see. I will add code to track the access order in each getter.

SparkQA commented Jul 18, 2017

Test build #79712 has finished for PR 18468 at commit 23f7ea5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class CachedBatchColumnVector extends ColumnVector

Contributor:

We should explicitly say that this is a wrapper to read compressed data in the table cache as a ColumnVector.

kiszk (Author):

Sure, done.

Contributor:

Can we inline this method?

kiszk (Author):

Yes, we can. Done.

Contributor:

We should allow previousRowId == rowId, so that we can support getInt(1), getInt(1), getInt(2).

kiszk (Author), Jul 19, 2017:

Yes, we can do that now; see line 78. It means that extractTo() must be called only once for a given rowId.
Now we also support isNullAt(0) followed by getInt(0).

Contributor:

I think we don't need to be so strict. The rule is that users can't jump back and read values, but anything else is OK, e.g. getInt(0), getInt(0), getInt(10), getInt(11).

kiszk (Author):

Sure. I changed the limitation to "same or ascending".
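A sketch of the agreed "same or ascending" rule, with hypothetical names: re-reading the same rowId passes, moving forward passes, and a backward jump throws instead of returning a wrong result:

```java
// Hypothetical guard for the "same or ascending" access rule agreed above.
final class SequentialAccessGuardSketch {
    private int previousRowId = -1;

    // allow rowId == previousRowId (re-read) or rowId > previousRowId (forward)
    void check(int rowId) {
        if (rowId < previousRowId) {
            throw new UnsupportedOperationException(
                "row access must be same or ascending: got " + rowId
                    + " after " + previousRowId);
        }
        previousRowId = rowId;
    }
}
```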

SparkQA commented Jul 19, 2017

Test build #79739 has finished for PR 18468 at commit b83dedb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk changed the title from "[SPARK-20873][SQL] Creat CachedBatchColumnVector to abstract existing compressed column" to "[SPARK-20873][SQL] Create CachedBatchColumnVector to abstract existing compressed column" on Jul 19, 2017
SparkQA commented Aug 7, 2017

Test build #80314 has finished for PR 18468 at commit a26dc15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor):

retest this please

SparkQA commented Aug 14, 2017

Test build #80619 has finished for PR 18468 at commit a26dc15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Aug 14, 2017

retest this please

SparkQA commented Aug 14, 2017

Test build #80629 has finished for PR 18468 at commit a26dc15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Oct 3, 2017

This is followed up by #18704.

kiszk closed this on Oct 3, 2017
asfgit pushed a commit that referenced this pull request Oct 4, 2017
…d column (batch method)

## What changes were proposed in this pull request?

This PR abstracts data compressed by `CompressibleColumnAccessor` using `ColumnVector` in a batch method. When `ColumnAccessor.decompress` is called, the `ColumnVector` will hold the uncompressed data. This batch decompression does not use `InternalRow`, to reduce the number of memory accesses.

As a first step of this implementation, this JIRA supports primitive data types. Array and other data types will be supported in another PR.

This implementation decompresses data in batch into an uncompressed column batch, as rxin suggested [here](#18468 (comment)). Another implementation uses the adapter approach, [as cloud-fan suggested](#18468).

## How was this patch tested?

Added test suites

Author: Kazuaki Ishizaki <[email protected]>

Closes #18704 from kiszk/SPARK-20783a.