@xuanyuanking
Member

What changes were proposed in this pull request?

This PR implements the RocksDB instance used by the RocksDB state store. It acts as a handler for the RocksDB instance and RocksDBFileManager.

Why are the changes needed?

Part of the RocksDB state store implementation.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit tests added.

@SparkQA

SparkQA commented Jun 16, 2021

Test build #139867 has finished for PR 32928 at commit bbb534e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class RocksDB(
      • class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
      • case class RocksDBConf(
      • case class AcquiredThreadInfo(

@SparkQA

SparkQA commented Jun 16, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44393/

sql/core/pom.xml Outdated
Member

Member Author

@dongjoon-hyun Thanks for taking care of this. Following the comments from Yikun in http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Add-RocksDB-StateStore-tt30645.html, I plan to upgrade the RocksDB version after all the code has shipped, to at least 6.4.6 for Arm64 support.
Personally, I think it's good to keep 6.2.2, since it is well tested in production and is safe to use in the first stage. I believe the latest RocksDB versions are stable enough, but the state store implementation has its own design for the snapshot/checkpoint file architecture, so it's worth spending more time testing whether we can safely upgrade to the latest RocksDB version. WDYT?

Member

Sounds good to have a well-tested version first.

@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44553/

@SparkQA

SparkQA commented Jun 19, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44553/

Member

@dongjoon-hyun left a comment

Thank you, @xuanyuanking . We can choose the best way for the Apache Spark community.

Just a side note: all RocksDB versions, including 6.20.3, seem to be a blocker for the Apple Silicon Mac environment.

In the community, there is another Spark JIRA issue considering RocksDB as a replacement for LevelDB.

cc @dbtsai , @viirya , @sunchao , @cloud-fan , @gengliangwang

Member

@dongjoon-hyun left a comment

Please fix the Scala 2.13 build, @xuanyuanking.

@xuanyuanking
Member Author

@dongjoon-hyun Thanks for the information!

> Please fix the Scala 2.13 build, @xuanyuanking.

Sure

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44614/

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44614/

@SparkQA

SparkQA commented Jun 21, 2021

Test build #140087 has finished for PR 32928 at commit 80154b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class RocksDB(
      • class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
      • case class RocksDBConf(
      • case class AcquiredThreadInfo(

@HeartSaVioR
Contributor

HeartSaVioR commented Jun 22, 2021

I'd rather say the version bump is a no-go until we figure out either that it is backward compatible (with RocksDB 6.2.2, as that has been used in production) or that there is a fairly easy way to migrate the old one to the new one. This applies to the change from LevelDB to RocksDB as well (SPARK-35782).

Given the importance, I'd like to see that effort tracked in a different JIRA, orthogonal to the RocksDB state store provider.

Member

This is basically the same as #32767, right?

Contributor

If I understand correctly, commit 80154b2 should be the only change we need to review here.

Member Author

Yep. We only need to review the latest commit for all the WIP PRs related to the RocksDB state store.

Member

RocksDB -> RocksDBLoader

Member Author

Done in 606a38b

Member

sql.util? Why not in org.apache.spark.sql.execution.streaming?

Contributor

It'd be nice to place this in the package where the RocksDB state store provider is or will be placed, which I guess is either org.apache.spark.sql.execution.streaming or org.apache.spark.sql.execution.streaming.state.

Member Author

The loader is only concerned with library loading and is not streaming-specific, which is why I put it into sql.util.
Either way is OK with me, WDYT?

Contributor

RocksDB is only used by the RocksDB state store provider, so it's better to put it in the same place. We may want to move this somewhere else, such as core, if we replace LevelDB with RocksDB, but moving it doesn't break anything, so there's no need to plan for that in advance.

Member Author

Makes sense, moved to the streaming module.

Comment on lines +58 to +62
Member

Can these options be configured in the future?

Member Author

The options on lines 59-62 should not be configurable, since they mainly pin RocksDB to sync mode. As for the tableFormatConfig options, I think they can be made configurable in the future if users have further performance requirements.

Member

manager -> fileManager?

Member Author

Thanks, done in f928820

Member

If the old value is not null, couldn't it be from the committed DB? How do we know it must be an uncommitted one (and so skip increasing numUncommittedKeys)?

Member Author

getFromBatchAndDB looks up the key in both the write batch and the committed DB.

Contributor

This ensures that a new key is detected regardless of where an old value would come from, so adding 1 looks accurate.

Member

I see. Makes sense.

Member

Same here: if the value could be from the committed DB, how can we always decrease numUncommittedKeys?

Contributor

This also works regardless of where the value comes from. If a key is removed via the write batch, the value came either from the committed RocksDB (the key is going to be removed) or from the uncommitted write batch (numUncommittedKeys was already increased earlier).

Member Author

We should treat num[Unc/C]ommittedKeys as a kind of status here: the number is the absolute value accumulated from the beginning (version 0) until now. That's also why there's no place where we set numUncommittedKeys to 0.
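
The counting rule discussed in this thread can be illustrated with a small sketch. It is not the PR's code: `KeyCountingStore` and its in-memory `view` map are hypothetical stand-ins for the combined "write batch + committed DB" lookup that getFromBatchAndDB provides, and `String` keys replace byte arrays for readability.

```scala
import scala.collection.mutable

// Hypothetical illustration only; the real class works on a RocksDB write batch.
final class KeyCountingStore(committed: Map[String, String]) {
  // Stand-in for "write batch + committed DB": one merged view of all visible keys.
  private val view = mutable.HashMap.empty[String, String]
  view ++= committed

  // Absolute number of keys the next version will contain.
  // It starts from the loaded count and is never reset to 0.
  private var numUncommittedKeys: Long = committed.size.toLong

  def put(key: String, value: String): Unit = {
    // Count the key only if it was visible in neither the batch nor the committed data.
    if (view.put(key, value).isEmpty) numUncommittedKeys += 1
  }

  def remove(key: String): Unit = {
    // Decrement only if the key was visible, wherever its value came from.
    if (view.remove(key).isDefined) numUncommittedKeys -= 1
  }

  def numKeys: Long = numUncommittedKeys
}
```

Overwriting an existing key leaves the count unchanged, and removing a key that was never visible is a no-op, which is why the counter stays accurate without knowing whether a value was committed or not.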

Member

nit: don't need s"".

Member Author

Thanks, done in f928820

Member

in $checkpointDir -> "to DFS"?

Member Author

Thanks, done in f928820

Member

openDB() will close DB. This looks redundant.

Contributor

We're modifying workingDir, so it looks safer to close the DB before modifying the dir.

Member Author

Makes sense, I deleted the closeDB call in openDB in f928820

@SparkQA

SparkQA commented Jun 25, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44854/

@SparkQA

SparkQA commented Jun 25, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44854/

@SparkQA

SparkQA commented Jun 25, 2021

Test build #140322 has finished for PR 32928 at commit 606a38b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking changed the title from "[WIP][SPARK-35784] Implementation for RocksDB instance" to "[SPARK-35784] Implementation for RocksDB instance" Jun 28, 2021
Contributor

@HeartSaVioR left a comment

First round.

Contributor

nit: checkpointing

Member Author

Thanks! Done in next commit.

Contributor

Just wondering: where is it being used? It is only defined as a val, hence the question.

Member Author

It's mainly used in the unit tests.

acquireLock.notifyAll()
}

private def getDBProperty(property: String): Long = {
Contributor

unused

Member Author

Yes, it's used in the metrics-related PR. I'll delete it in the next commit.

def withDB[T](
remoteDir: String,
version: Int = 0,
conf: RocksDBConf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
Contributor

The conf param is always provided here. Is there any usage that relies on the default value?

Member Author

No usage of the default value; I'll delete it in the next commit.

}

case class AcquiredThreadInfo(
threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread()),
Contributor

Is there any caller providing these parameters manually?

Member Author

No. I'll make threadRef and tc public members in the next commit.

@volatile private var db: NativeRocksDB = _
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
@volatile private var numCommittedKeys = 0L
@volatile private var numUncommittedKeys = 0L
Contributor

The name is actually confusing. It made me think we are tracking committed keys vs. uncommitted keys (not including committed keys), but in reality we are tracking the number of keys in the committed RocksDB and the number of keys (committed + uncommitted) that will be committed.

Contributor

If we can't find a better name, we need to at least add a code comment to explain this.

Member Author

Agreed, I'll add more comments in the next commit.

Member

Maybe numKeysInCurrentVersion?

Member

Thinking about it again, I can see what it means. So it is not actually straightforward.

Member Author

Yeah. It might be hard to convey the full meaning in the name, so I prefer to have a long comment here 😂

@xuanyuanking
Member Author

All comments addressed. Ready for another round of review.

Contributor

@HeartSaVioR left a comment

+1

I'll wait a couple of days for @viirya to finalize his review. I'll merge this on Thursday if there's no further feedback.

} catch {
case t: Throwable =>
loadedVersion = -1 // invalidate loaded data
throw t
Member

@dongjoon-hyun Jun 29, 2021

After we re-throw this, is loadedVersion used for some purpose in the upper layer? I'm wondering whether this is a recoverable error and whether loadedVersion would be valuable information to use later in that case. Never mind.

"writeBatch" -> writeTimeMs,
"flush" -> flushTimeMs,
"compact" -> compactTimeMs,
"pauseBg" -> pauseTimeMs,
Member

If you don't mind, shall we use a full name instead of Bg?

Member Author

"pauseBackground" might be a little bit longer, how about change it to a simple "pause"? Actually this is only for the internal temp map key, we'll have a detailed metrics name in the later PR.

  val CUSTOM_METRIC_PAUSE_TIME = StateStoreCustomTimingMetric(
    "rocksdbCommitPauseLatency", "RocksDB: commit - pause background time")

Member Author

Changed to pause in 641a91e

silentDeleteRecursively(localRootDir, "stopping")
} catch {
case e: Exception =>
logWarning("Error closing RocksDB", e)
Member

If we only want to log on any Exception, Utils.tryLogNonFatalError might be better.

Member Author

Thanks for the advice. But since NonFatal doesn't match InterruptedException, which is also an Exception, we'd better not switch to Utils.tryLogNonFatalError here.
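
The difference between the two handlers can be checked directly against the Scala standard library. The sketch below only uses `scala.util.control.NonFatal`; the object and method names are made up for illustration and are not part of Spark.

```scala
import scala.util.control.NonFatal

// Hypothetical helper names, for illustration only.
object HandlerCoverage {
  // True when a `case NonFatal(e) =>` handler would catch `t`.
  def caughtByNonFatal(t: Throwable): Boolean = NonFatal(t)

  // True when a `case e: Exception =>` handler would catch `t`.
  def caughtByException(t: Throwable): Boolean = t.isInstanceOf[Exception]
}
```

For an `InterruptedException`, `caughtByException` is true while `caughtByNonFatal` is false, so a NonFatal-based helper would let an interruption propagate out of the close path instead of logging it.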

@HeartSaVioR changed the title from "[SPARK-35784] Implementation for RocksDB instance" to "[SPARK-35784][SS] Implementation for RocksDB instance" Jun 29, 2021
@volatile private var numCommittedKeys = 0L
// number of keys which will be committed in the next version
@volatile private var numUncommittedKeys = 0L
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
Member

nit. Could you add @GuardedBy("acquireLock")?

Member Author

Thanks, done in the next commit.
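
As a rough illustration of what "guarded by acquireLock" means in practice, here is a minimal, dependency-free sketch of a monitor-based acquire/release with a timeout. It is an assumption-laden stand-in, not the PR's implementation: `InstanceLock`, `owner`, and `heldByCurrentThread` are hypothetical names, and the annotation is replaced by a comment so that no annotation library is needed.

```scala
// Hypothetical sketch; the PR tracks more than the owning thread.
final class InstanceLock(timeoutMs: Long) {
  private val acquireLock = new Object

  // Guarded by acquireLock: only read or written while holding its monitor.
  private var owner: Thread = null

  def acquire(): Unit = acquireLock.synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    // Wait until the lock is free, we already own it, or the timeout elapses.
    while (owner != null && (owner ne Thread.currentThread()) && remaining > 0) {
      acquireLock.wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    if (owner != null && (owner ne Thread.currentThread())) {
      throw new IllegalStateException(
        s"Lock not acquired within $timeoutMs ms; held by ${owner.getName}")
    }
    owner = Thread.currentThread()
  }

  def release(): Unit = acquireLock.synchronized {
    owner = null
    // Wake up every waiter so one of them can take the lock.
    acquireLock.notifyAll()
  }

  def heldByCurrentThread: Boolean = acquireLock.synchronized {
    owner eq Thread.currentThread()
  }
}
```

Because every access to `owner` happens inside `acquireLock.synchronized`, the guard annotation documents an invariant that the code already follows; it does not change behavior.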

iter.seekToFirst()

// Attempt to close this iterator if there is a task failure, or a task interruption.
// This is a hack because it assumes that the RocksDB inside running in a task.
Member

@dongjoon-hyun Jun 29, 2021

maybe, that the RocksDB inside running -> that the RocksDB is running?

Member Author

Thanks, changed in the next commit.

@viirya
Member

viirya commented Jun 29, 2021

Thanks @HeartSaVioR. I will take another look.

Comment on lines 84 to 85
// number of keys in all committed versions before
@volatile private var numCommittedKeys = 0L
Member

Why is it for "all committed versions"? In load, we just overwrite it with numCommittedKeys = metadata.numKeys, so it looks like it is only for the loaded version.

Member Author

It's also used in the commit operation, which will be introduced in the next commit: https://github.com/apache/spark/pull/32933/files#diff-4663b8349511dc040c3d5204bb6868064002d0b45aadaf1f1795a683897a5c7dR224

Contributor

@HeartSaVioR Jun 29, 2021

Here "all" seems to cause the confusion: the number only represents "one" version.

Now I think it'd be nice to rename the fields so that they clearly represent the number of keys for a specific version. I guess the candidates below could help avoid confusion, even without a code comment:

  • numKeysOnLoadedVersion - numKeysOnWritingVersion
  • numKeysOnCommittedVersion - numKeysOnUncommittedCommittingVersion
  • or better names

Member Author

+1 for numKeysOnLoadedVersion - numKeysOnWritingVersion. I reviewed the new names in all the places they are used (including the follow-up PRs) and they have a clear meaning. Thanks for the advice!

Member

looks good to me.

flushOptions.close()
dbOptions.close()
dbLogger.close()
silentDeleteRecursively(localRootDir, "stopping")
Member

stopping -> closing RocksDB?

Member Author

Thanks, done in the next commit.

case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
case _ => logTrace(_)
}
loggingFunc(s"[Native-${infoLogLevel.getValue}] $logMsg")
Member

Native -> NativeRocksDB?

Member Author

Thanks, done in the next commit.

if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL
dbOptions.setLogger(dbLogger)
dbOptions.setInfoLogLevel(dbLogLevel)
logInfo(s"Set DB native logging level to $dbLogLevel")
Member

We can be more specific. DB -> RocksDB

Member Author

Sure, done in the next commit.

Comment on lines +402 to +407
// Configuration that specifies whether to compact the RocksDB data every time data is committed
private val COMPACT_ON_COMMIT_CONF = ConfEntry("compactOnCommit", "false")
private val PAUSE_BG_WORK_FOR_COMMIT_CONF = ConfEntry("pauseBackgroundWorkForCommit", "true")
private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
Member

Note that we should document these configs at the end.

Member Author

Yes, I plan to document all the RocksDB configs in the user guide. If configurability is required in the future, people can easily refer to it.
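
To make the defaults in the snippet above concrete, here is a small sketch of how such name/default entries could be resolved against user-supplied settings. Only the entry names and default values come from the diff; `SketchRocksDBConf`, `fromMap`, the simplified `ConfEntry`, and the plain `Map` input are hypothetical, and the real class may resolve keys differently (for example with a key prefix).

```scala
// Hypothetical re-creation for illustration; not the PR's RocksDBConf.
final case class ConfEntry(name: String, default: String)

final case class SketchRocksDBConf(
    compactOnCommit: Boolean,
    pauseBackgroundWorkForCommit: Boolean,
    blockSizeKB: Long,
    blockCacheSizeMB: Long,
    lockAcquireTimeoutMs: Long)

object SketchRocksDBConf {
  // Names and defaults copied from the diff context above.
  private val CompactOnCommit = ConfEntry("compactOnCommit", "false")
  private val PauseBgWorkForCommit = ConfEntry("pauseBackgroundWorkForCommit", "true")
  private val BlockSizeKB = ConfEntry("blockSizeKB", "4")
  private val BlockCacheSizeMB = ConfEntry("blockCacheSizeMB", "8")
  private val LockAcquireTimeoutMs = ConfEntry("lockAcquireTimeoutMs", "60000")

  def fromMap(settings: Map[String, String]): SketchRocksDBConf = {
    // Fall back to the entry's default when the user did not set the key.
    def get(entry: ConfEntry): String = settings.getOrElse(entry.name, entry.default)
    SketchRocksDBConf(
      compactOnCommit = get(CompactOnCommit).toBoolean,
      pauseBackgroundWorkForCommit = get(PauseBgWorkForCommit).toBoolean,
      blockSizeKB = get(BlockSizeKB).toLong,
      blockCacheSizeMB = get(BlockCacheSizeMB).toLong,
      lockAcquireTimeoutMs = get(LockAcquireTimeoutMs).toLong)
  }
}
```

With this shape, `SketchRocksDBConf.fromMap(Map("blockSizeKB" -> "16"))` overrides only the block size and keeps every other default.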

Member

@viirya left a comment

A few minor comments.

@SparkQA

SparkQA commented Jun 29, 2021

Test build #140374 has finished for PR 32928 at commit 21e1728.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya left a comment

LGTM. @dongjoon-hyun do you have more comments? If not, I will merge this.

@dongjoon-hyun
Member

No other comments. Thank you for asking, @viirya .

@viirya
Member

viirya commented Jun 30, 2021

Thank you @dongjoon-hyun.

@viirya
Member

viirya commented Jun 30, 2021

Thanks @xuanyuanking for this work! Merging to master.

@viirya closed this in 3257a30 Jun 30, 2021
@xuanyuanking
Member Author

Thanks all for the review! I'll rebase the other PRs soon.

@xuanyuanking deleted the SPARK-35784 branch June 30, 2021 02:10