Skip to content
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package io.iohk.ethereum.db.dataSource

import java.util.concurrent.locks.ReentrantReadWriteLock

import io.iohk.ethereum.utils.Logger
import cats.effect.Resource
import io.iohk.ethereum.db.dataSource.DataSource._
import io.iohk.ethereum.db.dataSource.RocksDbDataSource.{IterationError, IterationFinished}
import io.iohk.ethereum.db.dataSource.RocksDbDataSource._
import io.iohk.ethereum.utils.TryWithResources.withResources
import monix.eval.Task
import monix.reactive.Observable
import org.rocksdb._
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.util.control.NonFatal
Expand All @@ -22,9 +21,8 @@ class RocksDbDataSource(
private var cfOptions: ColumnFamilyOptions,
private var nameSpaces: Seq[Namespace],
private var handles: Map[Namespace, ColumnFamilyHandle]
) extends DataSource {

private val logger = LoggerFactory.getLogger("rocks-db")
) extends DataSource
with Logger {

@volatile
private var isClosed = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about changing it to https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicReference.html
?

I have good experience with atomic refs. I remember using @volatile a very long time ago in Java and it was no reliable 🤔

Copy link
Contributor

@kapke kapke Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or even https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/AtomicBoolean.html :)

TBH I think that volatile is enough here since updates of this var don't involve a read (and that's the basic problem classes in java.util.concurrent.atomic solve - making reads and writes that depend on each other atomically)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we are now writing to this variable only after write lock is in locked state, this could probably be even a simple var without @volatile annotations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we need this variable as @volatile.
In case only one thread reads and writes the value of a volatile variable and other threads only read the variable, then the reading threads are guaranteed to see the latest value written to the volatile variable.
I agree with @kapke, that volatile is enough here since updates of this var don't involve a read. Are we ok with this? Or do you think in another approach?

Copy link
Contributor

@KonradStaniec KonradStaniec Oct 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we only modify this variable from the inside of write lock and read only from inside read lock, which has strictly stronger guarantees than volatile variable. So the variable could be simple var and we would still have more synchronization than with volatile variable.

But to be honest we can leave it volatile, it is not big deal here. (I am against atomics here as having cas operations inside locks, would really be an overkill)

Expand All @@ -37,16 +35,20 @@ class RocksDbDataSource(
* @return the value associated with the passed key.
*/
override def get(namespace: Namespace, key: Key): Option[Value] = {
assureNotClosed()
RocksDbDataSource.dbLock.readLock().lock()
dbLock.readLock().lock()
try {
assureNotClosed()
Option(db.get(handles(namespace), readOptions, key.toArray))
} catch {
case NonFatal(e) =>
logger.error(s"Not found associated value to a namespace: $namespace and a key: $key, cause: {}", e.getMessage)
throw new RuntimeException(e)
case error: RocksDbDataSourceClosedException =>
throw error
case NonFatal(error) =>
throw RocksDbDataSourceException(
s"Not found associated value to a namespace: $namespace and a key: $key",
error
)
} finally {
RocksDbDataSource.dbLock.readLock().unlock()
dbLock.readLock().unlock()
}
}

Expand All @@ -59,23 +61,24 @@ class RocksDbDataSource(
* @return the value associated with the passed key.
*/
override def getOptimized(namespace: Namespace, key: Array[Byte]): Option[Array[Byte]] = {
assureNotClosed()
RocksDbDataSource.dbLock.readLock().lock()
dbLock.readLock().lock()
try {
assureNotClosed()
Option(db.get(handles(namespace), readOptions, key))
} catch {
case NonFatal(e) =>
logger.error(s"Not found associated value to a key: $key, cause: {}", e.getMessage)
throw new RuntimeException(e)
case error: RocksDbDataSourceClosedException =>
throw error
case NonFatal(error) =>
throw RocksDbDataSourceException(s"Not found associated value to a key: $key", error)
} finally {
RocksDbDataSource.dbLock.readLock().unlock()
dbLock.readLock().unlock()
}
}

override def update(dataSourceUpdates: Seq[DataUpdate]): Unit = {
assureNotClosed()
RocksDbDataSource.dbLock.readLock().lock()
dbLock.writeLock().lock()
try {
assureNotClosed()
withResources(new WriteOptions()) { writeOptions =>
withResources(new WriteBatch()) { batch =>
dataSourceUpdates.foreach {
Expand All @@ -95,14 +98,12 @@ class RocksDbDataSource(
}
}
} catch {
case NonFatal(e) =>
logger.error(
s"DataSource not updated, cause: {}",
e.getMessage
)
throw new RuntimeException(e)
case error: RocksDbDataSourceClosedException =>
throw error
case NonFatal(error) =>
throw RocksDbDataSourceException(s"DataSource not updated", error)
} finally {
RocksDbDataSource.dbLock.readLock().unlock()
dbLock.writeLock().unlock()
}
}

Expand Down Expand Up @@ -139,12 +140,13 @@ class RocksDbDataSource(
}

/**
* This function is used only for tests.
* This function updates the DataSource by deleting all the (key-value) pairs in it.
*/
override def clear(): Unit = {
destroy()
logger.debug(s"About to create new DataSource for path: ${rocksDbConfig.path}")
val (newDb, handles, readOptions, dbOptions, cfOptions) = RocksDbDataSource.createDB(rocksDbConfig, nameSpaces.tail)
log.debug(s"About to create new DataSource for path: ${rocksDbConfig.path}")
val (newDb, handles, readOptions, dbOptions, cfOptions) = createDB(rocksDbConfig, nameSpaces.tail)

assert(nameSpaces.size == handles.size)

Expand All @@ -160,11 +162,11 @@ class RocksDbDataSource(
* This function closes the DataSource, without deleting the files used by it.
*/
override def close(): Unit = {
logger.debug(s"About to close DataSource in path: ${rocksDbConfig.path}")
assureNotClosed()
isClosed = true
RocksDbDataSource.dbLock.writeLock().lock()
log.info(s"About to close DataSource in path: ${rocksDbConfig.path}")
dbLock.writeLock().lock()
try {
assureNotClosed()
isClosed = true
// There is specific order for closing rocksdb with column families descibed in
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
// 1. Free all column families handles
Expand All @@ -175,16 +177,19 @@ class RocksDbDataSource(
dbOptions.close()
// 3. Free column families options
cfOptions.close()
log.info(s"DataSource closed successfully in the path: ${rocksDbConfig.path}")
} catch {
case NonFatal(e) =>
logger.error("Not closed the DataSource properly, cause: {}", e)
throw new RuntimeException(e)
case error: RocksDbDataSourceClosedException =>
throw error
case NonFatal(error) =>
throw RocksDbDataSourceException(s"Not closed the DataSource properly", error)
} finally {
RocksDbDataSource.dbLock.writeLock().unlock()
dbLock.writeLock().unlock()
}
}

/**
* This function is used only for tests.
* This function closes the DataSource, if it is not yet closed, and deletes all the files used by it.
*/
override def destroy(): Unit = {
Expand All @@ -193,8 +198,13 @@ class RocksDbDataSource(
close()
}
} finally {
import rocksDbConfig._
destroyDB()
}
}

protected def destroyDB(): Unit = {
try {
import rocksDbConfig._
val tableCfg = new BlockBasedTableConfig()
.setBlockSize(blockSize)
.setBlockCache(new ClockCache(blockCacheSize))
Expand All @@ -212,15 +222,18 @@ class RocksDbDataSource(
.setIncreaseParallelism(maxThreads)
.setTableFormatConfig(tableCfg)

logger.debug(s"About to destroy DataSource in path: $path")
log.debug(s"About to destroy DataSource in path: $path")
RocksDB.destroyDB(path, options)
options.close()
} catch {
case NonFatal(error) =>
throw RocksDbDataSourceException(s"Not destroyed the DataSource properly", error)
}
}

private def assureNotClosed(): Unit = {
if (isClosed) {
throw new IllegalStateException(s"This ${getClass.getSimpleName} has been closed")
throw RocksDbDataSourceClosedException(s"This ${getClass.getSimpleName} has been closed")
}
}

Expand All @@ -242,6 +255,9 @@ object RocksDbDataSource {
case object IterationFinished extends RuntimeException
case class IterationError(ex: Throwable)

case class RocksDbDataSourceClosedException(message: String) extends IllegalStateException(message)
case class RocksDbDataSourceException(message: String, cause: Throwable) extends RuntimeException(message, cause)

/**
* The rocksdb implementation acquires a lock from the operating system to prevent misuse
*/
Expand Down Expand Up @@ -296,6 +312,9 @@ object RocksDbDataSource {
options,
cfOpts
)
} catch {
case NonFatal(error) =>
throw RocksDbDataSourceException(s"Not created the DataSource properly", error)
} finally {
RocksDbDataSource.dbLock.writeLock().unlock()
}
Expand Down
15 changes: 11 additions & 4 deletions src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.iohk.ethereum.network.discovery.DiscoveryListener
import io.iohk.ethereum.network.{PeerManagerActor, ServerActor}
import io.iohk.ethereum.testmode.{TestLedgerBuilder, TestmodeConsensusBuilder}
import io.iohk.ethereum.utils.Config

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -88,13 +88,20 @@ abstract class BaseNode extends Node {
}

tryAndLogFailure(() => consensus.stopProtocol())
tryAndLogFailure(() => Await.ready(system.terminate, shutdownTimeoutDuration))
tryAndLogFailure(() => storagesInstance.dataSource.close())
tryAndLogFailure(() =>
Await.ready(
system.terminate.map(
_ ->
log.info("actor system finished")
),
shutdownTimeoutDuration
)
)
if (jsonRpcConfig.ipcServerConfig.enabled) {
tryAndLogFailure(() => jsonRpcIpcServer.close())
}
tryAndLogFailure(() => Metrics.get().close())

tryAndLogFailure(() => storagesInstance.dataSource.close())
}
}

Expand Down