From 0ad0cd5e0a85a546087633ba942c6f8c50444c4b Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Wed, 10 Feb 2021 20:28:04 +0100 Subject: [PATCH 01/15] First draft of cache-based blacklist --- build.sbt | 1 + project/Dependencies.scala | 7 ++- .../ethereum/blockchain/sync/Blacklist.scala | 47 ++++++++++++++ .../blockchain/sync/PeerListSupportNg.scala | 63 +++++++++++++++++++ .../blockchain/sync/SyncController.scala | 4 ++ .../blockchain/sync/fast/FastSync.scala | 39 ++++++------ 6 files changed, 142 insertions(+), 19 deletions(-) create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala create mode 100644 src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala diff --git a/build.sbt b/build.sbt index 5fc5d9550c..475eedebf9 100644 --- a/build.sbt +++ b/build.sbt @@ -128,6 +128,7 @@ lazy val node = { Dependencies.kamon, Dependencies.prometheus, Dependencies.cli, + Dependencies.scaffeine, Dependencies.dependencies ).flatten ++ malletDeps } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7b165ae0b1..020e8561b8 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -137,7 +137,7 @@ object Dependencies { "com.google.code.findbugs" % "jsr305" % "3.0.2" % Optional, provider % "micrometer-core" % version, provider % "micrometer-registry-jmx" % version, - provider % "micrometer-registry-prometheus" % version, + provider % "micrometer-registry-prometheus" % version ) } @@ -153,4 +153,9 @@ object Dependencies { val shapeless: Seq[ModuleID] = Seq( "com.chuusai" %% "shapeless" % "2.3.3" ) + + val scaffeine: Seq[ModuleID] = Seq( + "com.github.blemale" %% "scaffeine" % "4.0.2" % "compile" + ) + } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala new file mode 100644 index 0000000000..defa67bd90 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -0,0 +1,47 @@ +package io.iohk.ethereum.blockchain.sync + +import com.github.blemale.scaffeine.{Cache, Scaffeine} +import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId +import io.iohk.ethereum.utils.Logger + +import scala.concurrent.duration.FiniteDuration + +import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ +import scala.jdk.DurationConverters._ + +trait Blacklist { + def isBlacklisted(id: BlackListId): Boolean + def add(id: BlackListId, duration: FiniteDuration, reason: String): Unit + def remove(id: BlackListId): Unit + def keys: Set[BlackListId] +} + +final case class CacheBasedBlacklist(cache: Cache[BlackListId, String]) + extends Blacklist with Logger { + + override def isBlacklisted(id: BlackListId): Boolean = cache.getIfPresent(id).isDefined + override def add(id: BlackListId, duration: FiniteDuration, reason: String): Unit = + cache.policy().expireVariably().toScala.fold { + log.warn(s"Unexpected error while adding peer [$id] to blacklist using custom expiration time. Falling back to default expiration.") + cache.put(id, reason) + } { ve => + ve.put(id, reason, duration.toJava) + } + override def remove(id: BlackListId): Unit = cache.invalidate(id) + + override def keys: Set[BlackListId] = cache.underlying.asMap().keySet().asScala.toSet +} + +object CacheBasedBlacklist { + + // TODO check eviction strategy - we want to remove the oldest element if maxSize is reached + def empty(maxSize: Int): CacheBasedBlacklist = { + val cache = + Scaffeine() + .maximumSize(maxSize) + .build[BlackListId, String]() + CacheBasedBlacklist(cache) + } + +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala new file mode 100644 index 0000000000..1666da3432 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala @@ -0,0 +1,63 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.{Actor, ActorLogging, ActorRef, Scheduler} +import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo +import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected +import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerDisconnectedClassifier +import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe} +import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId} +import io.iohk.ethereum.utils.Config.SyncConfig + +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global // FIXME copied from current impl but not sure global EC is a good choice + +trait PeerListSupportNg { + self: Actor with ActorLogging => + import PeerListSupportNg._ + + def etcPeerManager: ActorRef + def peerEventBus: ActorRef + def blacklist: Blacklist + def syncConfig: SyncConfig + def scheduler: Scheduler + + var handshakedPeers: PeersMap = Map.empty + + scheduler.scheduleWithFixedDelay( + 0.seconds, + syncConfig.peersScanInterval, + etcPeerManager, + EtcPeerManagerActor.GetHandshakedPeers + )(global, context.self) + + def removePeer(peerId: PeerId): Unit = { + peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) + handshakedPeers.find(_._1.id == peerId).foreach { case (peer, _) => blacklist.remove(peer.id) } + handshakedPeers = handshakedPeers.filterNot(_._1.id == peerId) + } + + def peersToDownloadFrom: PeersMap = + handshakedPeers.filterNot { case (p, _) => blacklist.isBlacklisted(p.id) } + + def handlePeerListMessages: Receive = { + case EtcPeerManagerActor.HandshakedPeers(peers) => + peers.keys.filterNot(handshakedPeers.contains).foreach { peer => + peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peer.id))) + } + handshakedPeers = peers + + case PeerDisconnected(peerId) if handshakedPeers.exists(_._1.id == peerId) => + removePeer(peerId) + } + + def peerById(peerId: PeerId): Option[Peer] = handshakedPeers collectFirst { + case (peer, _) if peer.id == peerId => peer + } + + def blacklistIfHandshaked(peer: Peer, duration: FiniteDuration, reason: String): Unit = + handshakedPeers.get(peer).foreach(_ => blacklist.add(peer.id, duration, reason)) +} + +object PeerListSupportNg { + type PeersMap = Map[Peer, PeerInfo] +} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala index 8c53b4df82..5a24db3d91 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala @@ -26,6 +26,9 @@ class SyncController( ) extends Actor with ActorLogging { + private val blacklistSize: Int = 100 // TODO move to config + private val blacklist: Blacklist = CacheBasedBlacklist.empty(blacklistSize) + def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler override def receive: Receive = idle @@ -81,6 +84,7 @@ class SyncController( validators, peerEventBus, etcPeerManager, + blacklist, syncConfig, scheduler ), diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index c54c52a0c4..06d5cc1105 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -42,12 +42,12 @@ class FastSync( val validators: Validators, val peerEventBus: ActorRef, val etcPeerManager: ActorRef, + val blacklist: Blacklist, val syncConfig: SyncConfig, implicit val scheduler: Scheduler ) extends Actor with ActorLogging - with PeerListSupport - with BlacklistSupport + with PeerListSupportNg with ReceiptsValidator with SyncBlocksValidator { @@ -58,7 +58,7 @@ class FastSync( override def receive: Receive = idle - def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages + def handleCommonMessages: Receive = handlePeerListMessages def idle: Receive = handleCommonMessages orElse { case SyncProtocol.Start => start() @@ -184,7 +184,7 @@ class FastSync( ) handleBlockHeaders(peer, blockHeaders) else - blacklist(peer.id, blacklistDuration, "wrong blockheaders response (empty or not chain forming)") + blacklist.add(peer.id, blacklistDuration, "wrong blockheaders response (empty or not chain forming)") } case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) => @@ -424,7 +424,7 @@ class FastSync( } private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = { - blacklist(peer.id, duration, "block header validation failed") + blacklist.add(peer.id, duration, "block header validation failed") if (header.number <= syncState.safeDownloadTarget) { discardLastBlocks(header.number, N) syncState = syncState.updateDiscardedBlocks(header, N) @@ -438,7 +438,7 @@ class FastSync( } } - private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = { + private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]): Unit = { if (checkHeadersChain(headers)) { processHeaders(peer, headers) match { case ParentChainWeightNotFound(header) => @@ -462,23 +462,23 @@ class FastSync( ) } } else { - blacklist(peer.id, blacklistDuration, "error in block headers response") + blacklist.add(peer.id, blacklistDuration, "error in block headers response") processSyncing() } } - private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]) = { + private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Unit = { if (blockBodies.isEmpty) { val reason = s"got empty block bodies response for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist(peer.id, blacklistDuration, reason) + blacklist.add(peer.id, blacklistDuration, reason) syncState = syncState.enqueueBlockBodies(requestedHashes) } else { validateBlocks(requestedHashes, blockBodies) match { case BlockBodyValidationResult.Valid => insertBlocks(requestedHashes, blockBodies) case BlockBodyValidationResult.Invalid => - blacklist( + blacklist.add( peer.id, blacklistDuration, s"responded with block bodies not matching block headers, blacklisting for $blacklistDuration" @@ -492,10 +492,10 @@ class FastSync( processSyncing() } - private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]) = { + private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]): Unit = { if (receipts.isEmpty) { val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist(peer.id, blacklistDuration, reason) + blacklist.add(peer.id, blacklistDuration, reason) syncState = syncState.enqueueReceipts(requestedHashes) } else { validateReceipts(requestedHashes, receipts) match { @@ -519,7 +519,7 @@ class FastSync( val reason = s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + s" due to: $error" - blacklist(peer.id, blacklistDuration, reason) + blacklist.add(peer.id, blacklistDuration, reason) syncState = syncState.enqueueReceipts(requestedHashes) case ReceiptsValidationResult.DbError => @@ -530,7 +530,7 @@ class FastSync( processSyncing() } - private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String) = { + private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String): Unit = { removeRequestHandler(handler) syncState = syncState @@ -542,7 +542,7 @@ class FastSync( requestedHeaders -= peer if (handshakedPeers.contains(peer)) { - blacklist(peer.id, blacklistDuration, reason) + blacklist.add(peer.id, blacklistDuration, reason) } } @@ -566,17 +566,18 @@ class FastSync( ) } - private def printStatus() = { + private def printStatus(): Unit = { val formatPeer: (Peer) => String = peer => s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}" + val blacklistedIds = blacklist.keys log.info(s"""|Block: ${appStateStorage.getBestBlockNumber()}/${syncState.pivotBlock.number}. - |Peers waiting_for_response/connected: ${assignedHandlers.size}/${handshakedPeers.size} (${blacklistedPeers.size} blacklisted). + |Peers waiting_for_response/connected: ${assignedHandlers.size}/${handshakedPeers.size} (${blacklistedIds.size} blacklisted). |State: ${syncState.downloadedNodesCount}/${syncState.totalNodesCount} nodes. |""".stripMargin.replace("\n", " ")) log.debug( s"""|Connection status: connected(${assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", ")})/ |handshaked(${handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", ")}) - | blacklisted(${blacklistedPeers.map { case (id, _) => id.value }.mkString(", ")}) + | blacklisted(${blacklistedIds.map(_.value)mkString(", ")}) |""".stripMargin.replace("\n", " ") ) } @@ -841,6 +842,7 @@ object FastSync { validators: Validators, peerEventBus: ActorRef, etcPeerManager: ActorRef, + blacklist: Blacklist, syncConfig: SyncConfig, scheduler: Scheduler ): Props = @@ -852,6 +854,7 @@ object FastSync { validators, peerEventBus, etcPeerManager, + blacklist, syncConfig, scheduler ) From 20b0045b34cc408fdfcb28614e408f73f5272d5e Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Wed, 10 Feb 2021 20:36:04 +0100 Subject: [PATCH 02/15] Fix logging output --- .../io/iohk/ethereum/blockchain/sync/fast/FastSync.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index 06d5cc1105..b4a788abef 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -157,7 +157,7 @@ class FastSync( def handleStatus: Receive = { case SyncProtocol.GetStatus => sender() ! currentSyncingStatus case SyncStateSchedulerActor.StateSyncStats(saved, missing) => - syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = (saved + missing)) + syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing) } def receive: Receive = handleCommonMessages orElse handleStatus orElse { @@ -507,7 +507,7 @@ class FastSync( .reduce(_.and(_)) .commit() - val receivedHashes = blockHashesWithReceipts.unzip._1 + val receivedHashes = blockHashesWithReceipts.map(_._1) updateBestBlockIfNeeded(receivedHashes) val remainingReceipts = requestedHashes.drop(receipts.size) @@ -567,7 +567,7 @@ class FastSync( } private def printStatus(): Unit = { - val formatPeer: (Peer) => String = peer => + val formatPeer: Peer => String = peer => s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}" val blacklistedIds = blacklist.keys log.info(s"""|Block: ${appStateStorage.getBestBlockNumber()}/${syncState.pivotBlock.number}. @@ -577,7 +577,7 @@ class FastSync( log.debug( s"""|Connection status: connected(${assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", ")})/ |handshaked(${handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", ")}) - | blacklisted(${blacklistedIds.map(_.value)mkString(", ")}) + | blacklisted(${blacklistedIds.map(_.value).mkString(", ")}) |""".stripMargin.replace("\n", " ") ) } From 23da5e762d9218e1e2d14dbe511d0372f1c409fa Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Wed, 10 Feb 2021 22:08:26 +0100 Subject: [PATCH 03/15] Fix FastSyncSpec --- .../io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala index ba88045b38..a1ca236ebb 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/FastSyncSpec.scala @@ -14,6 +14,7 @@ import monix.reactive.Observable import io.iohk.ethereum.BlockHelpers import io.iohk.ethereum.blockchain.sync.fast.FastSync import io.iohk.ethereum.domain.ChainWeight +import monix.execution.Scheduler import scala.concurrent.duration.DurationInt @@ -25,8 +26,11 @@ class FastSyncSpec implicit val timeout: Timeout = Timeout(30.seconds) class Fixture extends EphemBlockchainTestSetup with TestSyncConfig with TestSyncPeers { - override implicit lazy val system = self.system - override implicit val scheduler = self.scheduler + override implicit lazy val system: ActorSystem = self.system + override implicit val scheduler: Scheduler = self.scheduler + + val blacklistMaxElems: Int = 100 + val blacklist: CacheBasedBlacklist = CacheBasedBlacklist.empty(blacklistMaxElems) override lazy val syncConfig: SyncConfig = defaultSyncConfig.copy(pivotBlockOffset = 5, fastSyncBlockValidationX = 5, fastSyncThrottle = 1.millis) @@ -70,6 +74,7 @@ class FastSyncSpec validators = validators, peerEventBus = peerEventBus.ref, etcPeerManager = etcPeerManager.ref, + blacklist = blacklist, syncConfig = syncConfig, scheduler = system.scheduler ) From 68b263a28c9b5e007f66427fffd6330a544015e8 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Thu, 11 Feb 2021 07:59:01 +0100 Subject: [PATCH 04/15] Update nix-sbt sha --- nix/pkgs/mantis.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nix/pkgs/mantis.nix b/nix/pkgs/mantis.nix index 78186c85e8..3e83cd3e94 100644 --- a/nix/pkgs/mantis.nix +++ b/nix/pkgs/mantis.nix @@ -50,7 +50,7 @@ in sbt.mkDerivation { # This sha represents the change dependencies of mantis. # Update this sha whenever you change the dependencies - depsSha256 = "0gppwz6dvligrrgjmramyrm9723pwhg89cqfpxj22z2d86brwas2"; + depsSha256 = "03wyskmiy6fsrsmaa4niksiaw8svmf31mv7hh6xczpjbkrrkwfnm"; # this is the command used to to create the fixed-output-derivation depsWarmupCommand = "sbt compile --debug -Dnix=true"; From 29311a4176b2673fe92215432934593f7e263324 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Thu, 11 Feb 2021 22:16:00 +0100 Subject: [PATCH 05/15] Update nix-sbt sha --- nix/pkgs/mantis.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nix/pkgs/mantis.nix b/nix/pkgs/mantis.nix index 3e83cd3e94..acfe657ed3 100644 --- a/nix/pkgs/mantis.nix +++ b/nix/pkgs/mantis.nix @@ -50,7 +50,7 @@ in sbt.mkDerivation { # This sha represents the change dependencies of mantis. # Update this sha whenever you change the dependencies - depsSha256 = "03wyskmiy6fsrsmaa4niksiaw8svmf31mv7hh6xczpjbkrrkwfnm"; + depsSha256 = "07iixw8va4zwpiln2zy2gr245z1ir4jd6pqgmkzhwnhw3mf5j28k"; # this is the command used to to create the fixed-output-derivation depsWarmupCommand = "sbt compile --debug -Dnix=true"; From 72f7da8870e504f5769a99a6c2af4eea157576a0 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Thu, 11 Feb 2021 22:16:33 +0100 Subject: [PATCH 06/15] Polish and add tests --- build.sbt | 25 +++--- project/Dependencies.scala | 9 ++- .../ethereum/blockchain/sync/Blacklist.scala | 16 ++-- .../blockchain/sync/SyncController.scala | 2 +- .../blockchain/sync/fast/FastSync.scala | 18 +++-- .../sync/CacheBasedBlacklistSpec.scala | 81 +++++++++++++++++++ 6 files changed, 125 insertions(+), 26 deletions(-) create mode 100644 src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala diff --git a/build.sbt b/build.sbt index 475eedebf9..6632d5e363 100644 --- a/build.sbt +++ b/build.sbt @@ -111,25 +111,26 @@ lazy val node = { Seq( Dependencies.akka, Dependencies.akkaHttp, - Dependencies.json4s, - Dependencies.circe, + Dependencies.apacheCommons, Dependencies.boopickle, - Dependencies.rocksDb, - Dependencies.enumeratum, - Dependencies.testing, Dependencies.cats, - Dependencies.monix, - Dependencies.network, + Dependencies.circe, + Dependencies.cli, Dependencies.crypto, - Dependencies.scopt, + Dependencies.dependencies, + Dependencies.enumeratum, + Dependencies.guava, + Dependencies.json4s, + Dependencies.kamon, Dependencies.logging, - Dependencies.apacheCommons, Dependencies.micrometer, - Dependencies.kamon, + Dependencies.monix, + Dependencies.network, Dependencies.prometheus, - Dependencies.cli, + Dependencies.rocksDb, Dependencies.scaffeine, - Dependencies.dependencies + Dependencies.scopt, + Dependencies.testing ).flatten ++ malletDeps } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 020e8561b8..82aa858d85 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -112,12 +112,19 @@ object Dependencies { jline, "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2", "org.scala-sbt.ipcsocket" % "ipcsocket" % "1.1.0", - "com.google.guava" % "guava" % "29.0-jre", "org.xerial.snappy" % "snappy-java" % "1.1.7.7", "org.web3j" % "core" % "5.0.0" % Test, "io.vavr" % "vavr" % "1.0.0-alpha-3" ) + val guava: Seq[ModuleID] = { + val version = "30.1-jre" + Seq( + "com.google.guava" % "guava" % version, + "com.google.guava" % "guava-testlib" % version % "test" + ) + } + val prometheus: Seq[ModuleID] = { val provider = "io.prometheus" val version = "0.9.0" diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index defa67bd90..c2248a3db0 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -1,11 +1,12 @@ package io.iohk.ethereum.blockchain.sync +import com.github.benmanes.caffeine.cache.Caffeine import com.github.blemale.scaffeine.{Cache, Scaffeine} import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.utils.Logger import scala.concurrent.duration.FiniteDuration - +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ import scala.jdk.DurationConverters._ @@ -21,12 +22,13 @@ final case class CacheBasedBlacklist(cache: Cache[BlackListId, String]) extends Blacklist with Logger { override def isBlacklisted(id: BlackListId): Boolean = cache.getIfPresent(id).isDefined + override def add(id: BlackListId, duration: FiniteDuration, reason: String): Unit = cache.policy().expireVariably().toScala.fold { - log.warn(s"Unexpected error while adding peer [$id] to blacklist using custom expiration time. Falling back to default expiration.") + log.warn(s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration.") cache.put(id, reason) - } { ve => - ve.put(id, reason, duration.toJava) + } { varExpirationPolicy => + varExpirationPolicy.put(id, reason, duration.toJava) } override def remove(id: BlackListId): Unit = cache.invalidate(id) @@ -35,11 +37,13 @@ final case class CacheBasedBlacklist(cache: Cache[BlackListId, String]) object CacheBasedBlacklist { - // TODO check eviction strategy - we want to remove the oldest element if maxSize is reached def empty(maxSize: Int): CacheBasedBlacklist = { val cache = Scaffeine() - .maximumSize(maxSize) + .expireAfter[BlackListId, String](create = (_, _) => 60.minutes, + update = (_, _, _) => 60.minutes, + read = (_, _, _) => 60.minutes) // required to enable VarExpiration policy (i.e. set custom expiration time per element) + .maximumSize(maxSize) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency .build[BlackListId, String]() CacheBasedBlacklist(cache) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala index 5a24db3d91..2d93223b38 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala @@ -26,7 +26,7 @@ class SyncController( ) extends Actor with ActorLogging { - private val blacklistSize: Int = 100 // TODO move to config + private val blacklistSize: Int = 1000 // TODO ETCM-642 move to config private val blacklist: Blacklist = CacheBasedBlacklist.empty(blacklistSize) def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index b4a788abef..43937588ea 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -570,15 +570,21 @@ class FastSync( val formatPeer: Peer => String = peer => s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}" val blacklistedIds = blacklist.keys - log.info(s"""|Block: ${appStateStorage.getBestBlockNumber()}/${syncState.pivotBlock.number}. + log.info( + s"""|Block: {}/${syncState.pivotBlock.number}. |Peers waiting_for_response/connected: ${assignedHandlers.size}/${handshakedPeers.size} (${blacklistedIds.size} blacklisted). |State: ${syncState.downloadedNodesCount}/${syncState.totalNodesCount} nodes. - |""".stripMargin.replace("\n", " ")) + |""".stripMargin.replace("\n", " "), + appStateStorage.getBestBlockNumber() + ) log.debug( - s"""|Connection status: connected(${assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", ")})/ - |handshaked(${handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", ")}) - | blacklisted(${blacklistedIds.map(_.value).mkString(", ")}) - |""".stripMargin.replace("\n", " ") + s"""|Connection status: connected({})/ + |handshaked({}) + | blacklisted({}) + |""".stripMargin.replace("\n", " "), + assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", "), + handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", "), + blacklistedIds.map(_.value).mkString(", ") ) } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala new file mode 100644 index 0000000000..05de674796 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -0,0 +1,81 @@ +package io.iohk.ethereum.blockchain.sync + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import io.iohk.ethereum.WithActorSystemShutDown +import io.iohk.ethereum.network.PeerId +import org.scalatest.matchers.must.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ +import com.google.common.testing.FakeTicker +import com.github.blemale.scaffeine.Scaffeine +import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId +import java.util.concurrent.TimeUnit + +class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { + + private val peer1 = PeerId("1") + private val peer2 = PeerId("2") + private val peer3 = PeerId("3") + private val peer4 = PeerId("4") + private val peer5 = PeerId("5") + + private def withBlacklist(maxElements: Int)(test: CacheBasedBlacklist => Unit): Unit = { + val blacklist = CacheBasedBlacklist.empty(maxElements) + test(blacklist) + } + + "CacheBasedBlacklist" should { + "add elements and respect max number of elements" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, "just because") + blacklist.add(peer2, 1.minute, "just because") + blacklist.add(peer3, 1.minute, "just because") + blacklist.add(peer4, 1.minute, "just because") + blacklist.add(peer5, 1.minute, "just because") + blacklist.cache.cleanUp() + assert(blacklist.keys.size <= 3) + } + "should expire elements" in { + val maxSize = 10 + val ticker = new FakeTicker() + val cache = Scaffeine() + .expireAfter[BlackListId, String]( + create = (_, _) => 60.minutes, + update = (_, _, _) => 60.minutes, + read = (_, _, _) => 60.minutes + ) + .maximumSize( + maxSize + ) + .ticker(ticker.read _) + .build[BlackListId, String]() + val blacklist = CacheBasedBlacklist(cache) + blacklist.add(peer1, 1.minute, "just because") + blacklist.add(peer2, 10.minutes, "just because") + blacklist.add(peer3, 3.minute, "just because") + blacklist.add(peer4, 2.minute, "just because") + blacklist.add(peer5, 7.minutes, "just because") + ticker.advance(5, TimeUnit.MINUTES) + val expected = Set(peer2, peer5) + blacklist.cache.cleanUp() + blacklist.keys must contain theSameElementsAs expected + } + "check if given key is part of the list" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, "just because") + blacklist.add(peer2, 1.minute, "just because") + blacklist.add(peer3, 1.minute, "just because") + assert(blacklist.isBlacklisted(peer2) === true) + assert(blacklist.isBlacklisted(PeerId("7")) === false) + } + "remove id from blacklist" in withBlacklist(3) { blacklist => + blacklist.add(peer1, 1.minute, "just because") + blacklist.add(peer2, 1.minute, "just because") + blacklist.add(peer3, 1.minute, "just because") + assert(blacklist.isBlacklisted(peer2) === true) + blacklist.remove(peer2) + assert(blacklist.isBlacklisted(peer2) === false) + } + } + +} From e53e760f7b8cf49004ca2288020b44cb8fbd3276 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Thu, 11 Feb 2021 22:36:21 +0100 Subject: [PATCH 07/15] Rename BlackListId to BlacklistId --- .../ethereum/blockchain/sync/Blacklist.scala | 34 ++++++++++++------- .../blockchain/sync/BlacklistSupport.scala | 18 +++++----- .../scala/io/iohk/ethereum/network/Peer.scala | 10 +++--- .../ethereum/network/PeerManagerActor.scala | 4 +-- .../sync/CacheBasedBlacklistSpec.scala | 7 ++-- 5 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index c2248a3db0..0707611abf 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -2,7 +2,6 @@ package io.iohk.ethereum.blockchain.sync import com.github.benmanes.caffeine.cache.Caffeine import com.github.blemale.scaffeine.{Cache, Scaffeine} -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.utils.Logger import scala.concurrent.duration.FiniteDuration @@ -11,28 +10,39 @@ import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ import scala.jdk.DurationConverters._ +import Blacklist.BlacklistId + trait Blacklist { - def isBlacklisted(id: BlackListId): Boolean - def add(id: BlackListId, duration: FiniteDuration, reason: String): Unit - def remove(id: BlackListId): Unit - def keys: Set[BlackListId] + + def isBlacklisted(id: BlacklistId): Boolean + def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit + def remove(id: BlacklistId): Unit + def keys: Set[BlacklistId] +} + +object Blacklist { + + trait BlacklistId { + def value: String + } + } -final case class CacheBasedBlacklist(cache: Cache[BlackListId, String]) +final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) extends Blacklist with Logger { - override def isBlacklisted(id: BlackListId): Boolean = cache.getIfPresent(id).isDefined + override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined - override def add(id: BlackListId, duration: FiniteDuration, reason: String): Unit = + override def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit = cache.policy().expireVariably().toScala.fold { log.warn(s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration.") cache.put(id, reason) } { varExpirationPolicy => varExpirationPolicy.put(id, reason, duration.toJava) } - override def remove(id: BlackListId): Unit = cache.invalidate(id) + override def remove(id: BlacklistId): Unit = cache.invalidate(id) - override def keys: Set[BlackListId] = cache.underlying.asMap().keySet().asScala.toSet + override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet } object CacheBasedBlacklist { @@ -40,11 +50,11 @@ object CacheBasedBlacklist { def empty(maxSize: Int): CacheBasedBlacklist = { val cache = Scaffeine() - .expireAfter[BlackListId, String](create = (_, _) => 60.minutes, + .expireAfter[BlacklistId, String](create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, read = (_, _, _) => 60.minutes) // required to enable VarExpiration policy (i.e. set custom expiration time per element) .maximumSize(maxSize) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency - .build[BlackListId, String]() + .build[BlacklistId, String]() CacheBasedBlacklist(cache) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala index 6ab85e42db..12adcba0ff 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/BlacklistSupport.scala @@ -1,10 +1,13 @@ package io.iohk.ethereum.blockchain.sync -import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler} +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId + import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.{Duration, FiniteDuration} +// will be removed once regular sync is switched to new blacklist implementation trait BlacklistSupport { selfActor: Actor with ActorLogging => @@ -14,9 +17,9 @@ trait BlacklistSupport { protected val maxBlacklistedNodes = 1000 - val blacklistedPeers = mutable.LinkedHashMap.empty[BlackListId, Cancellable] + val blacklistedPeers = mutable.LinkedHashMap.empty[BlacklistId, Cancellable] - def blacklist(blacklistId: BlackListId, duration: FiniteDuration, reason: String): Unit = { + def blacklist(blacklistId: BlacklistId, duration: FiniteDuration, reason: String): Unit = { if (duration > Duration.Zero) { if (blacklistedPeers.size >= maxBlacklistedNodes) { removeOldestPeer() @@ -30,13 +33,13 @@ trait BlacklistSupport { } } - def undoBlacklist(blacklistId: BlackListId): Unit = { + def undoBlacklist(blacklistId: BlacklistId): Unit = { val peer = blacklistedPeers.get(blacklistId) peer.foreach(_.cancel()) blacklistedPeers.remove(blacklistId) } - def isBlacklisted(blacklistId: BlackListId): Boolean = + def isBlacklisted(blacklistId: BlacklistId): Boolean = blacklistedPeers.exists(_._1 == blacklistId) def handleBlacklistMessages: Receive = { case UnblacklistPeer(ref) => @@ -52,9 +55,6 @@ trait BlacklistSupport { object BlacklistSupport { - abstract class BlackListId { - def value: String - } + private case class UnblacklistPeer(blacklistId: BlacklistId) - private case class UnblacklistPeer(blacklistId: BlackListId) } diff --git a/src/main/scala/io/iohk/ethereum/network/Peer.scala b/src/main/scala/io/iohk/ethereum/network/Peer.scala index 3a61bfe5ee..1760dff90b 100644 --- a/src/main/scala/io/iohk/ethereum/network/Peer.scala +++ b/src/main/scala/io/iohk/ethereum/network/Peer.scala @@ -1,18 +1,18 @@ package io.iohk.ethereum.network -import java.net.InetSocketAddress - import akka.actor.ActorRef import akka.util.ByteString -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId + +import java.net.InetSocketAddress -case class PeerId(value: String) extends BlackListId +final case class PeerId(value: String) extends BlacklistId object PeerId { def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name) } -case class Peer( +final case class Peer( remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean, diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala index be3b51f15e..800b4a23fc 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala @@ -3,8 +3,8 @@ package io.iohk.ethereum.network import akka.actor.SupervisorStrategy.Stop import akka.actor._ import akka.util.{ByteString, Timeout} +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId import io.iohk.ethereum.blockchain.sync.BlacklistSupport -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId import io.iohk.ethereum.jsonrpc.AkkaTaskOps.TaskActorOps import io.iohk.ethereum.network.PeerActor.PeerClosedConnection import io.iohk.ethereum.network.PeerActor.Status.Handshaked @@ -521,7 +521,7 @@ object PeerManagerActor { case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError - case class PeerAddress(value: String) extends BlackListId + case class PeerAddress(value: String) extends BlacklistId case object SchedulePruneIncomingPeers case class PruneIncomingPeers(stats: PeerStatisticsActor.StatsForAll) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala index 05de674796..47c3f2e7a5 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -10,7 +10,8 @@ import org.scalatest.wordspec.AnyWordSpecLike import scala.concurrent.duration._ import com.google.common.testing.FakeTicker import com.github.blemale.scaffeine.Scaffeine -import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId + import java.util.concurrent.TimeUnit class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { @@ -40,7 +41,7 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { val maxSize = 10 val ticker = new FakeTicker() val cache = Scaffeine() - .expireAfter[BlackListId, String]( + .expireAfter[BlacklistId, String]( create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, read = (_, _, _) => 60.minutes @@ -49,7 +50,7 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { maxSize ) .ticker(ticker.read _) - .build[BlackListId, String]() + .build[BlacklistId, String]() val blacklist = CacheBasedBlacklist(cache) blacklist.add(peer1, 1.minute, "just because") blacklist.add(peer2, 10.minutes, "just because") From 962fae2d394f41668158f3267741516a7b01f7ba Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Fri, 12 Feb 2021 09:43:20 +0100 Subject: [PATCH 08/15] Rework PeerListSupport a little bit --- .../ethereum/blockchain/sync/Blacklist.scala | 3 - .../blockchain/sync/PeerListSupport.scala | 1 + .../blockchain/sync/PeerListSupportNg.scala | 58 +++++++++++-------- .../blockchain/sync/fast/FastSync.scala | 36 ++++++------ 4 files changed, 53 insertions(+), 45 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index 0707611abf..18d9430351 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -13,7 +13,6 @@ import scala.jdk.DurationConverters._ import Blacklist.BlacklistId trait Blacklist { - def isBlacklisted(id: BlacklistId): Boolean def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit def remove(id: BlacklistId): Unit @@ -21,11 +20,9 @@ trait Blacklist { } object Blacklist { - trait BlacklistId { def value: String } - } final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala index c5074f9fa8..25991899f5 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala @@ -11,6 +11,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +// will be removed once regular sync is switched to new blacklist/peerlist implementation trait PeerListSupport { self: Actor with ActorLogging with BlacklistSupport => import PeerListSupport._ diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala index 1666da3432..29ef214826 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala @@ -8,56 +8,64 @@ import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsu import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId} import io.iohk.ethereum.utils.Config.SyncConfig +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global // FIXME copied from current impl but not sure global EC is a good choice -trait PeerListSupportNg { - self: Actor with ActorLogging => +trait PeerListSupportNg { self: Actor with ActorLogging => import PeerListSupportNg._ + private implicit val ec: ExecutionContext = context.dispatcher + def etcPeerManager: ActorRef def peerEventBus: ActorRef def blacklist: Blacklist def syncConfig: SyncConfig def scheduler: Scheduler - var handshakedPeers: PeersMap = Map.empty + protected var handshakedPeers: Map[PeerId, PeerWithInfo] = Map.empty scheduler.scheduleWithFixedDelay( 0.seconds, syncConfig.peersScanInterval, etcPeerManager, EtcPeerManagerActor.GetHandshakedPeers - )(global, context.self) + ) - def removePeer(peerId: PeerId): Unit = { - peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) - handshakedPeers.find(_._1.id == peerId).foreach { case (peer, _) => blacklist.remove(peer.id) } - handshakedPeers = handshakedPeers.filterNot(_._1.id == peerId) + def handlePeerListMessages: Receive = { + case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers) + case PeerDisconnected(peerId) => removePeer(peerId) } - def peersToDownloadFrom: PeersMap = - handshakedPeers.filterNot { case (p, _) => blacklist.isBlacklisted(p.id) } + def peersToDownloadFrom: Map[PeerId, PeerWithInfo] = + handshakedPeers.filterNot { case (peerId, _) => + blacklist.isBlacklisted(peerId) + } - def handlePeerListMessages: Receive = { - case EtcPeerManagerActor.HandshakedPeers(peers) => - peers.keys.filterNot(handshakedPeers.contains).foreach { peer => - peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peer.id))) - } - handshakedPeers = peers - - case PeerDisconnected(peerId) if handshakedPeers.exists(_._1.id == peerId) => - removePeer(peerId) + def peerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer) + + def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: String): Unit = + handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason)) + + private def updatePeers(peers: Map[Peer, PeerInfo]): Unit = { + val updated = peers.map { case (peer, peerInfo) => + (peer.id, PeerWithInfo(peer, peerInfo)) + } + updated.filterNot(p => handshakedPeers.keySet.contains(p._1)).foreach { case (peerId, _) => + peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) + } + handshakedPeers = updated } - def peerById(peerId: PeerId): Option[Peer] = handshakedPeers collectFirst { - case (peer, _) if peer.id == peerId => peer + private def removePeer(peerId: PeerId): Unit = { + if (handshakedPeers.keySet.contains(peerId)) { + peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) + blacklist.remove(peerId) + handshakedPeers = handshakedPeers - peerId + } } - def blacklistIfHandshaked(peer: Peer, duration: FiniteDuration, reason: String): Unit = - handshakedPeers.get(peer).foreach(_ => blacklist.add(peer.id, duration, reason)) } object PeerListSupportNg { - type PeersMap = Map[Peer, PeerInfo] + final case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index 43937588ea..bc5017ac4f 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -4,6 +4,7 @@ import java.time.Instant import akka.actor._ import akka.util.ByteString import cats.data.NonEmptyList +import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress import io.iohk.ethereum.blockchain.sync._ @@ -20,7 +21,7 @@ import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage} import io.iohk.ethereum.domain._ import io.iohk.ethereum.mpt.MerklePatriciaTrie import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo -import io.iohk.ethereum.network.Peer +import io.iohk.ethereum.network.{Peer, PeerId} import io.iohk.ethereum.network.p2p.messages.Codes import io.iohk.ethereum.network.p2p.messages.PV62._ import io.iohk.ethereum.network.p2p.messages.PV63._ @@ -541,7 +542,7 @@ class FastSync( requestedReceipts = requestedReceipts - handler requestedHeaders -= peer - if (handshakedPeers.contains(peer)) { + if (handshakedPeers.contains(peer.id)) { blacklist.add(peer.id, blacklistDuration, reason) } } @@ -567,7 +568,8 @@ class FastSync( } private def printStatus(): Unit = { - val formatPeer: Peer => String = peer => + def formatPeerEntry(entry: PeerWithInfo): String = formatPeer(entry.peer) + def formatPeer(peer: Peer): String = s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}" val blacklistedIds = blacklist.keys log.info( @@ -583,7 +585,7 @@ class FastSync( | blacklisted({}) |""".stripMargin.replace("\n", " "), assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", "), - handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", "), + handshakedPeers.values.toList.map(e => formatPeerEntry(e)).sorted.mkString(", "), blacklistedIds.map(_.value).mkString(", ") ) } @@ -609,12 +611,12 @@ class FastSync( } private def getPeersWithFreshEnoughPivot( - peers: NonEmptyList[(Peer, PeerInfo)], + peers: NonEmptyList[PeerWithInfo], state: SyncState, syncConfig: SyncConfig ): List[(Peer, BigInt)] = { peers.collect { - case (peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) => + case PeerWithInfo(peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) => (peer, info.maxBlockNumber) } } @@ -626,17 +628,17 @@ class FastSync( !(syncState.updatingPivotBlock || stateSyncRestartRequested) def pivotBlockIsStale(): Boolean = { - val currentPeers = peersToDownloadFrom.toList - if (currentPeers.isEmpty) { + val peersWithInfo = peersToDownloadFrom.values.toList + if (peersWithInfo.isEmpty) { false } else { - val peerWithBestBlockInNetwork = currentPeers.maxBy(peerWithNum => peerWithNum._2.maxBlockNumber) + val peerWithBestBlockInNetwork = peersWithInfo.maxBy(_.peerInfo.maxBlockNumber) val bestPossibleTargetDifferenceInNetwork = - (peerWithBestBlockInNetwork._2.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number + (peerWithBestBlockInNetwork.peerInfo.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number val peersWithTooFreshPossiblePivotBlock = - getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(currentPeers), syncState, syncConfig) + getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(peersWithInfo), syncState, syncConfig) if (peersWithTooFreshPossiblePivotBlock.isEmpty) { log.info( @@ -652,7 +654,7 @@ class FastSync( "There are {} peers with possible new pivot block, " + "best known pivot in current peer list has number {}", peersWithTooFreshPossiblePivotBlock.size, - peerWithBestBlockInNetwork._2.maxBlockNumber + peerWithBestBlockInNetwork.peerInfo.maxBlockNumber ) pivotBlockIsStale @@ -715,7 +717,7 @@ class FastSync( .filter(p => peerRequestsTime.get(p.peer).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now))) peers .take(maxConcurrentRequests - assignedHandlers.size) - .sortBy(_.info.maxBlockNumber)(Ordering[BigInt].reverse) + .sortBy(_.peerInfo.maxBlockNumber)(Ordering[BigInt].reverse) .foreach(assignBlockchainWork) } } @@ -803,8 +805,10 @@ class FastSync( peerRequestsTime += (peer -> Instant.now()) } - def unassignedPeers: List[PeerWithInfo] = - (peersToDownloadFrom -- assignedHandlers.values).map(PeerWithInfo.tupled).toList + def unassignedPeers: List[PeerWithInfo] = { + val assignedPeers = assignedHandlers.values.map(_.id).toList + peersToDownloadFrom.removedAll(assignedPeers).values.toList + } def blockchainDataToDownload: Boolean = syncState.blockChainWorkQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget @@ -838,8 +842,6 @@ class FastSync( object FastSync { - case class PeerWithInfo(peer: Peer, info: PeerInfo) - // scalastyle:off parameter.number def props( fastSyncStateStorage: FastSyncStateStorage, From 93d85f12bc5f0b29aa9f8e7a5621a04bc51da95d Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Fri, 12 Feb 2021 16:44:10 +0100 Subject: [PATCH 09/15] Small cleanup --- .../ethereum/blockchain/sync/Blacklist.scala | 31 ++++++++++++------- .../blockchain/sync/fast/FastSync.scala | 10 +++--- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index 18d9430351..d171b3d257 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -25,18 +25,23 @@ object Blacklist { } } -final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) - extends Blacklist with Logger { +final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) extends Blacklist with Logger { override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined override def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit = - cache.policy().expireVariably().toScala.fold { - log.warn(s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration.") - cache.put(id, reason) - } { varExpirationPolicy => - varExpirationPolicy.put(id, reason, duration.toJava) - } + cache + .policy() + .expireVariably() + .toScala + .fold { + log.warn( + s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration." + ) + cache.put(id, reason) + } { varExpirationPolicy => + varExpirationPolicy.put(id, reason, duration.toJava) + } override def remove(id: BlacklistId): Unit = cache.invalidate(id) override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet @@ -47,10 +52,14 @@ object CacheBasedBlacklist { def empty(maxSize: Int): CacheBasedBlacklist = { val cache = Scaffeine() - .expireAfter[BlacklistId, String](create = (_, _) => 60.minutes, + .expireAfter[BlacklistId, String]( + create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, - read = (_, _, _) => 60.minutes) // required to enable VarExpiration policy (i.e. set custom expiration time per element) - .maximumSize(maxSize) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency + read = (_, _, _) => 60.minutes + ) // required to enable VarExpiration policy (i.e. set custom expiration time per element) + .maximumSize( + maxSize + ) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency .build[BlacklistId, String]() CacheBasedBlacklist(cache) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index bc5017ac4f..77fc399aff 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -59,9 +59,7 @@ class FastSync( override def receive: Receive = idle - def handleCommonMessages: Receive = handlePeerListMessages - - def idle: Receive = handleCommonMessages orElse { + def idle: Receive = handlePeerListMessages orElse { case SyncProtocol.Start => start() case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing } @@ -90,7 +88,7 @@ class FastSync( context become waitingForPivotBlock } - def waitingForPivotBlock: Receive = handleCommonMessages orElse { + def waitingForPivotBlock: Receive = handlePeerListMessages orElse { case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing case PivotBlockSelector.Result(pivotBlockHeader) => if (pivotBlockHeader.number < 1) { @@ -161,7 +159,7 @@ class FastSync( syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing) } - def receive: Receive = handleCommonMessages orElse handleStatus orElse { + def receive: Receive = handlePeerListMessages orElse handleStatus orElse { case UpdatePivotBlock(reason) => updatePivotBlock(reason) case WaitingForNewTargetBlock => log.info("State sync stopped until receiving new pivot block") @@ -248,7 +246,7 @@ class FastSync( } def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive = - handleCommonMessages orElse handleStatus orElse { + handlePeerListMessages orElse handleStatus orElse { case PivotBlockSelector.Result(pivotBlockHeader) if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) => log.info("New pivot block with number {} received", pivotBlockHeader.number) From c5dd35c298462b1a964e5ea54465955c05d5759b Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Sun, 14 Feb 2021 22:02:45 +0100 Subject: [PATCH 10/15] [ETCM-531] Turn blacklist reasons into proper types, small improvements based on PR comments --- .../sync/util/FastSyncItSpecUtils.scala | 5 + .../ethereum/blockchain/sync/Blacklist.scala | 130 +++++++++++++++--- .../blockchain/sync/PeerListSupportNg.scala | 9 +- .../blockchain/sync/fast/FastSync.scala | 35 ++--- .../sync/CacheBasedBlacklistSpec.scala | 42 +++--- 5 files changed, 159 insertions(+), 62 deletions(-) diff --git a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala index f8059f7115..e563286b24 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala @@ -17,6 +17,7 @@ import monix.eval.Task import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.Try +import io.iohk.ethereum.blockchain.sync.CacheBasedBlacklist object FastSyncItSpecUtils { class FakePeer(peerName: String, fakePeerCustomConfig: FakePeerCustomConfig) @@ -24,6 +25,9 @@ object FastSyncItSpecUtils { lazy val validators = new MockValidatorsAlwaysSucceed + val maxSize = 1000 + val blacklist = CacheBasedBlacklist.empty(maxSize) + lazy val fastSync = system.actorOf( FastSync.props( storagesInstance.storages.fastSyncStateStorage, @@ -32,6 +36,7 @@ object FastSyncItSpecUtils { validators, peerEventBus, etcPeerManager, + blacklist, testSyncConfig, system.scheduler ) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index d171b3d257..fe2c2fbc55 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -10,38 +10,129 @@ import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ import scala.jdk.DurationConverters._ -import Blacklist.BlacklistId +import Blacklist._ +import io.iohk.ethereum.network.PeerId +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType.WrongBlockHeadersType +import io.iohk.ethereum.consensus.validators.std.StdBlockValidator.BlockError +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason.BlacklistReasonType trait Blacklist { def isBlacklisted(id: BlacklistId): Boolean - def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit + def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit def remove(id: BlacklistId): Unit def keys: Set[BlacklistId] } object Blacklist { + import BlacklistReason._ + import BlacklistReasonType._ + trait BlacklistId { def value: String } + + sealed trait BlacklistReason { + def reasonType: BlacklistReasonType + def description: String + } + object BlacklistReason { + trait BlacklistReasonType { + def code: Int + def name: String + } + object BlacklistReasonType { + case object WrongBlockHeadersType extends BlacklistReasonType { + val code: Int = 1 + val name: String = "WrongBlockHeadersType" + } + case object BlockHeaderValidationFailedType extends BlacklistReasonType { + val code: Int = 2 + val name: String = "BlockHeaderValidationFailed" + } + case object ErrorInBlockHeadersType extends BlacklistReasonType { + val code: Int = 3 + val name: String = "ErrorInBlockHeaders" + } + case object EmptyBlockBodiesType extends BlacklistReasonType { + val code: Int = 4 + val name: String = "EmptyBlockBodies" + } + case object BlockBodiesNotMatchingHeadersType extends BlacklistReasonType { + val code: Int = 5 + val name: String = "BlockBodiesNotMatchingHeaders" + } + case object EmptyReceiptsType extends BlacklistReasonType { + val code: Int = 6 + val name: String = "EmptyReceipts" + } + case object InvalidReceiptsType extends BlacklistReasonType { + val code: Int = 7 + val name: String = "InvalidReceipts" + } + case object RequestFailedType extends BlacklistReasonType { + val code: Int = 8 + val name: String = "RequestFailed" + } + case object PeerActorTerminatedType extends BlacklistReasonType { + val code: Int = 9 + val name: String = "PeerActorTerminated" + } + } + + case object WrongBlockHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = WrongBlockHeadersType + val description: String = "Wrong blockheaders response (empty or not chain forming)" + } + case object BlockHeaderValidationFailed extends BlacklistReason { + val reasonType: BlacklistReasonType = BlockHeaderValidationFailedType + val description: String = "Block header validation failed" + } + case object ErrorInBlockHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = ErrorInBlockHeadersType + val description: String = "Error in block headers response" + } + final case class EmptyBlockBodies(knownHashes: Seq[String]) extends BlacklistReason { + val reasonType: BlacklistReasonType = EmptyBlockBodiesType + val description: String = s"Got empty block bodies response for known hashes: $knownHashes" + } + case object BlockBodiesNotMatchingHeaders extends BlacklistReason { + val reasonType: BlacklistReasonType = BlockBodiesNotMatchingHeadersType + val description = "Block bodies not matching block headers" + } + final case class EmptyReceipts(knownHashes: Seq[String]) extends BlacklistReason { + val reasonType: BlacklistReasonType = EmptyReceiptsType + val description: String = s"Got empty receipts for known hashes: $knownHashes" + } + final case class InvalidReceipts(knownHashes: Seq[String], error: BlockError) extends BlacklistReason { + val reasonType: BlacklistReasonType = InvalidReceiptsType + val description: String = s"Got invalid receipts for known hashes: $knownHashes due to: $error" + } + final case class RequestFailed(error: String) extends BlacklistReason { + val reasonType: BlacklistReasonType = RequestFailedType + val description: String = s"Request failed with error: $error" + } + case object PeerActorTerminated extends BlacklistReason { + val reasonType: BlacklistReasonType = PeerActorTerminatedType + val description: String = "Peer actor terminated" + } + } } -final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) extends Blacklist with Logger { +final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonType]) extends Blacklist with Logger { + + import CacheBasedBlacklist._ override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined - override def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit = - cache - .policy() - .expireVariably() - .toScala - .fold { - log.warn( - s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration." - ) - cache.put(id, reason) - } { varExpirationPolicy => - varExpirationPolicy.put(id, reason, duration.toJava) - } + override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = { + log.debug("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason) + cache.policy().expireVariably().toScala match { + case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava) + case None => + log.warn(customExpirationError(id)) + cache.put(id, reason.reasonType) + } + } override def remove(id: BlacklistId): Unit = cache.invalidate(id) override def keys: Set[BlacklistId] = cache.underlying.asMap().keySet().asScala.toSet @@ -49,10 +140,13 @@ final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String]) extends object CacheBasedBlacklist { + def customExpirationError(id: BlacklistId): String = + s"Unexpected error while adding peer [${id.value}] to blacklist using custom expiration time. Falling back to default expiration." + def empty(maxSize: Int): CacheBasedBlacklist = { val cache = Scaffeine() - .expireAfter[BlacklistId, String]( + .expireAfter[BlacklistId, BlacklistReasonType]( create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, read = (_, _, _) => 60.minutes @@ -60,7 +154,7 @@ object CacheBasedBlacklist { .maximumSize( maxSize ) // uses Window TinyLfu eviction policy, see https://github.com/ben-manes/caffeine/wiki/Efficiency - .build[BlacklistId, String]() + .build[BlacklistId, BlacklistReasonType]() CacheBasedBlacklist(cache) } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala index 29ef214826..e5a032f2e5 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration._ trait PeerListSupportNg { self: Actor with ActorLogging => import PeerListSupportNg._ + import Blacklist._ private implicit val ec: ExecutionContext = context.dispatcher @@ -33,7 +34,7 @@ trait PeerListSupportNg { self: Actor with ActorLogging => def handlePeerListMessages: Receive = { case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers) - case PeerDisconnected(peerId) => removePeer(peerId) + case PeerDisconnected(peerId) => removePeerById(peerId) } def peersToDownloadFrom: Map[PeerId, PeerWithInfo] = @@ -41,9 +42,9 @@ trait PeerListSupportNg { self: Actor with ActorLogging => blacklist.isBlacklisted(peerId) } - def peerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer) + def getPeerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer) - def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: String): Unit = + def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: BlacklistReason): Unit = handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason)) private def updatePeers(peers: Map[Peer, PeerInfo]): Unit = { @@ -56,7 +57,7 @@ trait PeerListSupportNg { self: Actor with ActorLogging => handshakedPeers = updated } - private def removePeer(peerId: PeerId): Unit = { + private def removePeerById(peerId: PeerId): Unit = { if (handshakedPeers.keySet.contains(peerId)) { peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId))) blacklist.remove(peerId) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala index 77fc399aff..f9301c6be4 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala @@ -8,6 +8,8 @@ import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress import io.iohk.ethereum.blockchain.sync._ +import io.iohk.ethereum.blockchain.sync.Blacklist._ +import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason._ import io.iohk.ethereum.blockchain.sync.fast.ReceiptsValidator.ReceiptsValidationResult import io.iohk.ethereum.blockchain.sync.fast.SyncBlocksValidator.BlockBodyValidationResult import io.iohk.ethereum.blockchain.sync.fast.SyncStateSchedulerActor.{ @@ -183,7 +185,7 @@ class FastSync( ) handleBlockHeaders(peer, blockHeaders) else - blacklist.add(peer.id, blacklistDuration, "wrong blockheaders response (empty or not chain forming)") + blacklist.add(peer.id, blacklistDuration, WrongBlockHeaders) } case ResponseReceived(peer, BlockBodies(blockBodies), timeTaken) => @@ -205,10 +207,10 @@ class FastSync( handleReceipts(peer, requestedHashes, receipts) case PeerRequestHandler.RequestFailed(peer, reason) => - handleRequestFailure(peer, sender(), reason) + handleRequestFailure(peer, sender(), RequestFailed(reason)) case Terminated(ref) if assignedHandlers.contains(ref) => - handleRequestFailure(assignedHandlers(ref), ref, "Unexpected error") + handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated) } def askForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Unit = { @@ -423,7 +425,7 @@ class FastSync( } private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = { - blacklist.add(peer.id, duration, "block header validation failed") + blacklist.add(peer.id, duration, BlockHeaderValidationFailed) if (header.number <= syncState.safeDownloadTarget) { discardLastBlocks(header.number, N) syncState = syncState.updateDiscardedBlocks(header, N) @@ -461,27 +463,22 @@ class FastSync( ) } } else { - blacklist.add(peer.id, blacklistDuration, "error in block headers response") + blacklist.add(peer.id, blacklistDuration, ErrorInBlockHeaders) processSyncing() } } private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]): Unit = { if (blockBodies.isEmpty) { - val reason = - s"got empty block bodies response for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist.add(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(ByteStringUtils.hash2string) + blacklist.add(peer.id, blacklistDuration, EmptyBlockBodies(knownHashes)) syncState = syncState.enqueueBlockBodies(requestedHashes) } else { validateBlocks(requestedHashes, blockBodies) match { case BlockBodyValidationResult.Valid => insertBlocks(requestedHashes, blockBodies) case BlockBodyValidationResult.Invalid => - blacklist.add( - peer.id, - blacklistDuration, - s"responded with block bodies not matching block headers, blacklisting for $blacklistDuration" - ) + blacklist.add(peer.id, blacklistDuration, BlockBodiesNotMatchingHeaders) syncState = syncState.enqueueBlockBodies(requestedHashes) case BlockBodyValidationResult.DbError => redownloadBlockchain() @@ -493,8 +490,8 @@ class FastSync( private def handleReceipts(peer: Peer, requestedHashes: Seq[ByteString], receipts: Seq[Seq[Receipt]]): Unit = { if (receipts.isEmpty) { - val reason = s"got empty receipts for known hashes: ${requestedHashes.map(ByteStringUtils.hash2string)}" - blacklist.add(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(ByteStringUtils.hash2string) + blacklist.add(peer.id, blacklistDuration, EmptyReceipts(knownHashes)) syncState = syncState.enqueueReceipts(requestedHashes) } else { validateReceipts(requestedHashes, receipts) match { @@ -515,10 +512,8 @@ class FastSync( } case ReceiptsValidationResult.Invalid(error) => - val reason = - s"got invalid receipts for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}" + - s" due to: $error" - blacklist.add(peer.id, blacklistDuration, reason) + val knownHashes = requestedHashes.map(h => Hex.toHexString(h.toArray[Byte])) + blacklist.add(peer.id, blacklistDuration, InvalidReceipts(knownHashes, error)) syncState = syncState.enqueueReceipts(requestedHashes) case ReceiptsValidationResult.DbError => @@ -529,7 +524,7 @@ class FastSync( processSyncing() } - private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: String): Unit = { + private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = { removeRequestHandler(handler) syncState = syncState diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala index 47c3f2e7a5..774061a552 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -10,11 +10,10 @@ import org.scalatest.wordspec.AnyWordSpecLike import scala.concurrent.duration._ import com.google.common.testing.FakeTicker import com.github.blemale.scaffeine.Scaffeine -import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistId - import java.util.concurrent.TimeUnit class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { + import Blacklist._ private val peer1 = PeerId("1") private val peer2 = PeerId("2") @@ -22,6 +21,9 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { private val peer4 = PeerId("4") private val peer5 = PeerId("5") + private val reason = BlacklistReason.ErrorInBlockHeaders + private val anotherReason = BlacklistReason.BlockBodiesNotMatchingHeaders + private def withBlacklist(maxElements: Int)(test: CacheBasedBlacklist => Unit): Unit = { val blacklist = CacheBasedBlacklist.empty(maxElements) test(blacklist) @@ -29,11 +31,11 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { "CacheBasedBlacklist" should { "add elements and respect max number of elements" in withBlacklist(3) { blacklist => - blacklist.add(peer1, 1.minute, "just because") - blacklist.add(peer2, 1.minute, "just because") - blacklist.add(peer3, 1.minute, "just because") - blacklist.add(peer4, 1.minute, "just because") - blacklist.add(peer5, 1.minute, "just because") + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, reason) + blacklist.add(peer3, 1.minute, anotherReason) + blacklist.add(peer4, 1.minute, anotherReason) + blacklist.add(peer5, 1.minute, reason) blacklist.cache.cleanUp() assert(blacklist.keys.size <= 3) } @@ -41,7 +43,7 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { val maxSize = 10 val ticker = new FakeTicker() val cache = Scaffeine() - .expireAfter[BlacklistId, String]( + .expireAfter[BlacklistId, BlacklistReason.BlacklistReasonType]( create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, read = (_, _, _) => 60.minutes @@ -50,29 +52,29 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { maxSize ) .ticker(ticker.read _) - .build[BlacklistId, String]() + .build[BlacklistId, BlacklistReason.BlacklistReasonType]() val blacklist = CacheBasedBlacklist(cache) - blacklist.add(peer1, 1.minute, "just because") - blacklist.add(peer2, 10.minutes, "just because") - blacklist.add(peer3, 3.minute, "just because") - blacklist.add(peer4, 2.minute, "just because") - blacklist.add(peer5, 7.minutes, "just because") + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 10.minutes, reason) + blacklist.add(peer3, 3.minute, anotherReason) + blacklist.add(peer4, 2.minute, reason) + blacklist.add(peer5, 7.minutes, reason) ticker.advance(5, TimeUnit.MINUTES) val expected = Set(peer2, peer5) blacklist.cache.cleanUp() blacklist.keys must contain theSameElementsAs expected } "check if given key is part of the list" in withBlacklist(3) { blacklist => - blacklist.add(peer1, 1.minute, "just because") - blacklist.add(peer2, 1.minute, "just because") - blacklist.add(peer3, 1.minute, "just because") + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, anotherReason) + blacklist.add(peer3, 1.minute, reason) assert(blacklist.isBlacklisted(peer2) === true) assert(blacklist.isBlacklisted(PeerId("7")) === false) } "remove id from blacklist" in withBlacklist(3) { blacklist => - blacklist.add(peer1, 1.minute, "just because") - blacklist.add(peer2, 1.minute, "just because") - blacklist.add(peer3, 1.minute, "just because") + blacklist.add(peer1, 1.minute, reason) + blacklist.add(peer2, 1.minute, anotherReason) + blacklist.add(peer3, 1.minute, reason) assert(blacklist.isBlacklisted(peer2) === true) blacklist.remove(peer2) assert(blacklist.isBlacklisted(peer2) === false) From cefeea97fbfa0ad6c443e50b7aa765be2342997b Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Mon, 15 Feb 2021 07:46:24 +0100 Subject: [PATCH 11/15] Make BlacklistReasonType a sealed trait --- src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index fe2c2fbc55..b49d4c589e 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -36,7 +36,7 @@ object Blacklist { def description: String } object BlacklistReason { - trait BlacklistReasonType { + sealed trait BlacklistReasonType { def code: Int def name: String } From 32c2c053f69d72f2bae01bbc3225796cfb959796 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Mon, 15 Feb 2021 12:25:01 +0100 Subject: [PATCH 12/15] Only log description for blacklist reason --- src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index b49d4c589e..125186cdf1 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -125,7 +125,7 @@ final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonTy override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = { - log.debug("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason) + log.debug("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description) cache.policy().expireVariably().toScala match { case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava) case None => From c1a1e9924713f322a9af25c8fa7125eebde9b2a9 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Tue, 16 Feb 2021 09:06:59 +0100 Subject: [PATCH 13/15] ETCM-531 renamed minute -> minutes --- .../ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala index 774061a552..d8443895fa 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -56,8 +56,8 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { val blacklist = CacheBasedBlacklist(cache) blacklist.add(peer1, 1.minute, reason) blacklist.add(peer2, 10.minutes, reason) - blacklist.add(peer3, 3.minute, anotherReason) - blacklist.add(peer4, 2.minute, reason) + blacklist.add(peer3, 3.minutes, anotherReason) + blacklist.add(peer4, 2.minutes, reason) blacklist.add(peer5, 7.minutes, reason) ticker.advance(5, TimeUnit.MINUTES) val expected = Set(peer2, peer5) From 248907c94e3faa9ae3c228ea04c16e50c249127f Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Tue, 16 Feb 2021 09:58:39 +0100 Subject: [PATCH 14/15] ETCM-531 Reformat triggered by running sbt pp --- .../scala/io/iohk/ethereum/network/PeerManagerSpec.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index e00dd4f057..2a90ef385c 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -310,8 +310,11 @@ class PeerManagerSpec time.advance(360000) // wait till the peer is out of the blacklist - val newRoundDiscoveredNodes = discoveredNodes + Node.fromUri(new java.net.URI( - "enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556")) + val newRoundDiscoveredNodes = discoveredNodes + Node.fromUri( + new java.net.URI( + "enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556" + ) + ) peerManager ! PeerDiscoveryManager.DiscoveredNodesInfo(newRoundDiscoveredNodes) From 25d6eeec11a12775821bc16cdb7fd3eadfaa1242 Mon Sep 17 00:00:00 2001 From: Petra Bierleutgeb Date: Tue, 16 Feb 2021 19:59:48 +0100 Subject: [PATCH 15/15] ETCM-531 Fix expiration after read --- .../scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala | 4 ++-- .../iohk/ethereum/blockchain/sync/PeerListSupportNg.scala | 2 +- .../ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala | 7 +++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala index 125186cdf1..8aa0f568a5 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala @@ -125,7 +125,7 @@ final case class CacheBasedBlacklist(cache: Cache[BlacklistId, BlacklistReasonTy override def isBlacklisted(id: BlacklistId): Boolean = cache.getIfPresent(id).isDefined override def add(id: BlacklistId, duration: FiniteDuration, reason: BlacklistReason): Unit = { - log.debug("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description) + log.info("Blacklisting peer [{}] for {}. Reason: {}", id, duration, reason.description) cache.policy().expireVariably().toScala match { case Some(varExpiration) => varExpiration.put(id, reason.reasonType, duration.toJava) case None => @@ -149,7 +149,7 @@ object CacheBasedBlacklist { .expireAfter[BlacklistId, BlacklistReasonType]( create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, - read = (_, _, _) => 60.minutes + read = (_, _, duration) => duration // read access should not change the expiration time ) // required to enable VarExpiration policy (i.e. set custom expiration time per element) .maximumSize( maxSize diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala index e5a032f2e5..0bca267ce1 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala @@ -30,7 +30,7 @@ trait PeerListSupportNg { self: Actor with ActorLogging => syncConfig.peersScanInterval, etcPeerManager, EtcPeerManagerActor.GetHandshakedPeers - ) + )(ec, context.self) def handlePeerListMessages: Receive = { case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers) diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala index d8443895fa..afb8ec5cee 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/CacheBasedBlacklistSpec.scala @@ -37,7 +37,8 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { blacklist.add(peer4, 1.minute, anotherReason) blacklist.add(peer5, 1.minute, reason) blacklist.cache.cleanUp() - assert(blacklist.keys.size <= 3) + val size = blacklist.keys.size + assert(size <= 3 && size > 0) } "should expire elements" in { val maxSize = 10 @@ -46,7 +47,7 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { .expireAfter[BlacklistId, BlacklistReason.BlacklistReasonType]( create = (_, _) => 60.minutes, update = (_, _, _) => 60.minutes, - read = (_, _, _) => 60.minutes + read = (_, _, duration) => duration ) .maximumSize( maxSize @@ -59,6 +60,8 @@ class CacheBasedBlacklistSpec extends AnyWordSpecLike with Matchers { blacklist.add(peer3, 3.minutes, anotherReason) blacklist.add(peer4, 2.minutes, reason) blacklist.add(peer5, 7.minutes, reason) + blacklist.isBlacklisted(peer2) // just to simulate a read + blacklist.keys // just to simulate a read ticker.advance(5, TimeUnit.MINUTES) val expected = Set(peer2, peer5) blacklist.cache.cleanUp()