Skip to content

Commit fffb27b

Browse files
author
Nicolas Tallar
committed
[FIX] Handle timeouts on BlockFetcher
1 parent f7a7739 commit fffb27b

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

src/main/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcher.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ import cats.instances.future._
99
import cats.instances.option._
1010
import cats.syntax.either._
1111
import io.iohk.ethereum.blockchain.sync.PeersClient._
12-
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{
13-
AwaitingBodiesToBeIgnored,
14-
AwaitingHeadersToBeIgnored
15-
}
12+
import io.iohk.ethereum.blockchain.sync.regular.BlockFetcherState.{AwaitingBodiesToBeIgnored, AwaitingHeadersToBeIgnored}
1613
import io.iohk.ethereum.blockchain.sync.regular.BlockImporter.{ImportNewBlock, NotOnTop, OnTop}
1714
import io.iohk.ethereum.blockchain.sync.regular.RegularSync.ProgressProtocol
1815
import io.iohk.ethereum.crypto.kec256
@@ -44,7 +41,7 @@ class BlockFetcher(
4441
import BlockFetcher._
4542

4643
implicit val ec: ExecutionContext = context.dispatcher
47-
implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 1.second // some margin for actor communication
44+
implicit val timeout: Timeout = syncConfig.peerResponseTimeout + 2.second // some margin for actor communication
4845

4946
override def receive: Receive = idle()
5047

@@ -130,7 +127,7 @@ class BlockFetcher(
130127

131128
fetchBlocks(newState)
132129
case RetryHeadersRequest if state.isFetchingHeaders =>
133-
log.debug("Time-out occurred while waiting for headers")
130+
log.debug("Something failed on a headers request, cancelling the request and re-fetching")
134131

135132
val newState = state.withHeaderFetchReceived
136133
fetchBlocks(newState)
@@ -149,7 +146,7 @@ class BlockFetcher(
149146

150147
fetchBlocks(newState)
151148
case RetryBodiesRequest if state.isFetchingBodies =>
152-
log.debug("Time-out occurred while waiting for bodies")
149+
log.debug("Something failed on a bodies request, cancelling the request and re-fetching")
153150

154151
val newState = state.withBodiesFetchReceived
155152
fetchBlocks(newState)
@@ -316,6 +313,10 @@ class BlockFetcher(
316313
(peersClient ? request)
317314
.tap(blacklistPeerOnFailedRequest)
318315
.flatMap(failureTo(responseFallback))
316+
.recover { case error =>
317+
log.error(error, "Unexpected error on a request")
318+
responseFallback
319+
}
319320

320321
private def blacklistPeerOnFailedRequest(msg: Any): Unit = msg match {
321322
case RequestFailed(peer, reason) => peersClient ! BlacklistPeer(peer.id, reason)

src/test/scala/io/iohk/ethereum/blockchain/sync/regular/BlockFetcherSpec.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,24 @@ class BlockFetcherSpec extends TestKit(ActorSystem("BlockFetcherSpec_System")) w
195195
assert(HeadersSeq.areChain(headers))
196196
}
197197
}
198+
199+
"should properly handle a request timeout" in new TestSetup {
200+
override lazy val syncConfig = defaultSyncConfig.copy(
201+
// Small timeout on ask pattern for testing it here
202+
peerResponseTimeout = 3.seconds
203+
)
204+
205+
startFetcher()
206+
207+
val firstGetBlockHeadersRequest =
208+
GetBlockHeaders(Left(1), syncConfig.blockHeadersPerRequest, skip = 0, reverse = false)
209+
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
210+
211+
// Request should timeout without any response from the peer
212+
Thread.sleep((2 * syncConfig.peerResponseTimeout).toMillis)
213+
214+
peersClient.expectMsgPF() { case PeersClient.Request(msg, _, _) if msg == firstGetBlockHeadersRequest => () }
215+
}
198216
}
199217

200218
trait TestSetup extends TestSyncConfig {

0 commit comments

Comments
 (0)