@@ -3,13 +3,16 @@ package io.iohk.ethereum.blockchain.sync.regular
33import akka .actor .ActorRef
44import akka .util .ByteString
55import cats .data .NonEmptyList
6+ import io .iohk .ethereum .consensus .validators .BlockValidator
67import cats .implicits ._
78import io .iohk .ethereum .blockchain .sync .regular .BlockFetcherState ._
89import io .iohk .ethereum .domain .{Block , BlockBody , BlockHeader , HeadersSeq }
910import io .iohk .ethereum .network .PeerId
1011import io .iohk .ethereum .network .p2p .messages .PV62 .BlockHash
1112
1213import scala .collection .immutable .Queue
14+ import scala .annotation .tailrec
15+ import io .iohk .ethereum .consensus .validators .BlockValidator
1316
1417// scalastyle:off number.of.methods
1518/**
@@ -33,6 +36,7 @@ import scala.collection.immutable.Queue
3336 */
3437case class BlockFetcherState (
3538 importer : ActorRef ,
39+ blockValidator : BlockValidator ,
3640 readyBlocks : Queue [Block ],
3741 waitingHeaders : Queue [BlockHeader ],
3842 fetchingHeadersState : FetchingHeadersState ,
@@ -55,12 +59,6 @@ case class BlockFetcherState(
5559
5660 def hasReachedSize (size : Int ): Boolean = (readyBlocks.size + waitingHeaders.size) >= size
5761
58- def lastFullBlockNumber : BigInt =
59- readyBlocks.lastOption
60- .map(_.number)
61- .orElse(waitingHeaders.headOption.map(_.number - 1 ))
62- .getOrElse(lastBlock)
63-
6462 def lowestBlock : BigInt =
6563 readyBlocks.headOption
6664 .map(_.number)
@@ -117,35 +115,69 @@ case class BlockFetcherState(
117115 )
118116
119117 /**
120- * Matches bodies with headers in queue and adding matched bodies to the blocks.
121- * If bodies is empty collection - headers in queue are removed as the cause is:
122- * - the headers are from rejected fork and therefore it won't be possible to resolve bodies for them
123- * - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
124- * when making a request)
118+ * When bodies are requested, the response don't need to be a complete sub chain,
119+ * even more, we could receive an empty chain and that will be considered valid. Here we just
120+ * validate that the received bodies corresponds to an ordered subset of the requested headers.
125121 */
126- def addBodies (peerId : PeerId , bodies : Seq [BlockBody ]): BlockFetcherState = {
127- if (bodies.isEmpty) {
128- copy(waitingHeaders = Queue .empty)
129- } else {
130- val (matching, waiting) = waitingHeaders.splitAt(bodies.length)
131- val blocks = matching.zip(bodies).map((Block .apply _).tupled)
122+ def validateBodies (receivedBodies : Seq [BlockBody ]): Either [String , Seq [Block ]] =
123+ bodiesAreOrderedSubsetOfRequested(waitingHeaders.toList, receivedBodies)
124+ .toRight(
125+ " Received unrequested bodies"
126+ )
132127
133- withPeerForBlocks(peerId, blocks.map(_.header.number))
134- .copy(
135- readyBlocks = readyBlocks.enqueue(blocks),
136- waitingHeaders = waiting
137- )
128+ // Checks that the received block bodies are an ordered subset of the ones requested
129+ @ tailrec
130+ private def bodiesAreOrderedSubsetOfRequested (
131+ requestedHeaders : Seq [BlockHeader ],
132+ respondedBodies : Seq [BlockBody ],
133+ matchedBlocks : Seq [Block ] = Nil
134+ ): Option [Seq [Block ]] =
135+ (requestedHeaders, respondedBodies) match {
136+ case (Seq (), _ +: _) => None
137+ case (_, Seq ()) => Some (matchedBlocks)
138+ case (header +: remainingHeaders, body +: remainingBodies) =>
139+ val doMatch = blockValidator.validateHeaderAndBody(header, body).isRight
140+ if (doMatch)
141+ bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block (header, body))
142+ else
143+ bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks)
138144 }
139- }
140145
141- def appendNewBlock (block : Block , fromPeer : PeerId ): BlockFetcherState =
142- withPeerForBlocks(fromPeer, Seq (block.header.number))
143- .withPossibleNewTopAt(block.number)
144- .withLastBlock(block.number)
145- .copy(
146- readyBlocks = readyBlocks.enqueue(block),
147- waitingHeaders = waitingHeaders.filter(block.number != _.number)
146+ /**
147+ * If blocks is empty collection - headers in queue are removed as the cause is:
148+ * - the headers are from rejected fork and therefore it won't be possible to resolve blocks for them
149+ * - given peer is still syncing (quite unlikely due to preference of peers with best total difficulty
150+ * when making a request)
151+ */
152+ def handleRequestedBlocks (blocks : Seq [Block ], fromPeer : PeerId ): BlockFetcherState =
153+ if (blocks.isEmpty)
154+ copy(
155+ waitingHeaders = Queue .empty
148156 )
157+ else
158+ blocks.foldLeft(this ) { case (state, block) =>
159+ state.enqueueRequestedBlock(block, fromPeer)
160+ }
161+
162+ /**
163+ * If the requested block is not the next in the line in the waiting headers queue,
164+ * we opt for not adding it in the ready blocks queue.
165+ */
166+ def enqueueRequestedBlock (block : Block , fromPeer : PeerId ): BlockFetcherState =
167+ waitingHeaders.dequeueOption
168+ .map { case (waitingHeader, waitingHeadersTail) =>
169+ if (waitingHeader.hash == block.hash)
170+ withPeerForBlocks(fromPeer, Seq (block.number))
171+ .withPossibleNewTopAt(block.number)
172+ .withLastBlock(block.number)
173+ .copy(
174+ readyBlocks = readyBlocks.enqueue(block),
175+ waitingHeaders = waitingHeadersTail
176+ )
177+ else
178+ this
179+ }
180+ .getOrElse(this )
149181
150182 def pickBlocks (amount : Int ): Option [(NonEmptyList [Block ], BlockFetcherState )] =
151183 if (readyBlocks.nonEmpty) {
@@ -242,17 +274,19 @@ case class BlockFetcherState(
242274object BlockFetcherState {
243275 case class StateNodeFetcher (hash : ByteString , replyTo : ActorRef )
244276
245- def initial (importer : ActorRef , lastBlock : BigInt ): BlockFetcherState = BlockFetcherState (
246- importer = importer,
247- readyBlocks = Queue (),
248- waitingHeaders = Queue (),
249- fetchingHeadersState = NotFetchingHeaders ,
250- fetchingBodiesState = NotFetchingBodies ,
251- stateNodeFetcher = None ,
252- lastBlock = lastBlock,
253- knownTop = lastBlock + 1 ,
254- blockProviders = Map ()
255- )
277+ def initial (importer : ActorRef , blockValidator : BlockValidator , lastBlock : BigInt ): BlockFetcherState =
278+ BlockFetcherState (
279+ importer = importer,
280+ blockValidator = blockValidator,
281+ readyBlocks = Queue (),
282+ waitingHeaders = Queue (),
283+ fetchingHeadersState = NotFetchingHeaders ,
284+ fetchingBodiesState = NotFetchingBodies ,
285+ stateNodeFetcher = None ,
286+ lastBlock = lastBlock,
287+ knownTop = lastBlock + 1 ,
288+ blockProviders = Map ()
289+ )
256290
257291 trait FetchingHeadersState
258292 case object NotFetchingHeaders extends FetchingHeadersState
0 commit comments