Conversation

@jingz-db (Contributor) commented Jun 26, 2024:

What changes were proposed in this pull request?

Introduce virtual column families to RocksDB. We attach a 2-byte id prefix as a column family identifier to each key row that is put into RocksDB. The encoding and decoding of the virtual column family prefix happen at the RocksDBKeyEncoder layer, where we can pre-allocate the extra 2 bytes and avoid an additional memcpy.
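
A minimal sketch of the prefixing scheme (helper names here are hypothetical; the real logic lives in the RocksDBKeyEncoder implementations):

import org.apache.spark.unsafe.Platform

val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2

// Pre-allocate the extra 2 bytes so the row payload is copied only once,
// then write the column family id as a Short at the front of the array.
def encodeWithColFamilyPrefix(rowBytes: Array[Byte], colFamilyId: Short): Array[Byte] = {
  val encoded = new Array[Byte](rowBytes.length + VIRTUAL_COL_FAMILY_PREFIX_BYTES)
  Platform.putShort(encoded, Platform.BYTE_ARRAY_OFFSET, colFamilyId)
  Platform.copyMemory(rowBytes, Platform.BYTE_ARRAY_OFFSET,
    encoded, Platform.BYTE_ARRAY_OFFSET + VIRTUAL_COL_FAMILY_PREFIX_BYTES,
    rowBytes.length)
  encoded
}

// Decoding reads the id back; the row payload starts 2 bytes in.
def decodeColFamilyId(keyBytes: Array[Byte]): Short =
  Platform.getShort(keyBytes, Platform.BYTE_ARRAY_OFFSET)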

  • Remove physical column family related code, as it would otherwise be dead code until some caller starts using it.
  • Remove useColumnFamilies from the StateStoreChangelogV2 API.

Why are the changes needed?

Currently, within the scope of the arbitrary stateful API v2 (transformWithState) project, each state variable is stored inside one physical column family within the RocksDB state store instance. Column families are also used to implement secondary indexes for various features. Each physical column family has its own memtables, creates its own SST files, and handles compaction independently on those SST files.

When the number of operations to RocksDB is relatively small and the number of column families is relatively large, the overhead of handling small SST files becomes high, especially since all of them have to be uploaded to the snapshot dir and referenced in the metadata file for the uploaded RocksDB snapshot. Using a prefix to manage different key spaces (virtual column families) could reduce such overhead.

Does this PR introduce any user-facing change?

No. If useColumnFamilies is set to true in StateStore.init(), virtual column families will be used.

How was this patch tested?

Unit tests in RocksDBStateStoreSuite, and integration tests in TransformWithStateSuite.
Moved test suites from RocksDBSuite into RocksDBStateStoreSuite because some verification functions are now moved into RocksDBStateStoreProvider.

Was this patch authored or co-authored using generative AI tooling?

No.

@jingz-db changed the title from [SS] Virtual Column Family for RocksDB to [SPARK-48742][SS] Virtual Column Family for RocksDB on Jun 27, 2024
@jingz-db jingz-db marked this pull request as ready for review June 27, 2024 21:46
val prefix = new Array[Byte](
  prefixKeyEncoded.length + 4 + offSetForColFamilyPrefix)
if (hasVirtualColFamilyPrefix) {
  Platform.putLong(prefix, Platform.BYTE_ARRAY_OFFSET, colFamilyId)
}

Contributor:

Short might be enough ? 2 bytes probably good enough for 0 indexed num of column families ?

Contributor Author:

Use Short instead of Long as VCF id.


// Maintain mapping of column family name to handle
// Maintain a set of column family name

Contributor:

also mention where/why this is needed/used ?

Contributor:

Discussed offline - we can prob remove this set entirely and move relevant stuff to the provider layer

@jingz-db jingz-db requested a review from anishshri-db July 3, 2024 21:12

// TODO SPARK-48796 after restart state id will not be the same

Contributor Author:

This is the same situation as in the integration test in TransformWithStateSuite: we cannot reload the same column family id for the same column family.

Contributor Author:

Will be fixed once the state schema related changes are merged. Created a task to track this: https://issues.apache.org/jira/browse/SPARK-48796

Contributor:

yea thanks

@jingz-db jingz-db requested a review from anishshri-db July 4, 2024 00:45
@jingz-db jingz-db requested a review from anishshri-db July 4, 2024 01:03

@anishshri-db (Contributor) left a comment:

LGTM

@HeartSaVioR (Contributor) left a comment:

Only left comments on sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala.

def encodePrefixKey(prefixKey: UnsafeRow, vcfId: Option[Short]): Array[Byte]
def encodeKey(row: UnsafeRow, vcfId: Option[Short]): Array[Byte]
def decodeKey(keyBytes: Array[Byte]): UnsafeRow
def offSetForColFamilyPrefix: Int

Contributor:

nit: offset, not offSet

Btw, I'd propose two things:

  1. encode(Prefix)Key and decodeKey are no longer symmetric. encode(Prefix)Key deals with vcfId, but in decodeKey there is no way for us to get the vcfId back.

  I see it could be less performant (though I feel like it doesn't matter much) if we always have to read the vcfId part and return it even when the caller already knows the vcfId. But it's probably better to have a new method which explicitly calls out in its name that it will skip the vcfId and assume the caller already knows it.

  2. It looks like the column family prefix is applied across all encoder implementations, which I think we can abstract better. This may be a good time to have a base (abstract) implementation of RocksDBKeyStateEncoder handling the column family prefix.

Something like the following:

abstract class RocksDBKeyStateEncoderBase(useColumnFamilies: Boolean)
  extends RocksDBKeyStateEncoder {

  protected def encodeColumnFamilyPrefix(
      numBytes: Int,
      vcfId: Option[Short],
      useColumnFamilies: Boolean): (Array[Byte], Int) = {
    if (useColumnFamilies) {
      val encodedBytes = new Array[Byte](numBytes + VIRTUAL_COL_FAMILY_PREFIX_BYTES)
      Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, vcfId.get)
      (encodedBytes, Platform.BYTE_ARRAY_OFFSET + VIRTUAL_COL_FAMILY_PREFIX_BYTES)
    } else {
      val encodedBytes = new Array[Byte](numBytes)
      (encodedBytes, Platform.BYTE_ARRAY_OFFSET)
    }
  }

  protected def decodeColumnFamilyPrefix(keyBytes: Array[Byte]): (Option[Short], Int) = {
    if (useColumnFamilies) {
      val vcfId = Platform.getShort(keyBytes, Platform.BYTE_ARRAY_OFFSET)
      (Some(vcfId), Platform.BYTE_ARRAY_OFFSET + VIRTUAL_COL_FAMILY_PREFIX_BYTES)
    } else {
      (None, Platform.BYTE_ARRAY_OFFSET)
    }
  }

  // Only if we want to skip over reading the CF prefix...
  protected def decodeKeyStartOffset: Int = {
    if (useColumnFamilies) {
      Platform.BYTE_ARRAY_OFFSET + VIRTUAL_COL_FAMILY_PREFIX_BYTES
    } else Platform.BYTE_ARRAY_OFFSET
  }
}

(The method names can change to whatever is preferred; I'm not talented with naming.)

Contributor:

That said, I'll review the changes in the encoder file once the proposal is reflected, or once we decide not to do so.

Contributor:

Also, this proposal can be combined with the one here:

#47107 (comment)

@jingz-db (Contributor Author) commented Jul 5, 2024:

Refactored into RocksDBKeyStateEncoderBase. Added the virtual column family id as a constructor parameter, so the API for RocksDBKeyStateEncoder.encodeKey/decodeKey remains unchanged: we now only pass in the vcfId during encoder initialization.
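
As a rough illustration of the resulting shape, building on the RocksDBKeyStateEncoderBase sketch quoted above (constructor parameters here are assumed for illustration, not verbatim from the PR):

// Hypothetical sketch: the vcfId is supplied once at construction time,
// so encodeKey keeps its original single-argument signature.
class NoPrefixKeyStateEncoder(
    keySchema: StructType,
    useColumnFamilies: Boolean = false,
    virtualColFamilyId: Option[Short] = None)
  extends RocksDBKeyStateEncoderBase(useColumnFamilies) {

  override def encodeKey(row: UnsafeRow): Array[Byte] = {
    val rowBytes = row.getBytes
    val (encoded, startOffset) =
      encodeColumnFamilyPrefix(rowBytes.length, virtualColFamilyId, useColumnFamilies)
    // Copy the row payload after the (optional) 2-byte prefix.
    Platform.copyMemory(rowBytes, Platform.BYTE_ARRAY_OFFSET,
      encoded, startOffset, rowBytes.length)
    encoded
  }
}

The provider would then construct each encoder with the id it assigned to that column family.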


private val colFamilyNameToIdMap = new java.util.concurrent.ConcurrentHashMap[String, Short]
// TODO SPARK-48796 load column family id from state schema when restarting

Contributor:

This sounds like we can't release before addressing SPARK-48796 - do I understand correctly? If so, I'd need to mark the ticket as a blocker for Spark 4.0.0.

Contributor:

Also do we have a path forward on storing column family id? Will SPARK-48796 address this altogether?

Contributor:

Yes, correct - this will be handled in that ticket, where we store the vcf id within the new state schema format.
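
For context, a minimal sketch of how such a map could hand out ids until then (class and counter names here are hypothetical):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

class ColFamilyIdRegistry {
  private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]
  // Start at 1, assuming id 0 is reserved for the default column family.
  private val nextColFamilyId = new AtomicInteger(1)

  def getOrCreateColFamilyId(name: String): Short =
    colFamilyNameToIdMap.computeIfAbsent(
      name, (_: String) => nextColFamilyId.getAndIncrement().toShort)
}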


def getVcfIdBytes(id: Short): Array[Byte] = {
  val encodedBytes = new Array[Byte](VIRTUAL_COL_FAMILY_PREFIX_BYTES)
  Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, id)
  encodedBytes
}

Contributor:

We can just consider that part of the API spec of Platform: get/put require a starting offset, and for reading from a byte array they require Platform.BYTE_ARRAY_OFFSET as the base offset.

* key, and the operator should not call prefixScan method in StateStore.
* @param useColumnFamilies Whether the underlying state store uses a single or multiple column
* families
* families; by default we'll use virtual column family if this parameter

Contributor:

I feel like we shouldn't mention this - it's an implementation detail, and one specific to the RocksDB implementation at that. We define the "interface" here, and it's up to the provider implementation how to deal with column families.


/* Column family related tests */

Contributor:

Are the tests added here copied from RocksDBSuite? If so, it'd be a great help for reviewers if you went through and commented wherever there is a difference worth looking at.

Contributor Author:

Added a few comments below to denote which suites were removed entirely and which were moved into RocksDBStateStoreSuite.

@anishshri-db (Contributor):

Also @jingz-db - the test failure seems related?

[error] Failed tests:
[error] 	org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreIntegrationSuite
[error] (sql / Test / test) sbt.TestsFailedException: Tests unsuccessful


testWithColumnFamilies(s"RocksDB: column family creation with invalid names",

Contributor Author:

These tests were removed from RocksDBSuite and moved into RocksDBStateStoreSuite because the verification functions moved from RocksDB to RocksDBStateStoreProvider.


testWithChangelogCheckpointingEnabled(

Contributor Author:

This suite is removed because the column family is no longer an input parameter in changelog v2.


/* Column family related tests */
testWithColumnFamilies("column family creation with invalid names",

@jingz-db (Contributor Author) commented Jul 5, 2024:

The following suites are moved from RocksDBSuite. They are mostly the same as before; the only difference is that we now test at the provider layer instead of the RocksDB instance layer.


Seq(

@jingz-db (Contributor Author) commented Jul 5, 2024:

The following suites are newly added.
We don't add any new suite for the RangeScanEncoder because there are already lots of existing suites for it in RocksDBStateStoreSuite, whereas there were no existing NoPrefix- and PrefixScan-related suites with non-default column families.

@jingz-db jingz-db requested a review from HeartSaVioR July 5, 2024 23:47

@HeartSaVioR (Contributor) left a comment:

Only minors, thanks for the patience!

prefixKeyEncoded, Platform.BYTE_ARRAY_OFFSET, prefixKeyEncodedLen)

// Here we calculate the remainingKeyEncodedLen leveraging the length of keyBytes
val remainingKeyEncodedLen = keyBytes.length - 4 - prefixKeyEncodedLen -

Contributor:

This can also be calculated based on decodeKeyStartOffset:

decodeKeyStartOffset + 4 + prefixKeyEncodedLen = starting offset for the remaining key as encoded

The reason we abstract the start offset for both encode and decode is to let the subclasses avoid dealing with the column family prefix directly.

@jingz-db (Contributor Author) commented Jul 8, 2024:

Thanks for taking a close look :)
IIUC, the prefixKeyEncoded is part of the original implementation of PrefixKeyScanStateEncoder.decode (the prefixKeyEncodedLen is the length of the key's own prefix, not the virtual column family prefix). So the virtual column family prefix is already dealt with in decodeKeyStartOffset.

Though for decodeKey I found it hard to rid the subclass of dealing with the column family prefix completely - e.g. for remainingKeyEncodedLen here, we still need to subtract the length of the column family prefix.
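
To make that arithmetic concrete, a small sketch of the decode-side layout under discussion (reusing the names from the snippet above):

// Key layout with virtual column families enabled:
//   [2-byte vcf id][4-byte prefix key length][prefix key][remaining key]
// decodeKeyStartOffset already points past the vcf prefix, so:
val prefixKeyEncodedLen = Platform.getInt(keyBytes, decodeKeyStartOffset)
val remainingKeyEncodedLen = keyBytes.length -
  (decodeKeyStartOffset - Platform.BYTE_ARRAY_OFFSET) - 4 - prefixKeyEncodedLen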

@HeartSaVioR (Contributor) commented Jul 9, 2024:

Are you assuming the case of Platform.BYTE_ARRAY_OFFSET != 0 (on some platform), hence the length of the byte array would be misaligned if we took Platform.BYTE_ARRAY_OFFSET into account? Awesome thought if intended :) Great attention to detail.

@jingz-db jingz-db requested a review from HeartSaVioR July 8, 2024 16:23

@HeartSaVioR (Contributor):

Thanks! Merging to master.

ericm-db pushed a commit to ericm-db/spark that referenced this pull request Jul 10, 2024
jingz-db added a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
HeartSaVioR pushed a commit that referenced this pull request Feb 21, 2025
…g to use the correct number of version bytes

### What changes were proposed in this pull request?

There are currently two bugs:
- The NoPrefixKeyStateEncoder adds an extra version byte to each row when UnsafeRow encoding is used: #47107
- Rows written with Avro encoding do not include a version byte: #48401

**Neither of these bugs has been released, since they are only triggered with multiple column families, which only transformWithState uses, and that is going to be released in Spark 4.0.0.**

This change fixes both of these bugs.

### Why are the changes needed?

These changes are needed in order to conform with the expected state row encoding format.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49996 from ericm-db/SPARK-51249.

Lead-authored-by: Eric Marnadi <[email protected]>
Co-authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR pushed a commit that referenced this pull request Feb 21, 2025
Pajaraja pushed a commit to Pajaraja/spark that referenced this pull request Mar 6, 2025