Skip to content

Commit f74b073

Browse files
author
Petra Bierleutgeb
committed
Rework PeerListSupport a little bit
1 parent d1a38fe commit f74b073

File tree

4 files changed

+53
-45
lines changed

4 files changed

+53
-45
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/Blacklist.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,16 @@ import scala.jdk.DurationConverters._
1313
import Blacklist.BlacklistId
1414

1515
trait Blacklist {
16-
1716
def isBlacklisted(id: BlacklistId): Boolean
1817
def add(id: BlacklistId, duration: FiniteDuration, reason: String): Unit
1918
def remove(id: BlacklistId): Unit
2019
def keys: Set[BlacklistId]
2120
}
2221

2322
object Blacklist {
24-
2523
trait BlacklistId {
2624
def value: String
2725
}
28-
2926
}
3027

3128
final case class CacheBasedBlacklist(cache: Cache[BlacklistId, String])

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupport.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig
1111
import scala.concurrent.duration._
1212
import scala.concurrent.ExecutionContext.Implicits.global
1313

14+
// will be removed once regular sync is switched to new blacklist/peerlist implementation
1415
trait PeerListSupport {
1516
self: Actor with ActorLogging with BlacklistSupport =>
1617
import PeerListSupport._

src/main/scala/io/iohk/ethereum/blockchain/sync/PeerListSupportNg.scala

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,56 +8,64 @@ import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsu
88
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer, PeerId}
99
import io.iohk.ethereum.utils.Config.SyncConfig
1010

11+
import scala.concurrent.ExecutionContext
1112
import scala.concurrent.duration._
12-
import scala.concurrent.ExecutionContext.Implicits.global // FIXME copied from current impl but not sure global EC is a good choice
1313

14-
trait PeerListSupportNg {
15-
self: Actor with ActorLogging =>
14+
trait PeerListSupportNg { self: Actor with ActorLogging =>
1615
import PeerListSupportNg._
1716

17+
private implicit val ec: ExecutionContext = context.dispatcher
18+
1819
def etcPeerManager: ActorRef
1920
def peerEventBus: ActorRef
2021
def blacklist: Blacklist
2122
def syncConfig: SyncConfig
2223
def scheduler: Scheduler
2324

24-
var handshakedPeers: PeersMap = Map.empty
25+
protected var handshakedPeers: Map[PeerId, PeerWithInfo] = Map.empty
2526

2627
scheduler.scheduleWithFixedDelay(
2728
0.seconds,
2829
syncConfig.peersScanInterval,
2930
etcPeerManager,
3031
EtcPeerManagerActor.GetHandshakedPeers
31-
)(global, context.self)
32+
)
3233

33-
def removePeer(peerId: PeerId): Unit = {
34-
peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))
35-
handshakedPeers.find(_._1.id == peerId).foreach { case (peer, _) => blacklist.remove(peer.id) }
36-
handshakedPeers = handshakedPeers.filterNot(_._1.id == peerId)
34+
def handlePeerListMessages: Receive = {
35+
case EtcPeerManagerActor.HandshakedPeers(peers) => updatePeers(peers)
36+
case PeerDisconnected(peerId) => removePeer(peerId)
3737
}
3838

39-
def peersToDownloadFrom: PeersMap =
40-
handshakedPeers.filterNot { case (p, _) => blacklist.isBlacklisted(p.id) }
39+
def peersToDownloadFrom: Map[PeerId, PeerWithInfo] =
40+
handshakedPeers.filterNot { case (peerId, _) =>
41+
blacklist.isBlacklisted(peerId)
42+
}
4143

42-
def handlePeerListMessages: Receive = {
43-
case EtcPeerManagerActor.HandshakedPeers(peers) =>
44-
peers.keys.filterNot(handshakedPeers.contains).foreach { peer =>
45-
peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peer.id)))
46-
}
47-
handshakedPeers = peers
48-
49-
case PeerDisconnected(peerId) if handshakedPeers.exists(_._1.id == peerId) =>
50-
removePeer(peerId)
44+
def peerById(peerId: PeerId): Option[Peer] = handshakedPeers.get(peerId).map(_.peer)
45+
46+
def blacklistIfHandshaked(peerId: PeerId, duration: FiniteDuration, reason: String): Unit =
47+
handshakedPeers.get(peerId).foreach(_ => blacklist.add(peerId, duration, reason))
48+
49+
private def updatePeers(peers: Map[Peer, PeerInfo]): Unit = {
50+
val updated = peers.map { case (peer, peerInfo) =>
51+
(peer.id, PeerWithInfo(peer, peerInfo))
52+
}
53+
updated.filterNot(p => handshakedPeers.keySet.contains(p._1)).foreach { case (peerId, _) =>
54+
peerEventBus ! Subscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))
55+
}
56+
handshakedPeers = updated
5157
}
5258

53-
def peerById(peerId: PeerId): Option[Peer] = handshakedPeers collectFirst {
54-
case (peer, _) if peer.id == peerId => peer
59+
private def removePeer(peerId: PeerId): Unit = {
60+
if (handshakedPeers.keySet.contains(peerId)) {
61+
peerEventBus ! Unsubscribe(PeerDisconnectedClassifier(PeerSelector.WithId(peerId)))
62+
blacklist.remove(peerId)
63+
handshakedPeers = handshakedPeers - peerId
64+
}
5565
}
5666

57-
def blacklistIfHandshaked(peer: Peer, duration: FiniteDuration, reason: String): Unit =
58-
handshakedPeers.get(peer).foreach(_ => blacklist.add(peer.id, duration, reason))
5967
}
6068

6169
object PeerListSupportNg {
62-
type PeersMap = Map[Peer, PeerInfo]
70+
final case class PeerWithInfo(peer: Peer, peerInfo: PeerInfo)
6371
}

src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.time.Instant
44
import akka.actor._
55
import akka.util.ByteString
66
import cats.data.NonEmptyList
7+
import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo
78
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived
89
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
910
import io.iohk.ethereum.blockchain.sync._
@@ -20,7 +21,7 @@ import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
2021
import io.iohk.ethereum.domain._
2122
import io.iohk.ethereum.mpt.MerklePatriciaTrie
2223
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
23-
import io.iohk.ethereum.network.Peer
24+
import io.iohk.ethereum.network.{Peer, PeerId}
2425
import io.iohk.ethereum.network.p2p.messages.Codes
2526
import io.iohk.ethereum.network.p2p.messages.PV62._
2627
import io.iohk.ethereum.network.p2p.messages.PV63._
@@ -541,7 +542,7 @@ class FastSync(
541542
requestedReceipts = requestedReceipts - handler
542543

543544
requestedHeaders -= peer
544-
if (handshakedPeers.contains(peer)) {
545+
if (handshakedPeers.contains(peer.id)) {
545546
blacklist.add(peer.id, blacklistDuration, reason)
546547
}
547548
}
@@ -567,7 +568,8 @@ class FastSync(
567568
}
568569

569570
private def printStatus(): Unit = {
570-
val formatPeer: Peer => String = peer =>
571+
def formatPeerEntry(entry: PeerWithInfo): String = formatPeer(entry.peer)
572+
def formatPeer(peer: Peer): String =
571573
s"${peer.remoteAddress.getAddress.getHostAddress}:${peer.remoteAddress.getPort}"
572574
val blacklistedIds = blacklist.keys
573575
log.info(
@@ -583,7 +585,7 @@ class FastSync(
583585
| blacklisted({})
584586
|""".stripMargin.replace("\n", " "),
585587
assignedHandlers.values.map(formatPeer).toSeq.sorted.mkString(", "),
586-
handshakedPeers.keys.map(formatPeer).toSeq.sorted.mkString(", "),
588+
handshakedPeers.values.toList.map(e => formatPeerEntry(e)).sorted.mkString(", "),
587589
blacklistedIds.map(_.value).mkString(", ")
588590
)
589591
}
@@ -609,12 +611,12 @@ class FastSync(
609611
}
610612

611613
private def getPeersWithFreshEnoughPivot(
612-
peers: NonEmptyList[(Peer, PeerInfo)],
614+
peers: NonEmptyList[PeerWithInfo],
613615
state: SyncState,
614616
syncConfig: SyncConfig
615617
): List[(Peer, BigInt)] = {
616618
peers.collect {
617-
case (peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) =>
619+
case PeerWithInfo(peer, info) if hasBestBlockFreshEnoughToUpdatePivotBlock(info, state, syncConfig) =>
618620
(peer, info.maxBlockNumber)
619621
}
620622
}
@@ -626,17 +628,17 @@ class FastSync(
626628
!(syncState.updatingPivotBlock || stateSyncRestartRequested)
627629

628630
def pivotBlockIsStale(): Boolean = {
629-
val currentPeers = peersToDownloadFrom.toList
630-
if (currentPeers.isEmpty) {
631+
val peersWithInfo = peersToDownloadFrom.values.toList
632+
if (peersWithInfo.isEmpty) {
631633
false
632634
} else {
633-
val peerWithBestBlockInNetwork = currentPeers.maxBy(peerWithNum => peerWithNum._2.maxBlockNumber)
635+
val peerWithBestBlockInNetwork = peersWithInfo.maxBy(_.peerInfo.maxBlockNumber)
634636

635637
val bestPossibleTargetDifferenceInNetwork =
636-
(peerWithBestBlockInNetwork._2.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number
638+
(peerWithBestBlockInNetwork.peerInfo.maxBlockNumber - syncConfig.pivotBlockOffset) - syncState.pivotBlock.number
637639

638640
val peersWithTooFreshPossiblePivotBlock =
639-
getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(currentPeers), syncState, syncConfig)
641+
getPeersWithFreshEnoughPivot(NonEmptyList.fromListUnsafe(peersWithInfo), syncState, syncConfig)
640642

641643
if (peersWithTooFreshPossiblePivotBlock.isEmpty) {
642644
log.info(
@@ -652,7 +654,7 @@ class FastSync(
652654
"There are {} peers with possible new pivot block, " +
653655
"best known pivot in current peer list has number {}",
654656
peersWithTooFreshPossiblePivotBlock.size,
655-
peerWithBestBlockInNetwork._2.maxBlockNumber
657+
peerWithBestBlockInNetwork.peerInfo.maxBlockNumber
656658
)
657659

658660
pivotBlockIsStale
@@ -715,7 +717,7 @@ class FastSync(
715717
.filter(p => peerRequestsTime.get(p.peer).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now)))
716718
peers
717719
.take(maxConcurrentRequests - assignedHandlers.size)
718-
.sortBy(_.info.maxBlockNumber)(Ordering[BigInt].reverse)
720+
.sortBy(_.peerInfo.maxBlockNumber)(Ordering[BigInt].reverse)
719721
.foreach(assignBlockchainWork)
720722
}
721723
}
@@ -803,8 +805,10 @@ class FastSync(
803805
peerRequestsTime += (peer -> Instant.now())
804806
}
805807

806-
def unassignedPeers: List[PeerWithInfo] =
807-
(peersToDownloadFrom -- assignedHandlers.values).map(PeerWithInfo.tupled).toList
808+
def unassignedPeers: List[PeerWithInfo] = {
809+
val assignedPeers = assignedHandlers.values.map(_.id).toList
810+
peersToDownloadFrom.removedAll(assignedPeers).values.toList
811+
}
808812

809813
def blockchainDataToDownload: Boolean =
810814
syncState.blockChainWorkQueued || syncState.bestBlockHeaderNumber < syncState.safeDownloadTarget
@@ -838,8 +842,6 @@ class FastSync(
838842

839843
object FastSync {
840844

841-
case class PeerWithInfo(peer: Peer, info: PeerInfo)
842-
843845
// scalastyle:off parameter.number
844846
def props(
845847
fastSyncStateStorage: FastSyncStateStorage,

0 commit comments

Comments
 (0)