Skip to content

Commit fbd1980

Browse files
committed
[ETCM-77] checkpoint sync
1 parent 8ae3eef commit fbd1980

23 files changed

+1035
-467
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/SyncController.scala

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.iohk.ethereum.blockchain.sync
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props, Scheduler}
44
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
5+
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
56
import io.iohk.ethereum.consensus.validators.Validators
67
import io.iohk.ethereum.db.storage.{AppStateStorage, FastSyncStateStorage}
78
import io.iohk.ethereum.domain.Blockchain
@@ -16,11 +17,12 @@ class SyncController(
1617
validators: Validators,
1718
peerEventBus: ActorRef,
1819
pendingTransactionsManager: ActorRef,
20+
checkpointBlockGenerator: CheckpointBlockGenerator,
1921
ommersPool: ActorRef,
2022
etcPeerManager: ActorRef,
2123
syncConfig: SyncConfig,
22-
externalSchedulerOpt: Option[Scheduler] = None)
23-
extends Actor
24+
externalSchedulerOpt: Option[Scheduler] = None
25+
) extends Actor
2426
with ActorLogging {
2527

2628
import SyncController._
@@ -29,8 +31,8 @@ class SyncController(
2931

3032
override def receive: Receive = idle
3133

32-
def idle: Receive = {
33-
case Start => start()
34+
def idle: Receive = { case Start =>
35+
start()
3436
}
3537

3638
def runningFastSync(fastSync: ActorRef): Receive = {
@@ -41,8 +43,8 @@ class SyncController(
4143
case other => fastSync.forward(other)
4244
}
4345

44-
def runningRegularSync(regularSync: ActorRef): Receive = {
45-
case other => regularSync.forward(other)
46+
def runningRegularSync(regularSync: ActorRef): Receive = { case other =>
47+
regularSync.forward(other)
4648
}
4749

4850
def start(): Unit = {
@@ -54,15 +56,17 @@ class SyncController(
5456
startFastSync()
5557
case (true, true) =>
5658
log.warning(
57-
s"do-fast-sync is set to $doFastSync but fast sync cannot start because it has already been completed")
59+
s"do-fast-sync is set to $doFastSync but fast sync cannot start because it has already been completed"
60+
)
5861
startRegularSync()
5962
case (true, false) =>
6063
startRegularSync()
6164
case (false, false) =>
6265
//Check whether fast sync was started before
6366
if (fastSyncStateStorage.getSyncState().isDefined) {
6467
log.warning(
65-
s"do-fast-sync is set to $doFastSync but regular sync cannot start because fast sync hasn't completed")
68+
s"do-fast-sync is set to $doFastSync but regular sync cannot start because fast sync hasn't completed"
69+
)
6670
startFastSync()
6771
} else
6872
startRegularSync()
@@ -79,14 +83,17 @@ class SyncController(
7983
peerEventBus,
8084
etcPeerManager,
8185
syncConfig,
82-
scheduler),
83-
"fast-sync")
86+
scheduler
87+
),
88+
"fast-sync"
89+
)
8490
fastSync ! FastSync.Start
8591
context become runningFastSync(fastSync)
8692
}
8793

8894
def startRegularSync(): Unit = {
89-
val peersClient = context.actorOf(PeersClient.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "peers-client")
95+
val peersClient =
96+
context.actorOf(PeersClient.props(etcPeerManager, peerEventBus, syncConfig, scheduler), "peers-client")
9097
val regularSync = context.actorOf(
9198
RegularSync.props(
9299
peersClient,
@@ -97,7 +104,9 @@ class SyncController(
97104
syncConfig,
98105
ommersPool,
99106
pendingTransactionsManager,
100-
scheduler),
107+
checkpointBlockGenerator,
108+
scheduler
109+
),
101110
"regular-sync"
102111
)
103112
regularSync ! RegularSync.Start
@@ -116,9 +125,11 @@ object SyncController {
116125
validators: Validators,
117126
peerEventBus: ActorRef,
118127
pendingTransactionsManager: ActorRef,
128+
checkpointBlockGenerator: CheckpointBlockGenerator,
119129
ommersPool: ActorRef,
120130
etcPeerManager: ActorRef,
121-
syncConfig: SyncConfig): Props =
131+
syncConfig: SyncConfig
132+
): Props =
122133
Props(
123134
new SyncController(
124135
appStateStorage,
@@ -128,9 +139,12 @@ object SyncController {
128139
validators,
129140
peerEventBus,
130141
pendingTransactionsManager,
142+
checkpointBlockGenerator,
131143
ommersPool,
132144
etcPeerManager,
133-
syncConfig))
145+
syncConfig
146+
)
147+
)
134148

135149
case object Start
136150
}

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockImporter.scala

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,40 @@ package io.iohk.ethereum.blockchain.sync.regular
22

33
import akka.actor.Actor.Receive
44
import akka.actor.{Actor, ActorLogging, ActorRef, NotInfluenceReceiveTimeout, Props, ReceiveTimeout}
5+
import akka.util.ByteString
56
import cats.data.NonEmptyList
67
import cats.instances.future._
78
import cats.instances.list._
89
import cats.syntax.apply._
910
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlocks
10-
import io.iohk.ethereum.crypto.kec256
11-
import io.iohk.ethereum.domain.{Block, Blockchain, SignedTransaction}
11+
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
12+
import io.iohk.ethereum.crypto.{ECDSASignature, kec256}
13+
import io.iohk.ethereum.domain.{Block, Blockchain, Checkpoint, SignedTransaction}
1214
import io.iohk.ethereum.ledger._
1315
import io.iohk.ethereum.mpt.MerklePatriciaTrie.MissingNodeException
1416
import io.iohk.ethereum.network.PeerId
1517
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
1618
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
1719
import io.iohk.ethereum.transactions.PendingTransactionsManager
1820
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddUncheckedTransactions, RemoveTransactions}
21+
import io.iohk.ethereum.utils.ByteStringUtils
1922
import io.iohk.ethereum.utils.Config.SyncConfig
2023
import io.iohk.ethereum.utils.FunctorOps._
2124

2225
import scala.concurrent.{ExecutionContext, Future}
26+
import scala.concurrent.duration._
2327
import scala.util.{Failure, Success}
2428

29+
// scalastyle:off cyclomatic.complexity
2530
class BlockImporter(
2631
fetcher: ActorRef,
2732
ledger: Ledger,
2833
blockchain: Blockchain,
2934
syncConfig: SyncConfig,
3035
ommersPool: ActorRef,
3136
broadcaster: ActorRef,
32-
pendingTransactionsManager: ActorRef
37+
pendingTransactionsManager: ActorRef,
38+
checkpointBlockGenerator: CheckpointBlockGenerator
3339
) extends Actor
3440
with ActorLogging {
3541
import BlockImporter._
@@ -56,15 +62,35 @@ class BlockImporter(
5662

5763
private def running(state: ImporterState): Receive = handleTopMessages(state, running) orElse {
5864
case ReceiveTimeout => self ! PickBlocks
65+
5966
case PrintStatus => log.info("Block: {}, is on top?: {}", blockchain.getBestBlockNumber(), state.isOnTop)
67+
6068
case BlockFetcher.PickedBlocks(blocks) =>
6169
SignedTransaction.retrieveSendersInBackGround(blocks.toList.map(_.body))
6270
importBlocks(blocks)(state)
71+
6372
case MinedBlock(block) =>
6473
if (!state.importing) {
6574
importMinedBlock(block, state)
6675
}
76+
77+
case nc @ NewCheckpoint(parentHash, signatures) =>
78+
if (state.importing) {
79+
//TODO: is this ok? What delay?
80+
context.system.scheduler.scheduleOnce(100.millis, self, nc)
81+
} else {
82+
ledger.getBlockByHash(parentHash) match {
83+
case Some(parent) =>
84+
val checkpointBlock = checkpointBlockGenerator.generate(parent, Checkpoint(signatures))
85+
importCheckpointBlock(checkpointBlock, state)
86+
87+
case None =>
88+
log.error(s"Could not find parent (${ByteStringUtils.hash2string(parentHash)}) for new checkpoint block")
89+
}
90+
}
91+
6792
case ImportNewBlock(block, peerId) if state.isOnTop && !state.importing => importNewBlock(block, peerId, state)
93+
6894
case ImportDone(newBehavior) =>
6995
val newState = state.notImportingBlocks().branchResolved()
7096
val behavior: Behavior = getBehavior(newBehavior)
@@ -177,6 +203,9 @@ class BlockImporter(
177203
private def importMinedBlock(block: Block, state: ImporterState): Unit =
178204
importBlock(block, new MinedBlockImportMessages(block), informFetcherOnFail = false)(state)
179205

206+
private def importCheckpointBlock(block: Block, state: ImporterState): Unit =
207+
importBlock(block, new CheckpointBlockImportMessages(block), informFetcherOnFail = false)(state)
208+
180209
private def importNewBlock(block: Block, peerId: PeerId, state: ImporterState): Unit =
181210
importBlock(block, new NewBlockImportMessages(block, peerId), informFetcherOnFail = true)(state)
182211

@@ -294,10 +323,20 @@ object BlockImporter {
294323
syncConfig: SyncConfig,
295324
ommersPool: ActorRef,
296325
broadcaster: ActorRef,
297-
pendingTransactionsManager: ActorRef
326+
pendingTransactionsManager: ActorRef,
327+
checkpointBlockGenerator: CheckpointBlockGenerator
298328
): Props =
299329
Props(
300-
new BlockImporter(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager)
330+
new BlockImporter(
331+
fetcher,
332+
ledger,
333+
blockchain,
334+
syncConfig,
335+
ommersPool,
336+
broadcaster,
337+
pendingTransactionsManager,
338+
checkpointBlockGenerator
339+
)
301340
)
302341

303342
type Behavior = ImporterState => Receive
@@ -308,6 +347,7 @@ object BlockImporter {
308347
case object OnTop extends ImporterMsg
309348
case object NotOnTop extends ImporterMsg
310349
case class MinedBlock(block: Block) extends ImporterMsg
350+
case class NewCheckpoint(parentHash: ByteString, signatures: Seq[ECDSASignature]) extends ImporterMsg
311351
case class ImportNewBlock(block: Block, peerId: PeerId) extends ImporterMsg
312352
case class ImportDone(newBehavior: NewBehavior) extends ImporterMsg
313353
case object PickBlocks extends ImporterMsg

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ImportMessages.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ class MinedBlockImportMessages(block: Block) extends ImportMessages(block) {
5454
(ErrorLevel, s"Ignoring mined block $exception")
5555
}
5656

57+
class CheckpointBlockImportMessages(block: Block) extends ImportMessages(block) {
58+
import ImportMessages._
59+
override def preImport(): LogEntry = (DebugLevel, s"Importing new checkpoint block (${block.idTag})")
60+
override def importedToTheTop(): LogEntry =
61+
(DebugLevel, s"Added new checkpont block $number to top of the chain")
62+
override def enqueued(): LogEntry = (DebugLevel, s"Checkpoint block $number was added to the queue")
63+
override def duplicated(): LogEntry =
64+
(DebugLevel, "Ignoring duplicate checkpoint block")
65+
override def orphaned(): LogEntry =
66+
(ErrorLevel, "Checkpoint block has no parent. This should never happen")
67+
override def reorganisedChain(newBranch: List[Block]): LogEntry =
68+
(DebugLevel, s"Addition of new checkpoint block $number resulting in chain reorganization")
69+
override def importFailed(error: String): LogEntry =
70+
(WarningLevel, s"Failed to execute checkpoint block because of $error")
71+
override def missingStateNode(exception: MissingNodeException): LogEntry =
72+
(ErrorLevel, s"Ignoring checkpoint block: $exception")
73+
}
74+
5775
class NewBlockImportMessages(block: Block, peerId: PeerId) extends ImportMessages(block) {
5876
import ImportMessages._
5977
override def preImport(): LogEntry = (DebugLevel, s"Handling NewBlock message for block (${block.idTag})")

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/RegularSync.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package io.iohk.ethereum.blockchain.sync.regular
33
import akka.actor.{Actor, ActorLogging, ActorRef, AllForOneStrategy, Cancellable, Props, Scheduler, SupervisorStrategy}
44
import akka.util.ByteString
55
import io.iohk.ethereum.blockchain.sync.BlockBroadcast
6+
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
67
import io.iohk.ethereum.crypto.ECDSASignature
78
import io.iohk.ethereum.domain.{Block, Blockchain}
89
import io.iohk.ethereum.ledger.Ledger
10+
import io.iohk.ethereum.utils.ByteStringUtils
911
import io.iohk.ethereum.utils.Config.SyncConfig
1012

1113
class RegularSync(
@@ -17,6 +19,7 @@ class RegularSync(
1719
syncConfig: SyncConfig,
1820
ommersPool: ActorRef,
1921
pendingTransactionsManager: ActorRef,
22+
checkpointBlockGenerator: CheckpointBlockGenerator,
2023
scheduler: Scheduler
2124
) extends Actor
2225
with ActorLogging {
@@ -31,7 +34,16 @@ class RegularSync(
3134
)
3235
val importer: ActorRef =
3336
context.actorOf(
34-
BlockImporter.props(fetcher, ledger, blockchain, syncConfig, ommersPool, broadcaster, pendingTransactionsManager),
37+
BlockImporter.props(
38+
fetcher,
39+
ledger,
40+
blockchain,
41+
syncConfig,
42+
ommersPool,
43+
broadcaster,
44+
pendingTransactionsManager,
45+
checkpointBlockGenerator
46+
),
3547
"block-importer"
3648
)
3749

@@ -57,6 +69,10 @@ class RegularSync(
5769
case MinedBlock(block) =>
5870
log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString)
5971
importer ! BlockImporter.MinedBlock(block)
72+
73+
case NewCheckpoint(parentHash, signatures) =>
74+
log.info(s"Received new checkpoint for block ${ByteStringUtils.hash2string(parentHash)}")
75+
importer ! BlockImporter.NewCheckpoint(parentHash, signatures)
6076
}
6177

6278
override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy()(SupervisorStrategy.defaultDecider)
@@ -78,6 +94,7 @@ object RegularSync {
7894
syncConfig: SyncConfig,
7995
ommersPool: ActorRef,
8096
pendingTransactionsManager: ActorRef,
97+
checkpointBlockGenerator: CheckpointBlockGenerator,
8198
scheduler: Scheduler
8299
): Props =
83100
Props(
@@ -90,6 +107,7 @@ object RegularSync {
90107
syncConfig,
91108
ommersPool,
92109
pendingTransactionsManager,
110+
checkpointBlockGenerator,
93111
scheduler
94112
)
95113
)

0 commit comments

Comments
 (0)