@@ -6,7 +6,6 @@ import akka.util.ByteString
66import io .iohk .ethereum .db .dataSource .DataSourceBatchUpdate
77import io .iohk .ethereum .db .dataSource .RocksDbDataSource .IterationError
88import io .iohk .ethereum .db .storage .NodeStorage .{NodeEncoded , NodeHash }
9- import io .iohk .ethereum .db .storage .StateStorage .RollBackFlush
109import io .iohk .ethereum .db .storage .TransactionMappingStorage .TransactionLocation
1110import io .iohk .ethereum .db .storage ._
1211import io .iohk .ethereum .db .storage .pruning .PruningMode
@@ -223,8 +222,14 @@ class BlockchainImpl(
223222
224223 // There is always only one writer thread (ensured by actor), but can by many readers (api calls)
225224 // to ensure visibility of writes, needs to be volatile or atomic ref
226- private val bestKnownBlockAndLatestCheckpoint : AtomicReference [BestBlockLatestCheckpointNumbers ] =
227- new AtomicReference (BestBlockLatestCheckpointNumbers (BigInt (0 ), BigInt (0 )))
225+ // Laziness required for mocking BlockchainImpl on tests
226+ private lazy val bestKnownBlockAndLatestCheckpoint : AtomicReference [BestBlockLatestCheckpointNumbers ] =
227+ new AtomicReference (
228+ BestBlockLatestCheckpointNumbers (
229+ appStateStorage.getBestBlockNumber(),
230+ appStateStorage.getLatestCheckpointBlockNumber()
231+ )
232+ )
228233
229234 override def getBlockHeaderByHash (hash : ByteString ): Option [BlockHeader ] =
230235 blockHeadersStorage.get(hash)
@@ -246,22 +251,15 @@ class BlockchainImpl(
246251 bestSavedBlockNumber,
247252 bestKnownBlockNumber
248253 )
249- if (bestKnownBlockNumber > bestSavedBlockNumber)
250- bestKnownBlockNumber
251- else
252- bestSavedBlockNumber
253- }
254254
255- override def getLatestCheckpointBlockNumber (): BigInt = {
256- val latestCheckpointNumberInStorage = appStateStorage.getLatestCheckpointBlockNumber()
257- // The latest checkpoint number is firstly saved in memory and then persisted to the storage only when it's time to persist cache.
258- // The latest checkpoint number in memory can be bigger than the number in storage because the cache wasn't persisted yet
259- if (bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber > latestCheckpointNumberInStorage)
260- bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
261- else
262- latestCheckpointNumberInStorage
255+ // The cached best block number should always be more up-to-date than the one on disk, we are keeping access to disk
256+ // above only for logging purposes
257+ bestKnownBlockNumber
263258 }
264259
260+ override def getLatestCheckpointBlockNumber (): BigInt =
261+ bestKnownBlockAndLatestCheckpoint.get().latestCheckpointNumber
262+
265263 override def getBestBlock (): Block = {
266264 val bestBlockNumber = getBestBlockNumber()
267265 log.debug(" Trying to get best block with number {}" , bestBlockNumber)
@@ -294,7 +292,7 @@ class BlockchainImpl(
294292 val currentBestBlockNumber = getBestBlockNumber()
295293 val currentBestCheckpointNumber = getLatestCheckpointBlockNumber()
296294 log.debug(
297- " Persisting block info data into database. Persisted block number is {}. " +
295+ " Persisting app info data into database. Persisted block number is {}. " +
298296 " Persisted checkpoint number is {}" ,
299297 currentBestBlockNumber,
300298 currentBestCheckpointNumber
@@ -313,10 +311,6 @@ class BlockchainImpl(
313311 .and(storeChainWeight(block.header.hash, weight))
314312 .commit()
315313
316- // not transactional part
317- // the best blocks data will be persisted only when the cache will be persisted
318- stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
319-
320314 if (saveAsBestBlock && block.hasCheckpoint) {
321315 log.debug(
322316 " New best known block block number - {}, new best checkpoint number - {}" ,
@@ -331,6 +325,10 @@ class BlockchainImpl(
331325 )
332326 saveBestKnownBlock(block.header.number)
333327 }
328+
329+ // not transactional part
330+ // the best blocks data will be persisted only when the cache will be persisted
331+ stateStorage.onBlockSave(block.header.number, appStateStorage.getBestBlockNumber())(persistBestBlocksData)
334332 }
335333
336334 override def storeBlockHeader (blockHeader : BlockHeader ): DataSourceBatchUpdate = {
@@ -388,87 +386,83 @@ class BlockchainImpl(
388386 blockNumberMappingStorage.remove(number)
389387 }
390388
391- // scalastyle:off method.length
392389 override def removeBlock (blockHash : ByteString , withState : Boolean ): Unit = {
393- val maybeBlockHeader = getBlockHeaderByHash (blockHash)
390+ val maybeBlock = getBlockByHash (blockHash)
394391
395- log.debug(
396- " Trying to remove block with hash {} and number {}" ,
397- ByteStringUtils .hash2string(blockHash),
398- maybeBlockHeader.map(_.number)
399- )
400-
401- val maybeTxList = getBlockBodyByHash(blockHash).map(_.transactionList)
402- val bestBlocks = bestKnownBlockAndLatestCheckpoint.get()
403- // as we are decreasing block numbers in memory more often than in storage,
404- // we can't use here getBestBlockNumber / getLatestCheckpointBlockNumber
405- val bestBlockNumber =
406- if (bestBlocks.bestBlockNumber != 0 ) bestBlocks.bestBlockNumber else appStateStorage.getBestBlockNumber()
407- val latestCheckpointNumber = {
408- if (bestBlocks.latestCheckpointNumber != 0 ) bestBlocks.latestCheckpointNumber
409- else appStateStorage.getLatestCheckpointBlockNumber()
392+ maybeBlock match {
393+ case Some (block) => removeBlock(block, withState)
394+ case None =>
395+ log.warn(s " Attempted removing block with hash ${ByteStringUtils .hash2string(blockHash)} that we don't have " )
410396 }
397+ }
411398
412- val blockNumberMappingUpdates = {
413- maybeBlockHeader.fold(blockNumberMappingStorage.emptyBatchUpdate)(h =>
414- if (getHashByBlockNumber(h.number).contains(blockHash))
415- removeBlockNumberMapping(h.number)
416- else blockNumberMappingStorage.emptyBatchUpdate
417- )
418- }
399+ // scalastyle:off method.length
400+ private def removeBlock (block : Block , withState : Boolean ): Unit = {
401+ val blockHash = block.hash
419402
420- val (checkpointUpdates, prevCheckpointNumber): (DataSourceBatchUpdate , Option [BigInt ]) = maybeBlockHeader match {
421- case Some (header) =>
422- if (header.hasCheckpoint && header.number == latestCheckpointNumber) {
423- val prev = findPreviousCheckpointBlockNumber(header.number, header.number)
424- prev
425- .map { num =>
426- (appStateStorage.putLatestCheckpointBlockNumber(num), Some (num))
427- }
428- .getOrElse {
429- (appStateStorage.removeLatestCheckpointBlockNumber(), Some (0 ))
430- }
431- } else (appStateStorage.emptyBatchUpdate, None )
432- case None =>
433- (appStateStorage.emptyBatchUpdate, None )
434- }
403+ log.debug(s " Trying to remove block block ${block.idTag}" )
435404
436- val newBestBlockNumber : BigInt = if (bestBlockNumber >= 1 ) bestBlockNumber - 1 else 0
405+ val txList = block.body.transactionList
406+ val bestBlockNumber = getBestBlockNumber()
407+ val latestCheckpointNumber = getLatestCheckpointBlockNumber()
408+
409+ val blockNumberMappingUpdates =
410+ if (getHashByBlockNumber(block.number).contains(blockHash))
411+ removeBlockNumberMapping(block.number)
412+ else blockNumberMappingStorage.emptyBatchUpdate
413+
414+ val newBestBlockNumber : BigInt = (bestBlockNumber - 1 ).max(0 )
415+ val newLatestCheckpointNumber : BigInt =
416+ if (block.hasCheckpoint && block.number == latestCheckpointNumber) {
417+ findPreviousCheckpointBlockNumber(block.number, block.number)
418+ } else latestCheckpointNumber
419+
420+ /*
421+ This two below updates are an exception to the rule of only updating the best blocks when persisting the node
422+ cache.
423+ They are required in case we are removing a block that's marked on db as the best (or as the last checkpoint),
424+ to keep it's consistency, as it will no longer be the best block (nor the last checkpoint).
425+
426+ This updates can't be done if the conditions are false as we might not have the associated mpt nodes, so falling
427+ into the case of having an incomplete best block and so an inconsistent db
428+ */
429+ val bestBlockNumberUpdates =
430+ if (appStateStorage.getBestBlockNumber() > newBestBlockNumber)
431+ appStateStorage.putBestBlockNumber(newBestBlockNumber)
432+ else appStateStorage.emptyBatchUpdate
433+ val latestCheckpointNumberUpdates =
434+ if (appStateStorage.getLatestCheckpointBlockNumber() > newLatestCheckpointNumber)
435+ appStateStorage.putLatestCheckpointBlockNumber(newLatestCheckpointNumber)
436+ else appStateStorage.emptyBatchUpdate
437+
438+ log.debug(
439+ " Persisting app info data into database. Persisted block number is {}. Persisted checkpoint number is {}" ,
440+ newBestBlockNumber,
441+ newLatestCheckpointNumber
442+ )
437443
438444 blockHeadersStorage
439445 .remove(blockHash)
440446 .and(blockBodiesStorage.remove(blockHash))
441447 .and(chainWeightStorage.remove(blockHash))
442448 .and(receiptStorage.remove(blockHash))
443- .and(maybeTxList.fold(transactionMappingStorage.emptyBatchUpdate)( removeTxsLocations))
449+ .and(removeTxsLocations(txList ))
444450 .and(blockNumberMappingUpdates)
451+ .and(bestBlockNumberUpdates)
452+ .and(latestCheckpointNumberUpdates)
445453 .commit()
446454
447- // not transactional part
448- saveBestKnownBlocks(newBestBlockNumber, prevCheckpointNumber)
455+ saveBestKnownBlocks(newBestBlockNumber, Some (newLatestCheckpointNumber))
449456 log.debug(
450457 " Removed block with hash {}. New best block number - {}, new best checkpoint block number - {}" ,
451458 ByteStringUtils .hash2string(blockHash),
452459 newBestBlockNumber,
453- prevCheckpointNumber
460+ newLatestCheckpointNumber
454461 )
455462
456- maybeBlockHeader.foreach { h =>
457- if (withState) {
458- val bestBlocksUpdates = appStateStorage
459- .putBestBlockNumber(newBestBlockNumber)
460- .and(checkpointUpdates)
461- stateStorage.onBlockRollback(h.number, bestBlockNumber) { () =>
462- log.debug(
463- " Persisting block info data into database. Persisted block number is {}. " +
464- " Persisted checkpoint number is {}" ,
465- newBestBlockNumber,
466- prevCheckpointNumber
467- )
468- bestBlocksUpdates.commit()
469- }
470- }
471- }
463+ // not transactional part
464+ if (withState)
465+ stateStorage.onBlockRollback(block.number, bestBlockNumber) { () => persistBestBlocksData() }
472466 }
473467 // scalastyle:on method.length
474468
@@ -485,7 +479,7 @@ class BlockchainImpl(
485479 private def findPreviousCheckpointBlockNumber (
486480 blockNumberToCheck : BigInt ,
487481 latestCheckpointBlockNumber : BigInt
488- ): Option [ BigInt ] = {
482+ ): BigInt = {
489483 if (blockNumberToCheck > 0 ) {
490484 val maybePreviousCheckpointBlockNumber = for {
491485 currentBlock <- getBlockByNumber(blockNumberToCheck)
@@ -494,10 +488,10 @@ class BlockchainImpl(
494488 } yield currentBlock.number
495489
496490 maybePreviousCheckpointBlockNumber match {
497- case Some (_ ) => maybePreviousCheckpointBlockNumber
491+ case Some (previousCheckpointBlockNumber ) => previousCheckpointBlockNumber
498492 case None => findPreviousCheckpointBlockNumber(blockNumberToCheck - 1 , latestCheckpointBlockNumber)
499493 }
500- } else None
494+ } else 0
501495 }
502496
503497 private def saveTxsLocations (blockHash : ByteString , blockBody : BlockBody ): DataSourceBatchUpdate =
@@ -549,13 +543,6 @@ class BlockchainImpl(
549543 noEmptyAccounts = noEmptyAccounts,
550544 ethCompatibleStorage = ethCompatibleStorage
551545 )
552-
553- // FIXME EC-495 this method should not be need when best block is handled properly during rollback
554- def persistCachedNodes (): Unit = {
555- if (stateStorage.forcePersist(RollBackFlush )) {
556- persistBestBlocksData()
557- }
558- }
559546}
560547
561548trait BlockchainStorages {
0 commit comments