@KonradStaniec KonradStaniec commented Oct 26, 2020

Description

Changes how nodes are processed and removes StateDownloader.

The speed of processing each batch of nodes is highly volatile: it is quite fast at the beginning of the sync and quite slow at the end. To keep the number of responses queued for processing balanced while the number of peers also fluctuates, a peer has to stay marked as active through the whole MPT node request life cycle, i.e.:

  1. ask for nodes
  2. hash and verify delivered nodes
  3. insert valid nodes into the running trie scheduler

Only after this whole cycle is the peer marked as free to handle another request. That way we can achieve optimal throughput through the whole state sync. It also guarantees that our depth-first descent won't go too far in breadth.
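The life cycle above can be sketched as a small, dependency-free model. This is only an illustration under stated assumptions: `PeerId`, `Node`, `PeerTracker`, `hashOf` and `handleResponse` are hypothetical names, a `Map` stands in for the trie scheduler, and `String.hashCode` stands in for real node hashing. It is not the code from this PR.

```scala
// Hypothetical, simplified model of the request life cycle; none of these
// names come from the PR itself.
object PeerLifecycleSketch {
  final case class PeerId(value: String)
  final case class Node(hash: Int, payload: String)

  // Stand-in for real node hashing (assumption: any deterministic hash works for the sketch).
  def hashOf(payload: String): Int = payload.hashCode

  final case class PeerTracker(known: Set[PeerId], active: Set[PeerId]) {
    def freePeers: Set[PeerId] = known -- active
    // Step 1: the peer becomes active as soon as nodes are requested from it.
    def markActive(peer: PeerId): PeerTracker = copy(active = active + peer)
    def markFree(peer: PeerId): PeerTracker = copy(active = active - peer)
  }

  // Runs steps 2 and 3 for one response and frees the peer only at the very end.
  def handleResponse(
      tracker: PeerTracker,
      peer: PeerId,
      requested: Set[Int],
      delivered: List[Node],
      trie: Map[Int, String]
  ): (PeerTracker, Map[Int, String]) = {
    // Step 2: keep only nodes that were requested and whose hash matches the payload.
    val valid = delivered.filter(n => requested.contains(n.hash) && hashOf(n.payload) == n.hash)
    // Step 3: insert the valid nodes into the stand-in trie.
    val newTrie = trie ++ valid.map(n => n.hash -> n.payload)
    // Only now is the peer eligible for another request.
    (tracker.markFree(peer), newTrie)
  }
}
```

Because the peer stays in `active` until `handleResponse` returns, the number of responses waiting to be processed can never exceed the number of known peers, regardless of how slow processing becomes.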

Such a design makes a separate StateDownloader unnecessary, and even troublesome, as it requires synchronizing state between two actors.

Testing

I have already synced it to Mainnet a few times.

@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from 5ace25f to b2180b3 Compare October 27, 2020 14:49
@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from d3f4e56 to 632ba50 Compare October 30, 2020 06:17
@KonradStaniec KonradStaniec changed the title from "Etcm 275/async processing" to "[ETCM-275] Async node processing and downloader removal" Oct 30, 2020
@KonradStaniec KonradStaniec marked this pull request as ready for review October 30, 2020 08:23
@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from dbb28a8 to 9151899 Compare October 30, 2020 09:15
@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from 9f3e3ea to 389f71a Compare November 3, 2020 06:52
if (underlyingMessage.maxHeaders == 1) {
// pivot block
sender ! MessageFromPeer(BlockHeaders(Seq(pivotHeader)), peer)
this
Contributor

Minor: You don't mutate state, so you could move this after the if statement. It will simplify the logic a little bit.

}
import akka.pattern.pipe

// scalastyle:off
Contributor

😢

case class RequestFailed(from: Peer, reason: String) extends RequestResult

sealed trait ProcessingError
case class Critical(er: CriticalError) extends ProcessingError
Contributor

What is the difference between Critical and DownloaderError with critical = true?

Contributor Author

Critical stops the sync entirely, as the trie is malformed for some reason, while DownloaderError with critical = true only blacklists the peer. I will change the naming here, as it can be confusing.
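One way to make that distinction visible in the types is sketched below. All names here (`TrieMalformed`, `PeerResponseError`, `blacklistPeer`, `Reaction`, `react`) are hypothetical and simplified; the actual renaming done in the PR may look different.

```scala
// Hypothetical naming sketch; not the types from the PR.
object ErrorNamingSketch {
  sealed trait ProcessingError
  // The trie itself is inconsistent: the whole sync has to stop.
  final case class TrieMalformed(description: String) extends ProcessingError
  // The peer sent a bad response; `blacklistPeer` replaces the ambiguous `critical` flag.
  final case class PeerResponseError(peerId: String, description: String, blacklistPeer: Boolean)
      extends ProcessingError

  sealed trait Reaction
  case object StopSync extends Reaction
  final case class Blacklist(peerId: String) extends Reaction
  case object RetryWithOtherPeer extends Reaction

  // Maps each error to what the sync actor would do about it.
  def react(error: ProcessingError): Reaction = error match {
    case TrieMalformed(_) => StopSync
    case PeerResponseError(peerId, _, blacklistPeer) =>
      if (blacklistPeer) Blacklist(peerId) else RetryWithOtherPeer
  }
}
```

With separate names, a reader no longer has to guess whether "critical" refers to the sync as a whole or only to the peer.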

@kapke kapke left a comment

First batch of comments, more will come tomorrow.


case PeerRequestHandler.RequestFailed(peer, reason) =>
context unwatch (sender())
log.debug(s"Request failed to peer {} due to {}", peer.id, reason)
Contributor

Minor: "Request to peer {} failed due to {}" sounds a bit better IMO

): Receive = handleCommonMessages orElse handleRequestResults orElse {
case Sync if currentState.numberOfPendingRequests > 0 && restartRequested.isEmpty =>
val freePeers = getFreePeers(currentDownloaderState)
nodesToProcess.dequeueOption match {
Contributor

Minor: since freePeers is always used with an emptiness check, this could be changed into:
(nodesToProcess.dequeueOption, NonEmptyList.fromList(freePeers)) match { //rest of code
Nice bonuses of that approach:

  • no need to use fromListUnsafe
  • exhaustiveness checks will work (using if guards in a case disables the exhaustiveness checker for the given match expression)
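The suggested tuple match could look roughly like the sketch below. To keep it dependency-free, it defines a minimal local stand-in for `cats.data.NonEmptyList` with the same `fromList` shape; in the real code the cats type would be used directly. `Decision`, `Dispatch`, `NoFreePeers`, `NothingToProcess` and `decide` are hypothetical names, and the branches are placeholders rather than the actor's real behaviour.

```scala
import scala.collection.immutable.Queue

object TupleMatchSketch {
  // Minimal local stand-in for cats.data.NonEmptyList (assumption: only fromList is needed here).
  final case class NonEmptyList[A](head: A, tail: List[A]) {
    def toList: List[A] = head :: tail
  }
  object NonEmptyList {
    def fromList[A](list: List[A]): Option[NonEmptyList[A]] = list match {
      case h :: t => Some(NonEmptyList(h, t))
      case Nil    => None
    }
  }

  // Hypothetical outcomes, only used to make the match observable.
  sealed trait Decision
  final case class Dispatch(batch: String, peers: List[String], rest: Queue[String]) extends Decision
  case object NoFreePeers extends Decision
  case object NothingToProcess extends Decision

  // No `if` guards: emptiness is encoded in the Option, so the compiler can check all cases.
  def decide(nodesToProcess: Queue[String], freePeers: List[String]): Decision =
    (nodesToProcess.dequeueOption, NonEmptyList.fromList(freePeers)) match {
      case (Some((batch, rest)), Some(peers)) => Dispatch(batch, peers.toList, rest)
      case (Some(_), None)                    => NoFreePeers
      case (None, _)                          => NothingToProcess
    }
}
```

Removing any one of the three cases should produce a non-exhaustive match warning, which is the check that guard-based cases would silence.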

Contributor Author

great suggestion!

requests.foreach(req => requestNodes(req))
processNodes(newState, currentStats, newDownloaderState, nodes).pipeTo(self)
context.become(
syncing(
Contributor

that syncing handler tracks quite a bit of state now, maybe it makes sense to extract it to some class?

Contributor Author

yep it will probably make code clearer

currentStats: ProcessingStatistics,
currentDownloaderState: DownloaderState,
requestResult: RequestResult
): Future[ProcessingResult] = {
Contributor

Does it make sense then to write this function without Future and wrap into Future at call site?
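A minimal sketch of that idea, using only the standard library: the processing function stays pure and synchronous, and the caller wraps it in a `Future`. The signature here is deliberately simplified and hypothetical (`known`, `delivered` and this `ProcessingResult` are illustrative, not the PR's types).

```scala
import scala.concurrent.{ExecutionContext, Future}

object FutureAtCallSiteSketch {
  // Hypothetical, simplified result type for illustration.
  final case class ProcessingResult(accepted: List[String], rejected: Int)

  // Pure and synchronous: can be unit tested without an ExecutionContext.
  def processNodes(known: Set[String], delivered: List[String]): ProcessingResult = {
    val (accepted, rejected) = delivered.partition(known.contains)
    ProcessingResult(accepted, rejected.size)
  }

  // The call site decides that the work runs asynchronously, and on which ExecutionContext.
  def processNodesAsync(known: Set[String], delivered: List[String])(implicit
      ec: ExecutionContext
  ): Future[ProcessingResult] =
    Future(processNodes(known, delivered))
}
```

In an actor, the resulting `Future` could then be piped back to `self`, as the `processNodes(...).pipeTo(self)` line quoted earlier in this review does.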


case class UsefulData(responses: List[SyncResponse]) extends ResponseProcessingResult

final case class DownloaderState(
Contributor

Should it go to a separate file now? This one got quite big already.

) {
override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = {
- Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished)
+ Observable.repeat(Right(ByteString(1))).takeWhile(_ => !loadingFinished)
Contributor

Why not Observable.interval(1.millis).map(_ => Right(ByteString(1))).takeWhile(_ => !loadingFinished)?

I'm not very familiar with monix's internals, but I can imagine that a plain repeat leaves little time for other work on the processing thread.

- def idle(processingStatistics: ProcessingStatistics): Receive = {
+ def idle(processingStatistics: ProcessingStatistics): Receive = handleCommonMessages orElse {
case StartSyncingTo(root, bn) =>
val state1 = startSyncing(root, bn)
Contributor

startSyncing method is always followed by SyncSchedulerActorState.initial. Maybe that call could be part of startSyncing method?

// TODO we should probably start sync again from new target block, as current trie is malformed or declare
// fast sync as failure and start normal sync from scratch
context.stop(self)
case DownloaderError(newDownloaderState, peer, description, critical) =>
Contributor

s/critical/blacklistable?

onlyPivot: Boolean = false,
failedNodeRequest: Boolean = false
): Unit = {
val sender = TestProbe()
Contributor

Why not make it a method on the autopilot? I can see the convenience argument, but this way it would be easy to send a message to a probe that doesn't have the autopilot installed.

@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from e059252 to 535b149 Compare November 4, 2020 14:34
Move DownloaderState to separate file
Extract Actor state to separate class
Call Future.apply at call site
Improve synccontrollerspec autopilot
@KonradStaniec KonradStaniec force-pushed the etcm-275/async-processing branch from 535b149 to 0f644d5 Compare November 4, 2020 18:06
@kapke kapke left a comment

LGTM! If needed - I can try to sync to mainnet over the weekend to test.

restartRequester ! WaitingForNewTargetBlock
context.become(idle(currentStats.addSaved(currentState.memBatch.size)))
}
import akka.pattern.pipe
Contributor

very minor - I'd prefer to have this import either locally within method or on top of the file

@mmrozek mmrozek left a comment

LGTM!

@KonradStaniec
Contributor Author

@kapke any additional sync testing is appreciated (so far I have tested it locally and on an EC2 machine).

@KonradStaniec KonradStaniec merged commit 5db9fb9 into develop Nov 5, 2020
@KonradStaniec KonradStaniec deleted the etcm-275/async-processing branch November 5, 2020 13:09
4 participants