Skip to content

Commit 641a91e

Browse files
committed
address comments
1 parent aba142d commit 641a91e

File tree

1 file changed

+8
-5
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state

1 file changed

+8
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
1919

2020
import java.io.File
2121
import java.util.Locale
22+
import javax.annotation.concurrent.GuardedBy
2223

2324
import scala.collection.{mutable, Map}
2425
import scala.ref.WeakReference
@@ -85,6 +86,8 @@ class RocksDB(
8586
@volatile private var numCommittedKeys = 0L
8687
// number of keys which will be committed in the next version
8788
@volatile private var numUncommittedKeys = 0L
89+
90+
@GuardedBy("acquireLock")
8891
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
8992

9093
/**
@@ -158,7 +161,7 @@ class RocksDB(
158161
iter.seekToFirst()
159162

160163
// Attempt to close this iterator if there is a task failure, or a task interruption.
161-
// This is a hack because it assumes that the RocksDB inside running in a task.
164+
// This is a hack because it assumes that the RocksDB is running inside a task.
162165
Option(TaskContext.get()).foreach { tc =>
163166
tc.addTaskCompletionListener[Unit] { _ => iter.close() }
164167
}
@@ -226,7 +229,7 @@ class RocksDB(
226229
"writeBatch" -> writeTimeMs,
227230
"flush" -> flushTimeMs,
228231
"compact" -> compactTimeMs,
229-
"pauseBg" -> pauseTimeMs,
232+
"pause" -> pauseTimeMs,
230233
"checkpoint" -> checkpointTimeMs,
231234
"fileSync" -> fileSyncTimeMs
232235
)
@@ -265,7 +268,7 @@ class RocksDB(
265268
flushOptions.close()
266269
dbOptions.close()
267270
dbLogger.close()
268-
silentDeleteRecursively(localRootDir, "stopping")
271+
silentDeleteRecursively(localRootDir, "closing RocksDB")
269272
} catch {
270273
case e: Exception =>
271274
logWarning("Error closing RocksDB", e)
@@ -332,7 +335,7 @@ class RocksDB(
332335
case InfoLogLevel.DEBUG_LEVEL => logDebug(_)
333336
case _ => logTrace(_)
334337
}
335-
loggingFunc(s"[Native-${infoLogLevel.getValue}] $logMsg")
338+
loggingFunc(s"[NativeRocksDB-${infoLogLevel.getValue}] $logMsg")
336339
}
337340
}
338341

@@ -342,7 +345,7 @@ class RocksDB(
342345
if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL
343346
dbOptions.setLogger(dbLogger)
344347
dbOptions.setInfoLogLevel(dbLogLevel)
345-
logInfo(s"Set DB native logging level to $dbLogLevel")
348+
logInfo(s"Set RocksDB native logging level to $dbLogLevel")
346349
dbLogger
347350
}
348351

0 commit comments

Comments
 (0)