Skip to content

Commit e50aeac

Browse files
authored
eth/traces: add state limit (#25812)
This PR introduces a new mechanism in chain tracer for preventing creating too many trace states. The workflow of chain tracer can be divided into several parts: - state creator generates trace state in a thread - state tracer retrieves the trace state and applies the tracing on top in another thread - state collector gathers all result from state tracer and stream to users It's basically a producer-consumer model here, while if we imagine that the state producer generates states too fast, then it will lead to accumulate lots of unused states in memory. Even worse, in path-based state scheme it will only keep the latest 128 states in memory, and the newly generated state will invalidate the oldest one by marking it as stale. The solution for fixing it is to limit the speed of state generation. If there are over 128 states un-consumed in memory, then the creation will be paused until the states are be consumed properly.
1 parent 5d52a35 commit e50aeac

File tree

3 files changed

+309
-39
lines changed

3 files changed

+309
-39
lines changed

eth/tracers/api.go

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ const (
6161
// For non-archive nodes, this limit _will_ be overblown, as disk-backed tries
6262
// will only be found every ~15K blocks or so.
6363
defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024)
64+
65+
// maximumPendingTraceStates is the maximum number of states allowed waiting
66+
// for tracing. The creation of trace state will be paused if the unused
67+
// trace states exceed this limit.
68+
maximumPendingTraceStates = 128
6469
)
6570

6671
// StateReleaseFunc is used to deallocate resources held by constructing a
@@ -251,30 +256,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf
251256
return sub, nil
252257
}
253258

254-
// releaser is a helper tool responsible for caching the release
255-
// callbacks of tracing state.
256-
type releaser struct {
257-
releases []StateReleaseFunc
258-
lock sync.Mutex
259-
}
260-
261-
func (r *releaser) add(release StateReleaseFunc) {
262-
r.lock.Lock()
263-
defer r.lock.Unlock()
264-
265-
r.releases = append(r.releases, release)
266-
}
267-
268-
func (r *releaser) call() {
269-
r.lock.Lock()
270-
defer r.lock.Unlock()
271-
272-
for _, release := range r.releases {
273-
release()
274-
}
275-
r.releases = r.releases[:0]
276-
}
277-
278259
// traceChain configures a new tracer according to the provided configuration, and
279260
// executes all the transactions contained within. The tracing chain range includes
280261
// the end block but excludes the start one. The return value will be one item per
@@ -291,11 +272,11 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
291272
threads = blocks
292273
}
293274
var (
294-
pend = new(sync.WaitGroup)
295-
ctx = context.Background()
296-
taskCh = make(chan *blockTraceTask, threads)
297-
resCh = make(chan *blockTraceTask, threads)
298-
reler = new(releaser)
275+
pend = new(sync.WaitGroup)
276+
ctx = context.Background()
277+
taskCh = make(chan *blockTraceTask, threads)
278+
resCh = make(chan *blockTraceTask, threads)
279+
tracker = newStateTracker(maximumPendingTraceStates, start.NumberU64())
299280
)
300281
for th := 0; th < threads; th++ {
301282
pend.Add(1)
@@ -326,8 +307,10 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
326307
task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number()))
327308
task.results[i] = &txTraceResult{Result: res}
328309
}
329-
// Tracing state is used up, queue it for de-referencing
330-
reler.add(task.release)
310+
// Tracing state is used up, queue it for de-referencing. Note the
311+
// state is the parent state of trace block, use block.number-1 as
312+
// the state number.
313+
tracker.releaseState(task.block.NumberU64()-1, task.release)
331314

332315
// Stream the result back to the result catcher or abort on teardown
333316
select {
@@ -354,8 +337,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
354337
close(taskCh)
355338
pend.Wait()
356339

357-
// Clean out any pending derefs.
358-
reler.call()
340+
// Clean out any pending release functions of trace states.
341+
tracker.callReleases()
359342

360343
// Log the chain result
361344
switch {
@@ -392,6 +375,13 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
392375
failed = err
393376
break
394377
}
378+
// Make sure the state creator doesn't go too far. Too many unprocessed
379+
// trace state may cause the oldest state to become stale(e.g. in
380+
// path-based scheme).
381+
if err = tracker.wait(number); err != nil {
382+
failed = err
383+
break
384+
}
395385
// Prepare the statedb for tracing. Don't use the live database for
396386
// tracing to avoid persisting state junks into the database. Switch
397387
// over to `preferDisk` mode only if the memory usage exceeds the
@@ -407,18 +397,18 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
407397
failed = err
408398
break
409399
}
410-
// Clean out any pending derefs. Note this step must be done after
411-
// constructing tracing state, because the tracing state of block
412-
// next depends on the parent state and construction may fail if
413-
// we release too early.
414-
reler.call()
400+
// Clean out any pending release functions of trace state. Note this
401+
// step must be done after constructing tracing state, because the
402+
// tracing state of block next depends on the parent state and construction
403+
// may fail if we release too early.
404+
tracker.callReleases()
415405

416406
// Send the block over to the concurrent tracers (if not in the fast-forward phase)
417407
txs := next.Transactions()
418408
select {
419409
case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}:
420410
case <-closed:
421-
reler.add(release)
411+
tracker.releaseState(number, release)
422412
return
423413
}
424414
traced += uint64(len(txs))

eth/tracers/tracker.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package tracers
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
)
23+
24+
// stateTracker is an auxiliary tool used to cache the release functions of all
25+
// used trace states, and to determine whether the creation of trace state needs
26+
// to be paused in case there are too many states waiting for tracing.
27+
type stateTracker struct {
28+
limit int // Maximum number of states allowed waiting for tracing
29+
oldest uint64 // The number of the oldest state which is still using for trace
30+
used []bool // List of flags indicating whether the trace state has been used up
31+
releases []StateReleaseFunc // List of trace state release functions waiting to be called
32+
cond *sync.Cond
33+
lock *sync.RWMutex
34+
}
35+
36+
// newStateTracker initializes the tracker with provided state limits and
37+
// the number of the first state that will be used.
38+
func newStateTracker(limit int, oldest uint64) *stateTracker {
39+
lock := new(sync.RWMutex)
40+
return &stateTracker{
41+
limit: limit,
42+
oldest: oldest,
43+
used: make([]bool, limit),
44+
cond: sync.NewCond(lock),
45+
lock: lock,
46+
}
47+
}
48+
49+
// releaseState marks the state specified by the number as released and caches
50+
// the corresponding release functions internally.
51+
func (t *stateTracker) releaseState(number uint64, release StateReleaseFunc) {
52+
t.lock.Lock()
53+
defer t.lock.Unlock()
54+
55+
// Set the state as used, the corresponding flag is indexed by
56+
// the distance between the specified state and the oldest state
57+
// which is still using for trace.
58+
t.used[int(number-t.oldest)] = true
59+
60+
// If the oldest state is used up, update the oldest marker by moving
61+
// it to the next state which is not used up.
62+
if number == t.oldest {
63+
var count int
64+
for _, used := range t.used {
65+
if !used {
66+
break
67+
}
68+
count += 1
69+
}
70+
t.oldest += uint64(count)
71+
copy(t.used, t.used[count:])
72+
73+
// Clean up the array tail since they are useless now.
74+
for i := t.limit - count; i < t.limit; i++ {
75+
t.used[i] = false
76+
}
77+
// Fire the signal to all waiters that oldest marker is updated.
78+
t.cond.Broadcast()
79+
}
80+
t.releases = append(t.releases, release)
81+
}
82+
83+
// callReleases invokes all cached release functions.
84+
func (t *stateTracker) callReleases() {
85+
t.lock.Lock()
86+
defer t.lock.Unlock()
87+
88+
for _, release := range t.releases {
89+
release()
90+
}
91+
t.releases = t.releases[:0]
92+
}
93+
94+
// wait blocks until the accumulated trace states are less than the limit.
95+
func (t *stateTracker) wait(number uint64) error {
96+
t.lock.Lock()
97+
defer t.lock.Unlock()
98+
99+
for {
100+
if number < t.oldest {
101+
return fmt.Errorf("invalid state number %d head %d", number, t.oldest)
102+
}
103+
if number < t.oldest+uint64(t.limit) {
104+
// number is now within limit, wait over
105+
return nil
106+
}
107+
t.cond.Wait()
108+
}
109+
}

0 commit comments

Comments
 (0)