diff --git a/build.sbt b/build.sbt index 5fc5d9550c..6632d5e363 100644 --- a/build.sbt +++ b/build.sbt @@ -111,24 +111,26 @@ lazy val node = { Seq( Dependencies.akka, Dependencies.akkaHttp, - Dependencies.json4s, - Dependencies.circe, + Dependencies.apacheCommons, Dependencies.boopickle, - Dependencies.rocksDb, - Dependencies.enumeratum, - Dependencies.testing, Dependencies.cats, - Dependencies.monix, - Dependencies.network, + Dependencies.circe, + Dependencies.cli, Dependencies.crypto, - Dependencies.scopt, + Dependencies.dependencies, + Dependencies.enumeratum, + Dependencies.guava, + Dependencies.json4s, + Dependencies.kamon, Dependencies.logging, - Dependencies.apacheCommons, Dependencies.micrometer, - Dependencies.kamon, + Dependencies.monix, + Dependencies.network, Dependencies.prometheus, - Dependencies.cli, - Dependencies.dependencies + Dependencies.rocksDb, + Dependencies.scaffeine, + Dependencies.scopt, + Dependencies.testing ).flatten ++ malletDeps } diff --git a/nix/pkgs/mantis.nix b/nix/pkgs/mantis.nix index 78186c85e8..acfe657ed3 100644 --- a/nix/pkgs/mantis.nix +++ b/nix/pkgs/mantis.nix @@ -50,7 +50,7 @@ in sbt.mkDerivation { # This sha represents the change dependencies of mantis. # Update this sha whenever you change the dependencies - depsSha256 = "0gppwz6dvligrrgjmramyrm9723pwhg89cqfpxj22z2d86brwas2"; + depsSha256 = "07iixw8va4zwpiln2zy2gr245z1ir4jd6pqgmkzhwnhw3mf5j28k"; # this is the command used to to create the fixed-output-derivation depsWarmupCommand = "sbt compile --debug -Dnix=true"; diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7b165ae0b1..82aa858d85 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -112,12 +112,19 @@ object Dependencies { jline, "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2", "org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0", - "com.google.guava" % "guava" % "29.0-jre", "org.xerial.snappy" % "snappy-java" % "1.1.7.7", "org.web3j" % "core" % "5.0.0" % Test, "io.vavr" % "vavr" % "1.0.0-alpha-3" ) + val guava: Seq[ModuleID] = { + val version = "30.1-jre" + Seq( + "com.google.guava" % "guava" % version, + "com.google.guava" % "guava-testlib" % version % "test" + ) + } + val prometheus: Seq[ModuleID] = { val provider = "io.prometheus" val version = "0.9.0" @@ -137,7 +144,7 @@ object Dependencies { "com.google.code.findbugs" % "jsr305" % "3.0.2" % Optional, provider % "micrometer-core" % version, provider % "micrometer-registry-jmx" % version, - provider % "micrometer-registry-prometheus" % version, + provider % "micrometer-registry-prometheus" % version ) } @@ -153,4 +160,9 @@ object Dependencies { val shapeless: Seq[ModuleID] = Seq( "com.chuusai" %% "shapeless" % "2.3.3" ) + + val scaffeine: Seq[ModuleID] = Seq( + "com.github.blemale" %% "scaffeine" % "4.0.2" % "compile" + ) + } diff --git a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala index f8059f7115..e563286b24 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala @@ -17,6 +17,7 @@ import monix.eval.Task import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.Try +import io.iohk.ethereum.blockchain.sync.CacheBasedBlacklist object FastSyncItSpecUtils { class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig) @@ -24,6 +25,9 @@ object FastSyncItSpecUtils { lazy val validators = new MockValidatorsAlwaysSucceed + val maxSize = 1000 + val blacklist = CacheBasedBlacklist.empty(maxSize) + lazy val fastSync = system.actorOf( FastSync.props( storagesInstance.storages.fastSyncStateStorage, @@ -32,6 +36,7 @@ object FastSyncItSpecUtils { validators, peerEventBus, etcPeerManager, + blacklist, testSyncConfig, system.scheduler ) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala new file mode 100644 index 0000000000..8aa0f568a5 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -0,0 +1,161 @@ +package io.iohk.ethereum.blockchain.sync + +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.blemale.scaffeine.{Cache, Scaffeine} +import io.iohk.ethereum.utils.Logger + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ +import scala.jdk.DurationConverters._ + +import Blacklist._ +import io.iohk.ethereum.network.PeerId +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType.WrongBlockHeadersType +import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockError +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType + +trait Blacklist { + def isBlacklisted(id: BlacklistId): Boolean + def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit + def remove(id: BlacklistId): Unit + def keys: Set[BlacklistId] +} + +object Blacklist { + import BlacklistReason._ + import BlacklistReasonType._ + + trait BlacklistId { + def value: String + } + + sealed trait BlacklistReason { + def reasonType: BlacklistReasonType + def description: String + } + object BlacklistReason { + sealed trait BlacklistReasonType { + def code: Int + def name: String + } + object BlacklistReasonType { + case object WrongBlockHeadersType extends BlacklistReasonType { + val code: Int = 1 + val name: String = "WrongBlockHeadersType" + } + case object BlockHeaderValidationFailedType extends BlacklistReasonType { + val code: Int = 2 + val name: String = "BlockHeaderValidationFailed" + } + case object ErrorInBlockHeadersType extends BlacklistReasonType { + val code: Int = 3 + val name: String = "ErrorInBlockHeaders" + } + case object EmptyBlockBodiesType extends BlacklistReasonType { + val code: Int = 4 + val name: String = "EmptyBlockBodies" + } + case object BlockBodiesNotMatchingHeadersType extends BlacklistReasonType { + val code: Int = 5 + val name: String = "BlockBodiesNotMatchingHeaders" + } + case object EmptyReceiptsType extends BlacklistReasonType { + val code: Int = 6 + val name: String = "EmptyReceipts" + } + case object InvalidReceiptsType extends BlacklistReasonType { + val code: Int = 7 + val name: String = "InvalidReceipts" + } + case object RequestFailedType extends BlacklistReasonType { + val code: Int = 8 + val name: String = "RequestFailed" + } + case object PeerActorTerminatedType extends BlacklistReasonType { + val code: Int = 9 + val name: String = "PeerActorTerminated" + } + } + + case object WrongBlockHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = WrongBlockHeadersType + val description: String = "Wrong blockheaders response (empty or not chain forming)" + } + case object BlockHeaderValidationFailed extends BlacklistReason { + val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType + val description: String = "Block header validation failed" + } + case object ErrorInBlockHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = ErrorInBlockHeadersType + val description: String = "Error in block headers response" + } + final case class EmptyBlockBodies(knownHashes: Seq[String]) extends BlacklistReason { + val reasonType: BlacklistReasonType = EmptyBlockBodiesType + val description: String = s"Got empty block bodies response for known hashes: $knownHashes" + } + case object BlockBodiesNotMatchingHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = BlockBodiesNotMatchingHeadersType + val description = "Block bodies not matching block headers" + } + final case class EmptyReceipts(knownHashes: Seq[String]) extends BlacklistReason { + val reasonType: BlacklistReasonType = EmptyReceiptsType + val description: String = s"Got empty receipts for known hashes: $knownHashes" + } + final case class InvalidReceipts(knownHashes: Seq[String], error: BlockError) extends BlacklistReason { + val reasonType: BlacklistReasonType = InvalidReceiptsType + val description: String = s"Got invalid receipts for known hashes: $knownHashes due to: $error" + } + final case class RequestFailed(error: String) extends BlacklistReason { + val reasonType: BlacklistReasonType = RequestFailedType + val description: String = s"Request failed with error: $error" + } + case object PeerActorTerminated extends BlacklistReason { + val reasonType: BlacklistReasonType = PeerActorTerminatedType + val description: String = "Peer actor terminated" + } + } +} + +final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonType]) extends Blacklist with Logger { + + import CacheBasedBlacklist._ + + override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined + + override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = { + log.info("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description) + cache.policy().expireVariably().toScala match { + case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava) + case None => + log.warn(customExpirationError(id)) + cache.put(id, reason.reasonType) + } + } + override def remove(id: BlacklistId): Unit = cache.invalidate(id) + + override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet +} + +object CacheBasedBlacklist { + + def customExpirationError(id: BlacklistId): String = + s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration." + + def empty(maxSize: Int): CacheBasedBlacklist = { + val cache = + Scaffeine() + .expireAfter[BlacklistId, BlacklistReasonType]( + create = (_, _) => 60.minutes, + update = (_, _, _) => 60.minutes, + read = (_, _, duration) => duration // read access should not change the expiration time + ) // required to enable VarExpiration policy (i.e. set custom expiration time per element) + .maximumSize( + maxSize + ) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency + .build[BlacklistId, BlacklistReasonType]() + CacheBasedBlacklist(cache) + } + +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala index 6ab85e42db..12adcba0ff 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala @@ -1,10 +1,13 @@ package io.iohk.ethereum.blockchain.sync -import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler} +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId + import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{Duration, FiniteDuration} +// will be removed once regular sync is switched to new blacklist implementation trait BlacklistSupport { selfActor: Actor with ActorLogging => @@ -14,9 +17,9 @@ trait BlacklistSupport { protected val maxBlacklistedNodes = 1000 - val blacklistedPeers = mutable.LinkedHashMap.empty[BlackListId, Cancellable] + val blacklistedPeers = mutable.LinkedHashMap.empty[BlacklistId, Cancellable] - def blacklist(blacklistId: BlackListId, duration: FiniteDuration, reason: String): Unit = { + def blacklist(blacklistId: BlacklistId, duration: FiniteDuration, reason: String): Unit = { if (duration > Duration.Zero) { if (blacklistedPeers.size >= maxBlacklistedNodes) { removeOldestPeer() @@ -30,13 +33,13 @@ trait BlacklistSupport { } } - def undoBlacklist(blacklistId: BlackListId): Unit = { + def undoBlacklist(blacklistId: BlacklistId): Unit = { val peer = blacklistedPeers.get(blacklistId) peer.foreach(_.cancel()) blacklistedPeers.remove(blacklistId) } - def isBlacklisted(blacklistId: BlackListId): Boolean = + def isBlacklisted(blacklistId: BlacklistId): Boolean = blacklistedPeers.exists(_._1 == blacklistId) def handleBlacklistMessages: Receive = { case UnblacklistPeer(ref) => @@ -52,9 +55,6 @@ trait BlacklistSupport { object BlacklistSupport { - abstract class BlackListId { - def value: String - } + private case class UnblacklistPeer(blacklistId: BlacklistId) - private case class UnblacklistPeer(blacklistId: BlackListId) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala index c5074f9fa8..25991899f5 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala @@ -11,6 +11,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +// will be removed once regular sync is switched to new blacklist/peerlist implementation trait PeerListSupport { self: Actor with ActorLogging with BlacklistSupport => import PeerListSupport._ diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala new file mode 100644 index 0000000000..0bca267ce1 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala @@ -0,0 +1,72 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.{Actor, ActorLogging, ActorRef, Scheduler} +import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo +import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected +import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerDisconnectedClassifier +import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe} +import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId} +import io.iohk.ethereum.utils.Config.SyncConfig + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +trait PeerListSupportNg { self: Actor with ActorLogging => + import PeerListSupportNg._ + import Blacklist._ + + private implicit val ec: ExecutionContext = context.dispatcher + + def etcPeerManager: ActorRef + def peerEventBus: ActorRef + def blacklist: Blacklist + def syncConfig: SyncConfig + def scheduler: Scheduler + + protected var handshakedPeers: Map[PeerId, PeerWithInfo] = Map.empty + + scheduler.scheduleWithFixedDelay( + 0.seconds, + syncConfig.peersScanInterval, + etcPeerManager, + EtcPeerManagerActor.GetHandshakedPeers + )(ec, context.self) + + def handlePeerListMessages: Receive = { + case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers) + case PeerDisconnected(peerId) => removePeerById(peerId) + } + + def peersToDownloadFrom: Map[PeerId, PeerWithInfo] = + handshakedPeers.filterNot { case (peerId, _) => + blacklist.isBlacklisted(peerId) + } + + def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer) + + def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit = + handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason)) + + private def updatePeers(peers: Map[Peer, PeerInfo]): Unit = { + val updated = peers.map { case (peer, peerInfo) => + (peer.id, PeerWithInfo(peer, peerInfo)) + } + updated.filterNot(p => handshakedPeers.keySet.contains(p._1)).foreach { case (peerId, _) => + peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) + } + handshakedPeers = updated + } + + private def removePeerById(peerId: PeerId): Unit = { + if (handshakedPeers.keySet.contains(peerId)) { + peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) + blacklist.remove(peerId) + handshakedPeers = handshakedPeers - peerId + } + } + +} + +object PeerListSupportNg { + final case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo) +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala index 8c53b4df82..2d93223b38 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala @@ -26,6 +26,9 @@ class SyncController( ) extends Actor with ActorLogging { + private val blacklistSize: Int = 1000 // TODO ETCM-642 move to config + private val blacklist: Blacklist = CacheBasedBlacklist.empty(blacklistSize) + def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler override def receive: Receive = idle @@ -81,6 +84,7 @@ class SyncController( validators, peerEventBus, etcPeerManager, + blacklist, syncConfig, scheduler ), diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index c54c52a0c4..f9301c6be4 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -4,9 +4,12 @@ import java.time.Instant import akka.actor._ import akka.util.ByteString import cats.data.NonEmptyList +import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress import io.iohk.ethereum.blockchain.sync._ +import io.iohk.ethereum.blockchain.sync.Blacklist._ +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason._ import io.iohk.ethereum.blockchain.sync.fast.ReceiptsValidator.ReceiptsValidationResult import io.iohk.ethereum.blockchain.sync.fast.SyncBlocksValidator.BlockBodyValidationResult import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.{ @@ -20,7 +23,7 @@ import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage} import io.iohk.ethereum.domain._ import io.iohk.ethereum.mpt.MerklePatriciaTrie import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo -import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.{Peer, PeerId} import io.iohk.ethereum.network.p2p.messages.Codes import io.iohk.ethereum.network.p2p.messages.PV62._ import io.iohk.ethereum.network.p2p.messages.PV63._ @@ -42,12 +45,12 @@ class FastSync( val validators: Validators, val peerEventBus: ActorRef, val etcPeerManager: ActorRef, + val blacklist: Blacklist, val syncConfig: SyncConfig, implicit val scheduler: Scheduler ) extends Actor with ActorLogging - with PeerListSupport - with BlacklistSupport + with PeerListSupportNg with ReceiptsValidator with SyncBlocksValidator { @@ -58,9 +61,7 @@ class FastSync( override def receive: Receive = idle - def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages - - def idle: Receive = handleCommonMessages orElse { + def idle: Receive = handlePeerListMessages orElse { case SyncProtocol.Start => start() case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing } @@ -89,7 +90,7 @@ class FastSync( context become waitingForPivotBlock } - def waitingForPivotBlock: Receive = handleCommonMessages orElse { + def waitingForPivotBlock: Receive = handlePeerListMessages orElse { case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing case PivotBlockSelector.Result(pivotBlockHeader) => if (pivotBlockHeader.number < 1) { @@ -157,10 +158,10 @@ class FastSync( def handleStatus: Receive = { case SyncProtocol.GetStatus => sender() ! currentSyncingStatus case SyncStateSchedulerActor.StateSyncStats(saved, missing) => - syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = (saved + missing)) + syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing) } - def receive: Receive = handleCommonMessages orElse handleStatus orElse { + def receive: Receive = handlePeerListMessages orElse handleStatus orElse { case UpdatePivotBlock(reason) => updatePivotBlock(reason) case WaitingForNewTargetBlock => log.info("State sync stopped until receiving new pivot block") @@ -184,7 +185,7 @@ class FastSync( ) handleBlockHeaders(peer, blockHeaders) else - blacklist(peer.id, blacklistDuration, "wrong blockheaders response (empty or not chain forming)") + blacklist.add(peer.id, blacklistDuration, WrongBlockHeaders) } case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) => @@ -206,10 +207,10 @@ class FastSync( handleReceipts(peer, requestedHashes, receipts) case PeerRequestHandler.RequestFailed(peer, reason) => - handleRequestFailure(peer, sender(), reason) + handleRequestFailure(peer, sender(), RequestFailed(reason)) case Terminated(ref) if assignedHandlers.contains(ref) => - handleRequestFailure(assignedHandlers(ref), ref, "Unexpected error") + handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated) } def askForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Unit = { @@ -247,7 +248,7 @@ class FastSync( } def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive = - handleCommonMessages orElse handleStatus orElse { + handlePeerListMessages orElse handleStatus orElse { case PivotBlockSelector.Result(pivotBlockHeader) if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) => log.info("New pivot block with number {} received", pivotBlockHeader.number) @@ -424,7 +425,7 @@ class FastSync( } private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = { - blacklist(peer.id, duration, "block header validation failed") + blacklist.add(peer.id, duration, BlockHeaderValidationFailed) if (header.number <= syncState.safeDownloadTarget) { discardLastBlocks(header.number, N) syncState = syncState.updateDiscardedBlocks(header, N) @@ -438,7 +439,7 @@ class FastSync( } } - private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = { + private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]): Unit = { if (checkHeadersChain(headers)) { processHeaders(peer, headers) match { case ParentChainWeightNotFound(header) => @@ -462,27 +463,22 @@ class FastSync( ) } } else { - blacklist(peer.id, blacklistDuration, "error in block headers response") + blacklist.add(peer.id, blacklistDuration, ErrorInBlockHeaders) processSyncing() } } - private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]) = { + private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Unit = { if (blockBodies.isEmpty) { - val reason = - s"got empty block bodies response for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(ByteStringUtils.hash2string) + blacklist.add(peer.id, blacklistDuration, EmptyBlockBodies(knownHashes)) syncState = syncState.enqueueBlockBodies(requestedHashes) } else { validateBlocks(requestedHashes, blockBodies) match { case BlockBodyValidationResult.Valid => insertBlocks(requestedHashes, blockBodies) case BlockBodyValidationResult.Invalid => - blacklist( - peer.id, - blacklistDuration, - s"responded with block bodies not matching block headers, blacklisting for $blacklistDuration" - ) + blacklist.add(peer.id, blacklistDuration, BlockBodiesNotMatchingHeaders) syncState = syncState.enqueueBlockBodies(requestedHashes) case BlockBodyValidationResult.DbError => redownloadBlockchain() @@ -492,10 +488,10 @@ class FastSync( processSyncing() } - private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]) = { + private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]): Unit = { if (receipts.isEmpty) { - val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(ByteStringUtils.hash2string) + blacklist.add(peer.id, blacklistDuration, EmptyReceipts(knownHashes)) syncState = syncState.enqueueReceipts(requestedHashes) } else { validateReceipts(requestedHashes, receipts) match { @@ -507,7 +503,7 @@ class FastSync( .reduce(_.and(_)) .commit() - val receivedHashes = blockHashesWithReceipts.unzip._1 + val receivedHashes = blockHashesWithReceipts.map(_._1) updateBestBlockIfNeeded(receivedHashes) val remainingReceipts = requestedHashes.drop(receipts.size) @@ -516,10 +512,8 @@ class FastSync( } case ReceiptsValidationResult.Invalid(error) => - val reason = - s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + - s" due to: $error" - blacklist(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(h => Hex.toHexString(h.toArray[Byte])) + blacklist.add(peer.id, blacklistDuration, InvalidReceipts(knownHashes, error)) syncState = syncState.enqueueReceipts(requestedHashes) case ReceiptsValidationResult.DbError => @@ -530,7 +524,7 @@ class FastSync( processSyncing() } - private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String) = { + private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = { removeRequestHandler(handler) syncState = syncState @@ -541,8 +535,8 @@ class FastSync( requestedReceipts = requestedReceipts - handler requestedHeaders -= peer - if (handshakedPeers.contains(peer)) { - blacklist(peer.id, blacklistDuration, reason) + if (handshakedPeers.contains(peer.id)) { + blacklist.add(peer.id, blacklistDuration, reason) } } @@ -566,18 +560,26 @@ class FastSync( ) } - private def printStatus() = { - val formatPeer: (Peer) => String = peer => + private def printStatus(): Unit = { + def formatPeerEntry(entry: PeerWithInfo): String = formatPeer(entry.peer) + def formatPeer(peer: Peer): String = s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}" - log.info(s"""|Block: ${appStateStorage.getBestBlockNumber()}/${syncState.pivotBlock.number}. - |Peers waiting_for_response/connected: ${assignedHandlers.size}/${handshakedPeers.size} (${blacklistedPeers.size} blacklisted). + val blacklistedIds = blacklist.keys + log.info( + s"""|Block: {}/${syncState.pivotBlock.number}. + |Peers waiting_for_response/connected: ${assignedHandlers.size}/${handshakedPeers.size} (${blacklistedIds.size} blacklisted). |State: ${syncState.downloadedNodesCount}/${syncState.totalNodesCount} nodes. - |""".stripMargin.replace("\n", " ")) + |""".stripMargin.replace("\n", " "), + appStateStorage.getBestBlockNumber() + ) log.debug( - s"""|Connection status: connected(${assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", ")})/ - |handshaked(${handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", ")}) - | blacklisted(${blacklistedPeers.map { case (id, _) => id.value }.mkString(", ")}) - |""".stripMargin.replace("\n", " ") + s"""|Connection status: connected({})/ + |handshaked({}) + | blacklisted({}) + |""".stripMargin.replace("\n", " "), + assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", "), + handshakedPeers.values.toList.map(e => formatPeerEntry(e)).sorted.mkString(", "), + blacklistedIds.map(_.value).mkString(", ") ) } @@ -602,12 +604,12 @@ class FastSync( } private def getPeersWithFreshEnoughPivot( - peers: NonEmptyList[(Peer, PeerInfo)], + peers: NonEmptyList[PeerWithInfo], state: SyncState, syncConfig: SyncConfig ): List[(Peer, BigInt)] = { peers.collect { - case (peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) => + case PeerWithInfo(peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) => (peer, info.maxBlockNumber) } } @@ -619,17 +621,17 @@ class FastSync( !(syncState.updatingPivotBlock || stateSyncRestartRequested) def pivotBlockIsStale(): Boolean = { - val currentPeers = peersToDownloadFrom.toList - if (currentPeers.isEmpty) { + val peersWithInfo = peersToDownloadFrom.values.toList + if (peersWithInfo.isEmpty) { false } else { - val peerWithBestBlockInNetwork = currentPeers.maxBy(peerWithNum => peerWithNum._2.maxBlockNumber) + val peerWithBestBlockInNetwork = peersWithInfo.maxBy(_.peerInfo.maxBlockNumber) val bestPossibleTargetDifferenceInNetwork = - (peerWithBestBlockInNetwork._2.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number + (peerWithBestBlockInNetwork.peerInfo.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number val peersWithTooFreshPossiblePivotBlock = - getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(currentPeers), syncState, syncConfig) + getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(peersWithInfo), syncState, syncConfig) if (peersWithTooFreshPossiblePivotBlock.isEmpty) { log.info( @@ -645,7 +647,7 @@ class FastSync( "There are {} peers with possible new pivot block, " + "best known pivot in current peer list has number {}", peersWithTooFreshPossiblePivotBlock.size, - peerWithBestBlockInNetwork._2.maxBlockNumber + peerWithBestBlockInNetwork.peerInfo.maxBlockNumber ) pivotBlockIsStale @@ -708,7 +710,7 @@ class FastSync( .filter(p => peerRequestsTime.get(p.peer).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now))) peers .take(maxConcurrentRequests - assignedHandlers.size) - .sortBy(_.info.maxBlockNumber)(Ordering[BigInt].reverse) + .sortBy(_.peerInfo.maxBlockNumber)(Ordering[BigInt].reverse) .foreach(assignBlockchainWork) } } @@ -796,8 +798,10 @@ class FastSync( peerRequestsTime += (peer -> Instant.now()) } - def unassignedPeers: List[PeerWithInfo] = - (peersToDownloadFrom -- assignedHandlers.values).map(PeerWithInfo.tupled).toList + def unassignedPeers: List[PeerWithInfo] = { + val assignedPeers = assignedHandlers.values.map(_.id).toList + peersToDownloadFrom.removedAll(assignedPeers).values.toList + } def blockchainDataToDownload: Boolean = syncState.blockChainWorkQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget @@ -831,8 +835,6 @@ class FastSync( object FastSync { - case class PeerWithInfo(peer: Peer, info: PeerInfo) - // scalastyle:off parameter.number def props( fastSyncStateStorage: FastSyncStateStorage, @@ -841,6 +843,7 @@ object FastSync { validators: Validators, peerEventBus: ActorRef, etcPeerManager: ActorRef, + blacklist: Blacklist, syncConfig: SyncConfig, scheduler: Scheduler ): Props = @@ -852,6 +855,7 @@ object FastSync { validators, peerEventBus, etcPeerManager, + blacklist, syncConfig, scheduler ) diff --git a/src/main/scala/io/iohk/ethereum/network/Peer.scala b/src/main/scala/io/iohk/ethereum/network/Peer.scala index 3a61bfe5ee..1760dff90b 100644 --- a/src/main/scala/io/iohk/ethereum/network/Peer.scala +++ b/src/main/scala/io/iohk/ethereum/network/Peer.scala @@ -1,18 +1,18 @@ package io.iohk.ethereum.network -import java.net.InetSocketAddress - import akka.actor.ActorRef import akka.util.ByteString -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId + +import java.net.InetSocketAddress -case class PeerId(value: String) extends BlackListId +final case class PeerId(value: String) extends BlacklistId object PeerId { def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name) } -case class Peer( +final case class Peer( remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean, diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala index be3b51f15e..800b4a23fc 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala @@ -3,8 +3,8 @@ package io.iohk.ethereum.network import akka.actor.SupervisorStrategy.Stop import akka.actor._ import akka.util.{ByteString, Timeout} +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId import io.iohk.ethereum.blockchain.sync.BlacklistSupport -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.jsonrpc.AkkaTaskOps.TaskActorOps import io.iohk.ethereum.network.PeerActor.PeerClosedConnection import io.iohk.ethereum.network.PeerActor.Status.Handshaked @@ -521,7 +521,7 @@ object PeerManagerActor { case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError - case class PeerAddress(value: String) extends BlackListId + case class PeerAddress(value: String) extends BlacklistId case object SchedulePruneIncomingPeers case class PruneIncomingPeers(stats: PeerStatisticsActor.StatsForAll) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala new file mode 100644 index 0000000000..afb8ec5cee --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -0,0 +1,87 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import io.iohk.ethereum.WithActorSystemShutDown +import io.iohk.ethereum.network.PeerId +import org.scalatest.matchers.must.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ +import com.google.common.testing.FakeTicker +import com.github.blemale.scaffeine.Scaffeine +import java.util.concurrent.TimeUnit + +class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { + import Blacklist._ + + private val peer1 = PeerId("1") + private val peer2 = PeerId("2") + private val peer3 = PeerId("3") + private val peer4 = PeerId("4") + private val peer5 = PeerId("5") + + private val reason = BlacklistReason.ErrorInBlockHeaders + private val anotherReason = BlacklistReason.BlockBodiesNotMatchingHeaders + + private def withBlacklist(maxElements: Int)(test: CacheBasedBlacklist => Unit): Unit = { + val blacklist = CacheBasedBlacklist.empty(maxElements) + test(blacklist) + } + + "CacheBasedBlacklist" should { + "add elements and respect max number of elements" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, reason) + blacklist.add(peer3, 1.minute, anotherReason) + blacklist.add(peer4, 1.minute, anotherReason) + blacklist.add(peer5, 1.minute, reason) + blacklist.cache.cleanUp() + val size = blacklist.keys.size + assert(size <= 3 && size > 0) + } + "should expire elements" in { + val maxSize = 10 + val ticker = new FakeTicker() + val cache = Scaffeine() + .expireAfter[BlacklistId, BlacklistReason.BlacklistReasonType]( + create = (_, _) => 60.minutes, + update = (_, _, _) => 60.minutes, + read = (_, _, duration) => duration + ) + .maximumSize( + maxSize + ) + .ticker(ticker.read _) + .build[BlacklistId, BlacklistReason.BlacklistReasonType]() + val blacklist = CacheBasedBlacklist(cache) + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 10.minutes, reason) + blacklist.add(peer3, 3.minutes, anotherReason) + blacklist.add(peer4, 2.minutes, reason) + blacklist.add(peer5, 7.minutes, reason) + blacklist.isBlacklisted(peer2) // just to simulate a read + blacklist.keys // just to simulate a read + ticker.advance(5, TimeUnit.MINUTES) + val expected = Set(peer2, peer5) + blacklist.cache.cleanUp() + blacklist.keys must contain theSameElementsAs expected + } + "check if given key is part of the list" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, anotherReason) + blacklist.add(peer3, 1.minute, reason) + assert(blacklist.isBlacklisted(peer2) === true) + assert(blacklist.isBlacklisted(PeerId("7")) === false) + } + "remove id from blacklist" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, anotherReason) + blacklist.add(peer3, 1.minute, reason) + assert(blacklist.isBlacklisted(peer2) === true) + blacklist.remove(peer2) + assert(blacklist.isBlacklisted(peer2) === false) + } + } + +} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala index ba88045b38..a1ca236ebb 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala @@ -14,6 +14,7 @@ import monix.reactive.Observable import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.blockchain.sync.fast.FastSync import io.iohk.ethereum.domain.ChainWeight +import monix.execution.Scheduler import scala.concurrent.duration.DurationInt @@ -25,8 +26,11 @@ class FastSyncSpec implicit val timeout: Timeout = Timeout(30.seconds) class Fixture extends EphemBlockchainTestSetup with TestSyncConfig with TestSyncPeers { - override implicit lazy val system = self.system - override implicit val scheduler = self.scheduler + override implicit lazy val system: ActorSystem = self.system + override implicit val scheduler: Scheduler = self.scheduler + + val blacklistMaxElems: Int = 100 + val blacklist: CacheBasedBlacklist = CacheBasedBlacklist.empty(blacklistMaxElems) override lazy val syncConfig: SyncConfig = defaultSyncConfig.copy(pivotBlockOffset = 5, fastSyncBlockValidationX = 5, fastSyncThrottle = 1.millis) @@ -70,6 +74,7 @@ class FastSyncSpec validators = validators, peerEventBus = peerEventBus.ref, etcPeerManager = etcPeerManager.ref, + blacklist = blacklist, syncConfig = syncConfig, scheduler = system.scheduler ) diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index e00dd4f057..2a90ef385c 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -310,8 +310,11 @@ class PeerManagerSpec time.advance(360000) // wait till the peer is out of the blacklist - val newRoundDiscoveredNodes = discoveredNodes + Node.fromUri(new java.net.URI( - "enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556")) + val newRoundDiscoveredNodes = discoveredNodes + Node.fromUri( + new java.net.URI( + "enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556" + ) + ) peerManager ! PeerDiscoveryManager.DiscoveredNodesInfo(newRoundDiscoveredNodes)