Skip to content

Commit dbb28a8

Browse files
committed
[ETCM-275] Improve synccontroller tests
1 parent 668d219 commit dbb28a8

File tree

2 files changed

+164
-110
lines changed

2 files changed

+164
-110
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncStateSchedulerActor.scala

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -90,36 +90,37 @@ class SyncStateSchedulerActor(
9090
self ! BloomFilterResult(value)
9191
}
9292

93-
def waitingForBloomFilterToLoad(lastReceivedCommand: Option[(SyncStateSchedulerActorCommand, ActorRef)]): Receive = {
94-
case BloomFilterResult(result) =>
95-
log.debug(
96-
s"Loaded ${result.writtenElements} already known elements from storage to bloom filter the error while loading " +
97-
s"was ${result.error}"
98-
)
99-
lastReceivedCommand match {
100-
case Some((startSignal: StartSyncingTo, sender)) =>
101-
val initStats = ProcessingStatistics().addSaved(result.writtenElements)
102-
val initState = startSyncing(startSignal.stateRoot, startSignal.blockNumber)
103-
context become (syncing(
104-
initState,
105-
DownloaderState(),
106-
initStats,
107-
startSignal.blockNumber,
108-
sender,
109-
Queue(),
110-
processing = false,
111-
None
112-
))
113-
case Some((restartSignal: RestartRequested.type, sender)) =>
114-
sender ! WaitingForNewTargetBlock
115-
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
116-
case _ =>
117-
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
118-
}
93+
def waitingForBloomFilterToLoad(lastReceivedCommand: Option[(SyncStateSchedulerActorCommand, ActorRef)]): Receive =
94+
handleCommonMessages orElse {
95+
case BloomFilterResult(result) =>
96+
log.debug(
97+
s"Loaded ${result.writtenElements} already known elements from storage to bloom filter the error while loading " +
98+
s"was ${result.error}"
99+
)
100+
lastReceivedCommand match {
101+
case Some((startSignal: StartSyncingTo, sender)) =>
102+
val initStats = ProcessingStatistics().addSaved(result.writtenElements)
103+
val initState = startSyncing(startSignal.stateRoot, startSignal.blockNumber)
104+
context become (syncing(
105+
initState,
106+
DownloaderState(),
107+
initStats,
108+
startSignal.blockNumber,
109+
sender,
110+
Queue(),
111+
processing = false,
112+
None
113+
))
114+
case Some((restartSignal: RestartRequested.type, sender)) =>
115+
sender ! WaitingForNewTargetBlock
116+
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
117+
case _ =>
118+
context.become(idle(ProcessingStatistics().addSaved(result.writtenElements)))
119+
}
119120

120-
case command: SyncStateSchedulerActorCommand =>
121-
context.become(waitingForBloomFilterToLoad(Some((command, sender()))))
122-
}
121+
case command: SyncStateSchedulerActorCommand =>
122+
context.become(waitingForBloomFilterToLoad(Some((command, sender()))))
123+
}
123124

124125
private def startSyncing(root: ByteString, bn: BigInt): SchedulerState = {
125126
timers.startTimerAtFixedRate(PrintInfoKey, PrintInfo, 30.seconds)
@@ -130,7 +131,7 @@ class SyncStateSchedulerActor(
130131
initState
131132
}
132133

133-
def idle(processingStatistics: ProcessingStatistics): Receive = {
134+
def idle(processingStatistics: ProcessingStatistics): Receive = handleCommonMessages orElse {
134135
case StartSyncingTo(root, bn) =>
135136
val state1 = startSyncing(root, bn)
136137
context become (syncing(

src/test/scala/io/iohk/ethereum/blockchain/sync/SyncControllerSpec.scala

Lines changed: 133 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
4545
}
4646

4747
after {
48-
Await.result(system.terminate(), 1.seconds)
48+
Await.result(system.terminate(), 10.seconds)
4949
}
5050

5151
"SyncController" should "download pivot block and request block headers" in new TestSetup() {
@@ -249,7 +249,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
249249
storagesInstance.storages.fastSyncStateStorage.getSyncState().get.pivotBlock shouldBe defaultPivotBlockHeader
250250
}
251251

252-
setupAutoPilot(etcPeerManager, newHanshaked, newPivot, BlockchainData(newBlocks))
252+
updateAutoPilot(etcPeerManager.ref, newHanshaked, newPivot, BlockchainData(newBlocks))
253253
val watcher = TestProbe()
254254
watcher.watch(syncController)
255255

@@ -287,7 +287,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
287287
storagesInstance.storages.fastSyncStateStorage.getSyncState().get.pivotBlockUpdateFailures shouldBe 1
288288
}
289289

290-
setupAutoPilot(etcPeerManager, freshHandshakedPeers, freshHeader, BlockchainData(newBlocks), onlyPivot = true)
290+
updateAutoPilot(etcPeerManager.ref, freshHandshakedPeers, freshHeader, BlockchainData(newBlocks), onlyPivot = true)
291291

292292
eventually(timeout = eventuallyTimeOut) {
293293
storagesInstance.storages.fastSyncStateStorage.getSyncState().get.pivotBlock shouldBe defaultPivotBlockHeader
@@ -320,7 +320,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
320320
val freshHandshakedPeers1 = HandshakedPeers(Map(peer1 -> freshPeerInfo1a))
321321

322322
// set up new received header previously received header will need update
323-
setupAutoPilot(etcPeerManager, freshHandshakedPeers1, freshHeader1, BlockchainData(newBlocks))
323+
updateAutoPilot(etcPeerManager.ref, freshHandshakedPeers1, freshHeader1, BlockchainData(newBlocks))
324324

325325
eventually(timeout = longeventuallyTimeOut) {
326326
storagesInstance.storages.fastSyncStateStorage
@@ -397,7 +397,13 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
397397
val newHandshakedPeers = HandshakedPeers(Map(peer1 -> peerWithBetterBlock))
398398
val newPivot = defaultPivotBlockHeader.copy(number = defaultPivotBlockHeader.number + syncConfig.maxPivotBlockAge)
399399

400-
setupAutoPilot(etcPeerManager, newHandshakedPeers, newPivot, BlockchainData(newBlocks), failedNodeRequest = true)
400+
updateAutoPilot(
401+
etcPeerManager.ref,
402+
newHandshakedPeers,
403+
newPivot,
404+
BlockchainData(newBlocks),
405+
failedNodeRequest = true
406+
)
401407

402408
// sync to new pivot
403409
eventually(timeout = eventuallyTimeOut) {
@@ -406,7 +412,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
406412
}
407413

408414
// enable peer to respond with mpt nodes
409-
setupAutoPilot(etcPeerManager, newHandshakedPeers, newPivot, BlockchainData(newBlocks))
415+
updateAutoPilot(etcPeerManager.ref, newHandshakedPeers, newPivot, BlockchainData(newBlocks))
410416

411417
val watcher = TestProbe()
412418
watcher.watch(syncController)
@@ -426,6 +432,8 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
426432
) extends EphemBlockchainTestSetup
427433
with TestSyncPeers
428434
with TestSyncConfig {
435+
436+
@volatile
429437
var stateDownloadStarted = false
430438

431439
val eventuallyTimeOut: Timeout = Timeout(Span(10, Seconds))
@@ -482,7 +490,7 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
482490
ommersPool.ref,
483491
etcPeerManager.ref,
484492
syncConfig,
485-
externalSchedulerOpt = None
493+
externalSchedulerOpt = Some(system.scheduler)
486494
)
487495
)
488496
)
@@ -511,6 +519,78 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
511519
}
512520
}
513521
}
522+
// scalastyle:off method.length
523+
case class SyncStateAutoPilot(
524+
handshakedPeers: HandshakedPeers,
525+
pivotHeader: BlockHeader,
526+
blockchainData: BlockchainData,
527+
failedReceiptsTries: Int,
528+
failedBodiesTries: Int,
529+
onlyPivot: Boolean,
530+
failedNodeRequest: Boolean
531+
) extends AutoPilot {
532+
override def run(sender: ActorRef, msg: Any): AutoPilot = {
533+
msg match {
534+
case EtcPeerManagerActor.GetHandshakedPeers =>
535+
sender ! handshakedPeers
536+
this
537+
538+
case SendMessage(msg: GetBlockHeadersEnc, peer) =>
539+
val underlyingMessage = msg.underlyingMsg
540+
if (underlyingMessage.maxHeaders == 1) {
541+
// pivot block
542+
sender ! MessageFromPeer(BlockHeaders(Seq(pivotHeader)), peer)
543+
this
544+
} else {
545+
if (!onlyPivot) {
546+
val start = msg.underlyingMsg.block.left.get
547+
val stop = start + msg.underlyingMsg.maxHeaders
548+
val headers = (start until stop).flatMap(i => blockchainData.headers.get(i))
549+
sender ! MessageFromPeer(BlockHeaders(headers), peer)
550+
this
551+
} else {
552+
this
553+
}
554+
}
555+
556+
case SendMessage(msg: GetReceiptsEnc, peer) if !onlyPivot =>
557+
val underlyingMessage = msg.underlyingMsg
558+
if (failedReceiptsTries > 0) {
559+
sender ! MessageFromPeer(Receipts(Seq()), peer)
560+
this.copy(failedReceiptsTries = failedReceiptsTries - 1)
561+
} else {
562+
val rec = msg.underlyingMsg.blockHashes.flatMap(h => blockchainData.receipts.get(h))
563+
sender ! MessageFromPeer(Receipts(rec), peer)
564+
this
565+
}
566+
567+
case SendMessage(msg: GetBlockBodiesEnc, peer) if !onlyPivot =>
568+
val underlyingMessage = msg.underlyingMsg
569+
if (failedBodiesTries > 0) {
570+
sender ! MessageFromPeer(BlockBodies(Seq()), peer)
571+
this.copy(failedBodiesTries = failedBodiesTries - 1)
572+
} else {
573+
val bod = msg.underlyingMsg.hashes.flatMap(h => blockchainData.bodies.get(h))
574+
sender ! MessageFromPeer(BlockBodies(bod), peer)
575+
this
576+
}
577+
578+
case SendMessage(msg: GetNodeDataEnc, peer) if !onlyPivot =>
579+
stateDownloadStarted = true
580+
val underlyingMessage = msg.underlyingMsg
581+
if (!failedNodeRequest) {
582+
sender ! MessageFromPeer(NodeData(Seq(defaultStateMptLeafWithAccount)), peer)
583+
this
584+
} else {
585+
this
586+
}
587+
588+
case AutoPilotUpdateData(peers, pivot, data, failedReceipts, failedBodies, onlyPivot, failedNode) =>
589+
sender ! DataUpdated
590+
this.copy(peers, pivot, data, failedReceipts, failedBodies, onlyPivot, failedNode)
591+
}
592+
}
593+
}
514594

515595
// scalastyle:off method.length parameter.number
516596
def setupAutoPilot(
@@ -523,59 +603,54 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
523603
onlyPivot: Boolean = false,
524604
failedNodeRequest: Boolean = false
525605
): Unit = {
526-
testProbe.setAutoPilot(new AutoPilot {
527-
var failedReceipts = 0
528-
var failedBodies = 0
529-
def run(sender: ActorRef, msg: Any): AutoPilot = {
530-
msg match {
531-
case EtcPeerManagerActor.GetHandshakedPeers =>
532-
sender ! handshakedPeers
533-
534-
case SendMessage(msg: GetBlockHeadersEnc, peer) =>
535-
val underlyingMessage = msg.underlyingMsg
536-
if (underlyingMessage.maxHeaders == 1) {
537-
// pivot block
538-
sender ! MessageFromPeer(BlockHeaders(Seq(pivotHeader)), peer)
539-
} else {
540-
if (!onlyPivot) {
541-
val start = msg.underlyingMsg.block.left.get
542-
val stop = start + msg.underlyingMsg.maxHeaders
543-
val headers = (start until stop).flatMap(i => blockchainData.headers.get(i))
544-
sender ! MessageFromPeer(BlockHeaders(headers), peer)
545-
}
546-
}
547-
548-
case SendMessage(msg: GetReceiptsEnc, peer) if !onlyPivot =>
549-
val underlyingMessage = msg.underlyingMsg
550-
if (failedReceipts < failedReceiptsTries) {
551-
sender ! MessageFromPeer(Receipts(Seq()), peer)
552-
failedReceipts = failedReceipts + 1
553-
} else {
554-
val rec = msg.underlyingMsg.blockHashes.flatMap(h => blockchainData.receipts.get(h))
555-
sender ! MessageFromPeer(Receipts(rec), peer)
556-
}
557-
558-
case SendMessage(msg: GetBlockBodiesEnc, peer) if !onlyPivot =>
559-
val underlyingMessage = msg.underlyingMsg
560-
if (failedBodies < failedBodiesTries) {
561-
sender ! MessageFromPeer(BlockBodies(Seq()), peer)
562-
failedBodies = failedBodies + 1
563-
} else {
564-
val bod = msg.underlyingMsg.hashes.flatMap(h => blockchainData.bodies.get(h))
565-
sender ! MessageFromPeer(BlockBodies(bod), peer)
566-
}
606+
testProbe.setAutoPilot(
607+
SyncStateAutoPilot(
608+
handshakedPeers,
609+
pivotHeader,
610+
blockchainData,
611+
failedReceiptsTries,
612+
failedBodiesTries,
613+
onlyPivot,
614+
failedNodeRequest
615+
)
616+
)
617+
}
567618

568-
case SendMessage(msg: GetNodeDataEnc, peer) if !onlyPivot =>
569-
stateDownloadStarted = true
570-
val underlyingMessage = msg.underlyingMsg
571-
if (!failedNodeRequest) {
572-
sender ! MessageFromPeer(NodeData(Seq(defaultStateMptLeafWithAccount)), peer)
573-
}
574-
}
619+
case class AutoPilotUpdateData(
620+
handshakedPeers: HandshakedPeers,
621+
pivotHeader: BlockHeader,
622+
blockchainData: BlockchainData,
623+
failedReceiptsTries: Int = 0,
624+
failedBodiesTries: Int = 0,
625+
onlyPivot: Boolean = false,
626+
failedNodeRequest: Boolean = false
627+
)
628+
case object DataUpdated
575629

576-
this
577-
}
578-
})
630+
def updateAutoPilot(
631+
probeWIthPilot: ActorRef,
632+
handshakedPeers: HandshakedPeers,
633+
pivotHeader: BlockHeader,
634+
blockchainData: BlockchainData,
635+
failedReceiptsTries: Int = 0,
636+
failedBodiesTries: Int = 0,
637+
onlyPivot: Boolean = false,
638+
failedNodeRequest: Boolean = false
639+
): Unit = {
640+
val sender = TestProbe()
641+
probeWIthPilot.tell(
642+
AutoPilotUpdateData(
643+
handshakedPeers,
644+
pivotHeader,
645+
blockchainData,
646+
failedReceiptsTries,
647+
failedBodiesTries,
648+
onlyPivot,
649+
failedNodeRequest
650+
),
651+
sender.ref
652+
)
653+
sender.expectMsg(DataUpdated)
579654
}
580655

581656
val defaultExpectedPivotBlock = 399500
@@ -645,26 +720,4 @@ class SyncControllerSpec extends AnyFlatSpec with Matchers with BeforeAndAfter w
645720
Thread.sleep(300)
646721
}
647722
}
648-
649-
class TestWithRegularSyncOnSetup extends TestSetup() {
650-
val syncControllerWithRegularSync = TestActorRef(
651-
Props(
652-
new SyncController(
653-
storagesInstance.storages.appStateStorage,
654-
blockchain,
655-
blockchainConfig,
656-
storagesInstance.storages.fastSyncStateStorage,
657-
ledger,
658-
new Mocks.MockValidatorsAlwaysSucceed,
659-
peerMessageBus.ref,
660-
pendingTransactionsManager.ref,
661-
checkpointBlockGenerator,
662-
ommersPool.ref,
663-
etcPeerManager.ref,
664-
syncConfig,
665-
externalSchedulerOpt = None
666-
)
667-
)
668-
)
669-
}
670723
}

0 commit comments

Comments
 (0)