Conversation

@revans2
Contributor

@revans2 revans2 commented Jul 10, 2020

What changes were proposed in this pull request?

Add a config to let users change how SQL/Dataframe data is compressed when cached.

This adds a few new classes/APIs for use with this config.

  1. CachedBatch is a trait used to tag data that is intended to be cached. It has a few APIs that let us keep the compression/serialization of the data separate from the metrics about it.
  2. CachedBatchSerializer provides the APIs that must be implemented to cache data.
    • convertInternalRowToCachedBatch is an API that takes an RDD[InternalRow] and converts it into an RDD[CachedBatch]. The actual caching is done outside of this API.
    • convertColumnarBatchToCachedBatch does the same thing, but takes an RDD[ColumnarBatch] instead.
    • supportsColumnarInput is used to check whether an RDD[ColumnarBatch] can be used as input or not. If it returns false, convertInternalRowToCachedBatch will be called. If it returns true, either API might be called depending on the physical plan being cached.
    • buildFilter is an API that takes a set of predicates and builds a filter function that can be used to filter the RDD[CachedBatch] before it is read. This can avoid the cost of decompressing unneeded data in some cases.
    • supportsColumnarOutput determines whether convertCachedBatchToColumnarBatch or convertCachedBatchToInternalRow should be called.
    • convertCachedBatchToColumnarBatch converts an RDD[CachedBatch] into an RDD[ColumnarBatch].
    • convertCachedBatchToInternalRow converts an RDD[CachedBatch] into an RDD[InternalRow].

There is also a pair of helpers, SimpleMetricsCachedBatch and SimpleMetricsCachedBatchSerializer, that let an implementation reuse the current filtering based on min/max values. A rough sketch of the serializer surface is shown below.
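For orientation, here is a rough sketch of the serializer surface described above. The method signatures are abbreviated and illustrative only; the actual trait in the patch carries additional parameters (for example the storage level), so treat this as an outline rather than the real API.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative outline only -- parameter lists are simplified relative to the real trait.
trait CachedBatch {
  def numRows: Int
  def sizeInBytes: Long
}

trait CachedBatchSerializer extends Serializable {
  // Can this serializer consume ColumnarBatch input for the given schema?
  def supportsColumnarInput(schema: Seq[Attribute]): Boolean

  // Row-based and columnar paths into the cache format.
  def convertInternalRowToCachedBatch(
      input: RDD[InternalRow], schema: Seq[Attribute], conf: SQLConf): RDD[CachedBatch]
  def convertColumnarBatchToCachedBatch(
      input: RDD[ColumnarBatch], schema: Seq[Attribute], conf: SQLConf): RDD[CachedBatch]

  // Build a per-partition filter over cached batches from the given predicates.
  def buildFilter(
      predicates: Seq[Expression],
      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]

  // Can this serializer produce ColumnarBatch output for the given schema?
  def supportsColumnarOutput(schema: Seq[Attribute]): Boolean

  // Columnar and row-based paths out of the cache format.
  def convertCachedBatchToColumnarBatch(
      input: RDD[CachedBatch], cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute], conf: SQLConf): RDD[ColumnarBatch]
  def convertCachedBatchToInternalRow(
      input: RDD[CachedBatch], cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute], conf: SQLConf): RDD[InternalRow]
}
```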

Why are the changes needed?

This lets users explore different types of compression and compression ratios.

Does this PR introduce any user-facing change?

This adds a single config and exposes the developer API classes described above.

How was this patch tested?

I ran the unit tests around this and also did some manual performance tests. I could not find any performance difference between the old and new code; if there is any, it is within the margin of error.

@tgravescs
Contributor

@revans2 can you add back in the PR template (### What changes were proposed in this pull request?, Why are the changes needed?, etc.) and fill in the sections? That gets put into the commit history and is nice to have.

@revans2
Contributor Author

revans2 commented Jul 10, 2020

I would love to see this go into the 3.1 release. It also applies cleanly to the 3.0 branch if someone wants to pull it in there too.

This may look big, but it is mostly refactoring to move all of the code associated with transforming batches for caching into a single location. I did clean up a few things along the way to separate the actual transformation from the metrics.

@SparkQA

SparkQA commented Jul 10, 2020

Test build #125633 has finished for PR 29067 at commit 4bf995f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc("The name of a class that implements " +
  • trait CachedBatch
  • trait CachedBatchSerializer extends Serializable
  • trait SimpleMetricsCachedBatch extends CachedBatch
  • trait SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging
  • case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
  • class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
  • case class CachedRDDBuilder(

Contributor

@tgravescs tgravescs left a comment

only made it part of the way thru so far, mostly some nits here.

buffer
}
relation.cacheBuilder.serializer.decompressColumnar(buffers, relation.output, attributes, conf)
.map { cb =>
Contributor

nit, 2 space indentation

*/
private[columnar]
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
trait CachedBatchSerializer extends Serializable {
Contributor

we should mark this with @DeveloperApi
We should also add in @Since("3.1.0")
These should be added to all the public interfaces you added

@transient cachedPlan: SparkPlan,
tableName: Option[String]) {
/**
* A [[CachedBatch]] that stored some simple metrics that can be used for filtering of batches with
Contributor

nit s/stored/stores/

* @param selectedAttributes the field that should be loaded from the data, and the order they
* should appear in the output batch.
* @param conf the configuration for the job.
* @return the batches in the ColumnarBatch format.
Contributor

nit: return RDD...

* @param conf the configuration for the job.
* @return the batches in the ColumnarBatch format.
*/
def decompressColumnar(input: RDD[CachedBatch],
Contributor

nit: put input on the next line

* @param conf the configuration for the job.
* @return the rows that were stored in the cached batches.
*/
def decompressToRows(input: RDD[CachedBatch],
Contributor

put input on next line

}
}

override def decompressColumnar(input: RDD[CachedBatch],
Contributor

input param on next line

cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[ColumnarBatch] = {

Contributor

remove extra newline

trait SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def buildFilter(predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]):
(Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
Contributor

nit indentation off

}
}
}

Contributor

remove extra newline

@revans2
Contributor Author

revans2 commented Jul 10, 2020

@tgravescs please take another look and let me know what you think

@maropu maropu changed the title [SPARK-32274] Make SQL cache serialization pluggable [SPARK-32274][SQL] Make SQL cache serialization pluggable Jul 10, 2020
@SparkQA

SparkQA commented Jul 11, 2020

Test build #125642 has finished for PR 29067 at commit 0361237.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

@revans2, would you mind outlining which APIs you propose in the PR description? It seems like it's not only configurations but classes too. It would be great if it described how to use them with a concrete use case, to provide better context.

@revans2
Contributor Author

revans2 commented Jul 11, 2020

@HyukjinKwon sure, I am really proposing one API and one helper that could let someone reuse some of the existing code.

  1. CachedBatch is a trait used to tag data that is intended to be cached. It has a few APIs that let us keep the compression/serialization of the data separate from the metrics about it.
  2. CachedBatchSerializer provides the APIs that must be implemented to cache data.
    • convertForCache is an API that runs a cached spark plan and turns its result into an RDD[CachedBatch]. The actual caching is done outside of this API.
    • buildFilter is an API that takes a set of predicates and builds a filter function that can be used to filter the RDD[CachedBatch] returned by convertForCache.
    • decompressColumnar decompresses an RDD[CachedBatch] into an RDD[ColumnarBatch]. This is only used for a limited set of data types. These data types may expand in the future. If they do, we can add a new API with a default value that says which data types this serializer supports.
    • decompressToRows decompresses an RDD[CachedBatch] into an RDD[InternalRow]. Like decompressColumnar, this API decompresses the data in CachedBatch, but it turns the data into InternalRows, typically using code generation for performance reasons.

There is also a pair of helpers, SimpleMetricsCachedBatch and SimpleMetricsCachedBatchSerializer, that let you reuse the current filtering based on min/max values.

These are set up so that a user can explore other types of compression or indexing and their impact on performance. One could look at adding a bloom filter in addition to the min/max values currently supported for filtering. One could look at adding in compression for some of the data types not currently supported, like arrays or structs if that is something that is an important use case for someone.

The use case we have right now is in connection with https://github.com/NVIDIA/spark-rapids where we would like to provide compression that is better suited for both the CPU and the GPU.

The config right now is a static conf allowing one and only one cache serializer per session. That can change in the future but I didn't want to add in the extra code to track the serializer along with the RDD until there was a use case for it, and theoretically, if something is a clear win it should just be incorporated into the default serializer instead.
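For readers wanting to see what wiring this up looks like, here is a minimal usage sketch. It assumes the static conf is exposed as spark.sql.cache.serializer (the name used in Spark 3.1); the serializer class name below is hypothetical and stands in for any implementation of CachedBatchSerializer.

```scala
import org.apache.spark.sql.SparkSession

// Because the conf is static, it must be set before the SparkSession is created and
// applies to every cache operation in that session.
val spark = SparkSession.builder()
  .appName("pluggable-cache-serializer-example")
  .config("spark.sql.cache.serializer", "com.example.MyCachedBatchSerializer") // hypothetical class
  .getOrCreate()

// From here on, .cache()/.persist() and SQL CACHE TABLE go through the configured serializer.
val df = spark.range(0, 1000000).toDF("id")
df.cache()
df.count()
```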

Seq(In(attribute, Nil)), testRelation)
assert(tableScanExec.partitionFilters.isEmpty)
val testSerializer = new TestCachedBatchSerializer(false, 1)
testSerializer.buildFilter(Seq(In(attribute, Nil)), Seq(attribute))
Member

Why is assert removed from this test?

Contributor Author

I removed the assert because the refactoring moved that state into a location that is not easily accessible. From reading the test and the comments around it the purpose of the test is to verify that the particular case did not result in an exception. The purpose of the assertion was to verify that the internal state was what they expected. I felt that the assert was unnecessary, but if you want me to add it back in I can refactor the code further and do so.

@maropu
Member

maropu commented Jul 12, 2020

Hi, @revans2, thanks for the work. I have two questions now:

The use case we have right now is in connection with https://github.com/NVIDIA/spark-rapids where we would like to provide compression that is better suited for both the CPU and the GPU.

  1. You cannot use the data source V2 interface for your purpose? (Or, can you not extend the interface for that?) I ask because I think the interface is intended for interconnection between Spark and other systems (or files). The current cache structure is tightly coupled to the Spark internal one, so I'm not sure that we can directly expose it to 3rd-party developers.
  2. If SPARK_CACHE_SERIALIZER is specified, does the current approach replace all the existing caching operations with custom ones? Users cannot select which cache structure (default or custom) is used at runtime?

Btw, could you move your design comment above into the PR description? Since it will appear in a commit log, it is important to write the detail of a PR proposal for better traceability.

@revans2
Contributor Author

revans2 commented Jul 13, 2020

@maropu

You cannot use the data source V2 interface for your purpose?

We want to produce a transparent replacement for .cache, .persist and the SQL CACHE operator using GPUs for acceleration. Caching data right now is slow on the CPU. .cache is a common enough operation that we would like to support it in our plugin and feel that it is something that we can really accelerate. In theory, we could reuse the datasource v2 API, but it would require a lot more refactoring to make it fit into the cache operator. Possibly refactoring of the data source V2 API as well. If that is what you think we need to do I can work on it, but it will be a much bigger change.

The current cache structure is tightly coupled to the Spark internal one, so I'm not sure that we can directly expose it to 3rd-party developers.

Yes, many of the APIs used by this code are internal to Spark and are subject to change at any moment. That is no different than with many other plugin APIs, like the ones we use to enable our GPU accelerated dataframe plugin. Or the callback APIs currently used for metrics. If you would like me to do more to document that these APIs, and the APIs that they depend on, are unstable and can change I am happy to do it.

If SPARK_CACHE_SERIALIZER is specified, does the current approach replace all the existing caching operations with custom ones?

SPARK_CACHE_SERIALIZER is a static conf so if you specify it, it will be used for all cache operations within that session. But cache is not shared between sessions so I felt that was reasonable because there was no regression in functionality.

Users cannot select which cache structure (default or custom) is used at runtime?

A user cannot switch between modes within the same session, but each Spark session can have a separate setting. The goal of this was to prevent users from changing the setting after caching data. In theory, the code as it is written should be able to handle changing the setting at any point in time, except for the code that loads the plugin as a singleton. That use case is not one that I am currently concerned about, so I didn't want to add tests for it or commit to trying to support it. If you really want this use case to be supported I can make the needed changes and test/document it.

@SparkQA

SparkQA commented Jul 14, 2020

Test build #125836 has finished for PR 29067 at commit 641f28f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

@revans can you take a look at the failed tests? One is a caching test:

[error] Failed tests:
[error] 	org.apache.spark.sql.CachedTableSuite
[error] 	org.apache.spark.sql.execution.columnar.PartitionBatchPruningSuite

@revans2
Contributor Author

revans2 commented Jul 14, 2020

Thanks @tgravescs, those were issues related to metrics: one where I was over-counting, and another where I ended up under-counting. They should be fixed now.

@maropu
Member

maropu commented Jul 15, 2020

We want to produce a transparent replacement for .cache, .persist and the SQL CACHE operator using GPUs for acceleration. Caching data right now is slow on the CPU. .cache is a common enough operation that we would like to support it in our plugin and feel that it is something that we can really accelerate. In theory, we could reuse the datasource v2 API, but it would require a lot more refactoring to make it fit into the cache operator. Possibly refactoring of the data source V2 API as well. If that is what you think we need to do I can work on it, but it will be a much bigger change.

Thanks for the explanation, @revans2. Hmm, I think it's worth documenting the proposal as a SPIP. Then we need to discuss it on the dev mailing list first (I'm currently not sure that other developers want this interface). WDYT? @HyukjinKwon @cloud-fan @dongjoon-hyun

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125860 has finished for PR 29067 at commit ad11a79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

Personally I was thinking we didn't need an entire SPIP, since it's a localized developer API change which I see as more of a small improvement. If others think it's useful to have a SPIP then we should create one?

Note that at my previous job I had wanted to experiment with changing the caching format, as I think caching currently is very expensive.

@HyukjinKwon
Member

I just saw the comment. Thanks for summarizing @revans2.

@HyukjinKwon
Member

@maropu, per the documentation Spark Project Improvement Proposals (SPIP), if you feel like it needs an SPIP, it does. I trust your judgement.

I will read it more closely today and provide more feedback. cc @kiszk, @srowen, @viirya, @ueshin too.

@cloud-fan
Contributor

Yea IMO this is not a big feature that requires SPIP.

At a high-level, this idea makes sense to me. With the columnar engine extension, the current table cache format may not be the most efficient. I'll review it closely this week.

* @param buffers The buffers for serialized columns
* @param stats The stat of columns
*/
private[sql]
Member

Maybe we should remove this since execution package is meant to be private as of SPARK-16964

*/
@DeveloperApi
@Since("3.1.0")
trait SimpleMetricsCachedBatch extends CachedBatch {
Member

Would we be able to avoid a trait? Traits were actually recommended to be avoided as an API when possible: https://github.com/databricks/scala-style-guide#traits-and-abstract-classes

Contributor Author

DefaultCachedBatch is a case class, and I didn't think it was acceptable to have a case class inherit from a regular class. If I am wrong on that just let me know and I'll make it an abstract class.

*/
@DeveloperApi
@Since("3.1.0")
trait CachedBatch {
Member

Can we avoid putting the APIs under the execution package? They wouldn't be documented, and this module is private (execution/package.scala).

Contributor Author

Sure what package would you like me to put them under? org.apache.spark.sql? org.apache.spark.sql.columnar?

Contributor Author

I tried to move some of the classes, but I ran into issues with filtering. PartitionStatistics and ColumnStatisticsSchema are private to sql.execution.columnar. I had to open them up slightly to be able to do the refactoring.

Comment on lines +244 to +245
val cb = if (cachedPlan.supportsColumnar) {
serializer.convertColumnarBatchToCachedBatch(
Member

When reaching here, this serializer always supports columnar input? i.e., supportsColumnarInput returns true for the serializer.

Contributor Author

Yes and no. If you go through the front door and create an InMemoryRelation using InMemoryRelation.apply (not the variant made for testing), then it guarantees that the only time you get a SparkPlan that supportsColumnar is when the serializer also supportsColumnarInput. You could purposely construct an InMemoryRelation that does not match this, but you would have to go out of your way to do it. I can add an assertion to an InMemoryRelation constructor to make sure that it is correct if you want me to.

Member

I think normally we won't create an InMemoryRelation in that way, so sounds good.
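For illustration, the invariant discussed in this thread can be pictured with the simplified sketch below, building on the illustrative trait outlined earlier in the description. The helper name buildCachedRDD and the exact parameters are hypothetical; the real code in InMemoryRelation threads through more state (storage level, conf, logical plan), but the guard has this shape.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

object CacheGuardSketch {
  // Hypothetical helper illustrating the guard: the columnar path is taken only when the
  // plan produces ColumnarBatches AND the configured serializer accepts columnar input.
  def buildCachedRDD(
      plan: SparkPlan,
      serializer: CachedBatchSerializer,
      conf: SQLConf): RDD[CachedBatch] = {
    if (plan.supportsColumnar && serializer.supportsColumnarInput(plan.output)) {
      serializer.convertColumnarBatchToCachedBatch(plan.executeColumnar(), plan.output, conf)
    } else {
      serializer.convertInternalRowToCachedBatch(plan.execute(), plan.output, conf)
    }
  }
}
```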

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126819 has finished for PR 29067 at commit ea762e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

#29067 (comment), @revans2, there was a bit of unexpected test failures in GitHub Actions, and they are fixed now. The GitHub Actions builds should pass once you sync with the latest master after rebasing or merging the upstream.

@HyukjinKwon
Member

This looks like it's getting close to ready to me too.

@revans2
Contributor Author

revans2 commented Jul 31, 2020

The test failure looks unrelated

[info] KafkaSourceStressSuite:
[info] - stress test with multiple topics and partitions *** FAILED *** (1 minute, 1 second)
[info]   Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.

@SparkQA

SparkQA commented Jul 31, 2020

Test build #126897 has finished for PR 29067 at commit 3f2f527.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ColumnStatisticsSchema(a: Attribute) extends Serializable
  • class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Aug 1, 2020

Test build #126902 has finished for PR 29067 at commit 3f2f527.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ColumnStatisticsSchema(a: Attribute) extends Serializable
  • class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Aug 2, 2020

Test build #126933 has finished for PR 29067 at commit 3f2f527.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ColumnStatisticsSchema(a: Attribute) extends Serializable
  • class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable

@cloud-fan
Contributor

GitHub Actions passes, I'm merging to master, thanks!

@cloud-fan cloud-fan closed this in 713124d Aug 3, 2020
@dongjoon-hyun
Member

Hi, @revans2 and @cloud-fan.

Unfortunately, this PR causes flaky unit-test failures. As shown below, failures occur depending on the test sequence.

$ build/sbt "sql/testOnly *.CachedBatchSerializerSuite *.CachedTableSuite"
...
[info] *** 30 TESTS FAILED ***
[error] Failed: Total 51, Failed 30, Errors 0, Passed 21
[error] Failed tests:
[error] 	org.apache.spark.sql.CachedTableSuite
[error] (sql/test:testOnly) sbt.TestsFailedException: Tests unsuccessful

I made a PR to fix that. Currently, this causes random Jenkins failures on the other PRs.

@dongjoon-hyun
Member

@gatorsmile
Member

@revans2 Could you update the PR description? The PR description does not match the actual code.

@revans2
Contributor Author

revans2 commented Sep 8, 2020

@gatorsmile what name were you thinking of? The patch did "make the SQL cache serialization pluggable" so I am not 100% sure what needs to be corrected. I am happy to do it, just need to know what to change it to.

@HyukjinKwon
Member

I think he simply meant that API names such as decompressColumnar in the PR description do not match the actual APIs added (e.g., after #29067 (review)).

@revans2
Contributor Author

revans2 commented Sep 8, 2020

But the config is still about serialization, and despite the names of the APIs, compression is optional. If you want me to add compression to the name of the PR I can.

@cloud-fan
Contributor

The CachedBatchSerializer has methods like convertColumnarBatchToCachedBatch, but the PR description is stale and still says decompressColumnar.

@dongjoon-hyun
Member

Hi, @gatorsmile. Your request is just about revising this GitHub PR description, right?
We cannot change commit logs.

@HyukjinKwon
Member

HyukjinKwon commented Sep 8, 2020

Just for a bit more context: the commits are regularly digested, summarized, and shared on the dev mailing list. PRs introducing APIs are usually shared directly on the mailing list with PR links, and people will likely visit the PRs. People also look at PR descriptions when they track the history, for example.

It's best for the PR description to match the change. It's okay to forget; it happens.
Let's just match the PR description to what the PR proposes now that we've found it.

@revans2 revans2 deleted the pluggable_cache_serializer branch September 8, 2020 12:34
@revans2
Contributor Author

revans2 commented Sep 8, 2020

Oh I understand now. Sorry I got the title and the description out of sync. Yes I will update it.

@revans2
Contributor Author

revans2 commented Sep 8, 2020

Please take a look at the description and let me know if I missed anything?

@cloud-fan
Contributor

@revans2 thanks for updating!

@HyukjinKwon
Member

Thank you @revans2
