Skip to content

Commit 72b6fe5

Browse files
committed
address comment
1 parent f928820 commit 72b6fe5

File tree

3 files changed

+11
-14
lines changed

3 files changed

+11
-14
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
3030
import org.apache.spark.TaskContext
3131
import org.apache.spark.internal.Logging
3232
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
33-
import org.apache.spark.sql.util.RocksDBLoader
3433
import org.apache.spark.util.{NextIterator, Utils}
3534

3635
/**
@@ -42,13 +41,13 @@ import org.apache.spark.util.{NextIterator, Utils}
4241
* @see [[RocksDBFileManager]] to see how the files are laid out in local disk and DFS.
4342
* @param dfsRootDir Remote directory where checkpoints are going to be written
4443
* @param conf Configuration for RocksDB
45-
* @param localRootDir Root directory in local disk that is used to working and checkpoing dirs
44+
* @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
4645
* @param hadoopConf Hadoop configuration for talking to the remote file system
4746
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
4847
*/
4948
class RocksDB(
5049
dfsRootDir: String,
51-
val conf: RocksDBConf = RocksDBConf(),
50+
val conf: RocksDBConf,
5251
localRootDir: File = Utils.createTempDir(),
5352
hadoopConf: Configuration = new Configuration,
5453
loggingId: String = "") extends Logging {
@@ -82,7 +81,9 @@ class RocksDB(
8281

8382
@volatile private var db: NativeRocksDB = _
8483
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
84+
// number of keys in all committed versions before
8585
@volatile private var numCommittedKeys = 0L
86+
// number of keys which will be committed in the next version
8687
@volatile private var numUncommittedKeys = 0L
8788
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
8889

@@ -305,10 +306,6 @@ class RocksDB(
305306
acquireLock.notifyAll()
306307
}
307308

308-
private def getDBProperty(property: String): Long = {
309-
db.getProperty(property).toLong
310-
}
311-
312309
private def openDB(): Unit = {
313310
assert(db == null)
314311
db = NativeRocksDB.open(dbOptions, workingDir.toString)
@@ -437,9 +434,10 @@ object RocksDBConf {
437434
def apply(): RocksDBConf = apply(new StateStoreConf())
438435
}
439436

440-
case class AcquiredThreadInfo(
441-
threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread()),
442-
tc: TaskContext = TaskContext.get()) {
437+
case class AcquiredThreadInfo() {
438+
val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
439+
val tc: TaskContext = TaskContext.get()
440+
443441
override def toString(): String = {
444442
val taskStr = if (tc != null) {
445443
val taskDetails =

sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.util
18+
package org.apache.spark.sql.execution.streaming.state
1919

20-
import org.rocksdb._
20+
import org.rocksdb.RocksDB
2121

2222
import org.apache.spark.internal.Logging
2323
import org.apache.spark.util.UninterruptibleThread
@@ -58,4 +58,3 @@ object RocksDBLoader extends Logging {
5858
}
5959
}
6060
}
61-

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class RocksDBSuite extends SparkFunSuite {
3838
val remoteDir = Utils.createTempDir().toString
3939
new File(remoteDir).delete() // to make sure that the directory gets created
4040

41-
val conf = RocksDBConf().copy(compactOnCommit = true)
41+
val conf = RocksDBConf().copy(compactOnCommit = compactOnCommit)
4242
withDB(remoteDir, conf = conf) { db =>
4343
assert(db.get("a") === null)
4444
assert(iterator(db).isEmpty)

0 commit comments

Comments
 (0)