@@ -21,10 +21,12 @@ import (
2121 "encoding/json"
2222 "errors"
2323 "fmt"
24+ gomath "math"
2425 "math/big"
2526 "math/rand"
2627 "sort"
2728 "sync"
29+ "sync/atomic"
2830 "time"
2931
3032 "github.com/ethereum/go-ethereum/common"
@@ -78,6 +80,29 @@ const (
7880 // and waste round trip times. If it's too high, we're capping responses and
7981 // waste bandwidth.
8082 maxTrieRequestCount = maxRequestSize / 512
83+
84+ // trienodeHealRateMeasurementImpact is the impact a single measurement has on
85+ // the local node's trienode processing capacity. A value closer to 0 reacts
86+ // slower to sudden changes, but it is also more stable against temporary hiccups.
87+ trienodeHealRateMeasurementImpact = 0.005
88+
89+ // minTrienodeHealThrottle is the minimum divisor for throttling trie node
90+ // heal requests to avoid overloading the local node and exessively expanding
91+ // the state trie bedth wise.
92+ minTrienodeHealThrottle = 1
93+
94+ // maxTrienodeHealThrottle is the maximum divisor for throttling trie node
95+ // heal requests to avoid overloading the local node and exessively expanding
96+ // the state trie bedth wise.
97+ maxTrienodeHealThrottle = maxTrieRequestCount
98+
99+ // trienodeHealThrottleIncrease is the multiplier for the throttle when the
100+ // rate of arriving data is higher than the rate of processing it.
101+ trienodeHealThrottleIncrease = 1.33
102+
103+ // trienodeHealThrottleDecrease is the divisor for the throttle when the
104+ // rate of arriving data is lower than the rate of processing it.
105+ trienodeHealThrottleDecrease = 1.25
81106)
82107
83108var (
@@ -431,6 +456,11 @@ type Syncer struct {
431456 trienodeHealReqs map [uint64 ]* trienodeHealRequest // Trie node requests currently running
432457 bytecodeHealReqs map [uint64 ]* bytecodeHealRequest // Bytecode requests currently running
433458
459+ trienodeHealRate float64 // Average heal rate for processing trie node data
460+ trienodeHealPend uint64 // Number of trie nodes currently pending for processing
461+ trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
462+ trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
463+
434464 trienodeHealSynced uint64 // Number of state trie nodes downloaded
435465 trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
436466 trienodeHealDups uint64 // Number of state trie nodes already processed
@@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
476506 trienodeHealIdlers : make (map [string ]struct {}),
477507 bytecodeHealIdlers : make (map [string ]struct {}),
478508
479- trienodeHealReqs : make (map [uint64 ]* trienodeHealRequest ),
480- bytecodeHealReqs : make (map [uint64 ]* bytecodeHealRequest ),
481- stateWriter : db .NewBatch (),
509+ trienodeHealReqs : make (map [uint64 ]* trienodeHealRequest ),
510+ bytecodeHealReqs : make (map [uint64 ]* bytecodeHealRequest ),
511+ trienodeHealThrottle : maxTrienodeHealThrottle , // Tune downward instead of insta-filling with junk
512+ stateWriter : db .NewBatch (),
482513
483514 extProgress : new (SyncProgress ),
484515 }
@@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
13211352 if cap > maxTrieRequestCount {
13221353 cap = maxTrieRequestCount
13231354 }
1355+ cap = int (float64 (cap ) / s .trienodeHealThrottle )
1356+ if cap <= 0 {
1357+ cap = 1
1358+ }
13241359 var (
13251360 hashes = make ([]common.Hash , 0 , cap )
13261361 paths = make ([]string , 0 , cap )
@@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
20902125// processTrienodeHealResponse integrates an already validated trienode response
20912126// into the healer tasks.
20922127func (s * Syncer ) processTrienodeHealResponse (res * trienodeHealResponse ) {
2128+ var (
2129+ start = time .Now ()
2130+ fills int
2131+ )
20932132 for i , hash := range res .hashes {
20942133 node := res .nodes [i ]
20952134
@@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
20982137 res .task .trieTasks [res .paths [i ]] = res .hashes [i ]
20992138 continue
21002139 }
2140+ fills ++
2141+
21012142 // Push the trie node into the state syncer
21022143 s .trienodeHealSynced ++
21032144 s .trienodeHealBytes += common .StorageSize (len (node ))
@@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
21212162 log .Crit ("Failed to persist healing data" , "err" , err )
21222163 }
21232164 log .Debug ("Persisted set of healing data" , "type" , "trienodes" , "bytes" , common .StorageSize (batch .ValueSize ()))
2165+
2166+ // Calculate the processing rate of one filled trie node
2167+ rate := float64 (fills ) / (float64 (time .Since (start )) / float64 (time .Second ))
2168+
2169+ // Update the currently measured trienode queueing and processing throughput.
2170+ //
2171+ // The processing rate needs to be updated uniformly independent if we've
2172+ // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
2173+ // the face of varying network packets. As such, we cannot just measure the
2174+ // time it took to process N trie nodes and update once, we need one update
2175+ // per trie node.
2176+ //
2177+ // Naively, that would be:
2178+ //
2179+ // for i:=0; i<fills; i++ {
2180+ // healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
2181+ // }
2182+ //
2183+ // Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
2184+ //
2185+ // We can expand that formula for the Nth item as:
2186+ // HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
2187+ //
2188+ // The above is a geometric sequence that can be summed to:
2189+ // HR(N) = (1-MI)^N*(OR-NR) + NR
2190+ s .trienodeHealRate = gomath .Pow (1 - trienodeHealRateMeasurementImpact , float64 (fills ))* (s .trienodeHealRate - rate ) + rate
2191+
2192+ pending := atomic .LoadUint64 (& s .trienodeHealPend )
2193+ if time .Since (s .trienodeHealThrottled ) > time .Second {
2194+ // Periodically adjust the trie node throttler
2195+ if float64 (pending ) > 2 * s .trienodeHealRate {
2196+ s .trienodeHealThrottle *= trienodeHealThrottleIncrease
2197+ } else {
2198+ s .trienodeHealThrottle /= trienodeHealThrottleDecrease
2199+ }
2200+ if s .trienodeHealThrottle > maxTrienodeHealThrottle {
2201+ s .trienodeHealThrottle = maxTrienodeHealThrottle
2202+ } else if s .trienodeHealThrottle < minTrienodeHealThrottle {
2203+ s .trienodeHealThrottle = minTrienodeHealThrottle
2204+ }
2205+ s .trienodeHealThrottled = time .Now ()
2206+
2207+ log .Debug ("Updated trie node heal throttler" , "rate" , s .trienodeHealRate , "pending" , pending , "throttle" , s .trienodeHealThrottle )
2208+ }
21242209}
21252210
21262211// processBytecodeHealResponse integrates an already validated bytecode response
@@ -2655,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
26552740
26562741 // Cross reference the requested trienodes with the response to find gaps
26572742 // that the serving node is missing
2658- hasher := sha3 .NewLegacyKeccak256 ().(crypto.KeccakState )
2659- hash := make ([]byte , 32 )
2660-
2661- nodes := make ([][]byte , len (req .hashes ))
2743+ var (
2744+ hasher = sha3 .NewLegacyKeccak256 ().(crypto.KeccakState )
2745+ hash = make ([]byte , 32 )
2746+ nodes = make ([][]byte , len (req .hashes ))
2747+ fills uint64
2748+ )
26622749 for i , j := 0 , 0 ; i < len (trienodes ); i ++ {
26632750 // Find the next hash that we've been served, leaving misses with nils
26642751 hasher .Reset ()
@@ -2670,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
26702757 }
26712758 if j < len (req .hashes ) {
26722759 nodes [j ] = trienodes [i ]
2760+ fills ++
26732761 j ++
26742762 continue
26752763 }
26762764 // We've either ran out of hashes, or got unrequested data
26772765 logger .Warn ("Unexpected healing trienodes" , "count" , len (trienodes )- i )
2766+
26782767 // Signal this request as failed, and ready for rescheduling
26792768 s .scheduleRevertTrienodeHealRequest (req )
26802769 return errors .New ("unexpected healing trienode" )
26812770 }
26822771 // Response validated, send it to the scheduler for filling
2772+ atomic .AddUint64 (& s .trienodeHealPend , fills )
2773+ defer func () {
2774+ atomic .AddUint64 (& s .trienodeHealPend , ^ (fills - 1 ))
2775+ }()
26832776 response := & trienodeHealResponse {
26842777 paths : req .paths ,
26852778 task : req .task ,
0 commit comments