From 89f5aa610460f84162fd31d4a7f697c1e5c82515 Mon Sep 17 00:00:00 2001 From: songzhibin97 <718428482@qq.com> Date: Wed, 13 Mar 2024 11:00:45 +0800 Subject: [PATCH 1/9] fix: use time.After in loop, possible memory leaks --- core/bloombits/matcher.go | 6 +++++- eth/downloader/beaconsync.go | 7 ++++++- eth/downloader/downloader.go | 30 ++++++++++++++++++++++-------- ethstats/ethstats.go | 6 +++++- p2p/simulations/adapters/exec.go | 6 +++++- p2p/simulations/mocker.go | 7 ++++++- p2p/simulations/network.go | 5 ++++- 7 files changed, 53 insertions(+), 14 deletions(-) diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index 6a4cfb23db19..486581fe23d7 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [] // of the session, any request in-flight need to be responded to! Empty responses // are fine though in that case. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { + waitTimer := time.NewTimer(wait) + defer waitTimer.Stop() + for { // Allocate a new bloom bit index to retrieve data for, stopping when done bit, ok := s.allocateRetrieval() @@ -604,6 +607,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan } // Bit allocated, throttle a bit if we're below our batch limit if s.pendingSections(bit) < batch { + waitTimer.Reset(wait) select { case <-s.quit: // Session terminating, we can't meaningfully service, abort @@ -611,7 +615,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan s.deliverSections(bit, []uint64{}, [][]byte{}) return - case <-time.After(wait): + case <-waitTimer.C: // Throttling up, fetch whatever is available } } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index d3f75c852703..ee6039f5bae0 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -289,6 +289,10 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { localHeaders = d.readHeaderRange(tail, int(count)) log.Warn("Retrieved beacon headers from local", "from", from, "count", count) } + + fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) + defer fsHeaderContCheckTimer.Stop() + for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones @@ -381,8 +385,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { } // State sync still going, wait a bit for new headers and retry log.Trace("Pivot not yet committed, waiting...") + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: case <-d.cancelCh: return errCanceled } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6e7c5dcf02c8..bb2c835b3984 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1020,6 +1020,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e ancestor = from mode = d.getMode() ) + + fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) + defer fsHeaderContCheckTimer.Stop() + for { // Pull the next batch of headers, it either: // - Pivot check to see if the chain moved too far @@ -1124,8 +1128,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // Don't abort header fetches while the pivot is downloading if !d.committed.Load() && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: continue case <-d.cancelCh: return errCanceled @@ -1194,9 +1199,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // sleep a bit and retry. Take care with headers already consumed during // skeleton filling if len(headers) == 0 && !progressed { + fsHeaderContCheckTimer.Reset(fsHeaderContCheck) p.log.Trace("All headers delayed, waiting") select { - case <-time.After(fsHeaderContCheck): + case <-fsHeaderContCheckTimer.C: continue case <-d.cancelCh: return errCanceled @@ -1274,9 +1280,13 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { // queue until the stream ends or a failure occurs. func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error { var ( - mode = d.getMode() - gotHeaders = false // Wait for batches of headers to process + mode = d.getMode() + gotHeaders = false // Wait for batches of headers to process + secondTimer = time.NewTimer(time.Second) ) + + defer secondTimer.Stop() + for { select { case <-d.cancelCh: @@ -1397,10 +1407,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode if mode == FullSync || mode == SnapSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { + secondTimer.Reset(time.Second) select { case <-d.cancelCh: return errCanceled - case <-time.After(time.Second): + case <-secondTimer.C: } } // Otherwise insert the headers for content retrieval @@ -1565,9 +1576,11 @@ func (d *Downloader) processSnapSyncContent() error { // Note, there's no issue with memory piling up since after 64 blocks the // pivot will forcefully move so these accumulators will be dropped. var ( - oldPivot *fetchResult // Locked in pivot block, might change eventually - oldTail []*fetchResult // Downloaded content after the pivot + oldPivot *fetchResult // Locked in pivot block, might change eventually + oldTail []*fetchResult // Downloaded content after the pivot + secondTimer = time.NewTimer(time.Second) ) + defer secondTimer.Stop() for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to @@ -1650,6 +1663,7 @@ func (d *Downloader) processSnapSyncContent() error { oldPivot = P } // Wait for completion, occasionally checking for pivot staleness + secondTimer.Reset(time.Second) select { case <-sync.done: if sync.err != nil { @@ -1660,7 +1674,7 @@ func (d *Downloader) processSnapSyncContent() error { } oldPivot = nil - case <-time.After(time.Second): + case <-secondTimer.C: oldTail = afterP continue } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 6e71666ec121..0c334df7483f 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -544,10 +544,14 @@ func (s *Service) reportLatency(conn *connWrapper) error { return err } // Wait for the pong request to arrive back + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case <-s.pongCh: // Pong delivered, report the latency - case <-time.After(5 * time.Second): + case <-timer.C: // Ping timeout, abort return errors.New("ping timed out") } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 17e0f75d5ab9..10cfd12d71aa 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -303,10 +303,14 @@ func (n *ExecNode) Stop() error { go func() { waitErr <- n.Cmd.Wait() }() + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { case err := <-waitErr: return err - case <-time.After(5 * time.Second): + case <-timer.C: return n.Cmd.Process.Kill() } } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 0dc04e65f921..1388ef503ff8 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -67,6 +67,10 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { } tick := time.NewTicker(10 * time.Second) defer tick.Stop() + + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() + for { select { case <-quit: @@ -80,11 +84,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { return } + timer.Reset(3 * time.Second) select { case <-quit: log.Info("Terminating simulation loop") return - case <-time.After(3 * time.Second): + case <-timer.C: } log.Debug("starting node", "id", id) diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 4735e5cfa6cf..9c6230d83291 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error { } } + snapshotLoadTimeoutTimer := time.NewTimer(snapshotLoadTimeout) + defer snapshotLoadTimeoutTimer.Stop() + select { // Wait until all connections from the snapshot are established. case <-allConnected: // Make sure that we do not wait forever. - case <-time.After(snapshotLoadTimeout): + case <-snapshotLoadTimeoutTimer.C: return errors.New("snapshot connections not established") } return nil From 98db98fb428aef4f5c0afdcc132c08718190d9e6 Mon Sep 17 00:00:00 2001 From: songzhibin97 <718428482@qq.com> Date: Sat, 16 Mar 2024 08:56:50 +0800 Subject: [PATCH 2/9] fix:commit suggestion --- eth/downloader/downloader.go | 24 ++++++++++++------------ p2p/simulations/adapters/exec.go | 1 - p2p/simulations/mocker.go | 9 ++++++--- p2p/simulations/network.go | 6 +++--- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index bb2c835b3984..8bbda9904b39 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1280,12 +1280,12 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { // queue until the stream ends or a failure occurs. func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error { var ( - mode = d.getMode() - gotHeaders = false // Wait for batches of headers to process - secondTimer = time.NewTimer(time.Second) + mode = d.getMode() + gotHeaders = false // Wait for batches of headers to process + timer = time.NewTimer(time.Second) ) - defer secondTimer.Stop() + defer timer.Stop() for { select { @@ -1407,11 +1407,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode if mode == FullSync || mode == SnapSync { // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { - secondTimer.Reset(time.Second) + timer.Reset(time.Second) select { case <-d.cancelCh: return errCanceled - case <-secondTimer.C: + case <-timer.C: } } // Otherwise insert the headers for content retrieval @@ -1576,11 +1576,11 @@ func (d *Downloader) processSnapSyncContent() error { // Note, there's no issue with memory piling up since after 64 blocks the // pivot will forcefully move so these accumulators will be dropped. var ( - oldPivot *fetchResult // Locked in pivot block, might change eventually - oldTail []*fetchResult // Downloaded content after the pivot - secondTimer = time.NewTimer(time.Second) + oldPivot *fetchResult // Locked in pivot block, might change eventually + oldTail []*fetchResult // Downloaded content after the pivot + timer = time.NewTimer(time.Second) ) - defer secondTimer.Stop() + defer timer.Stop() for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to @@ -1663,7 +1663,7 @@ func (d *Downloader) processSnapSyncContent() error { oldPivot = P } // Wait for completion, occasionally checking for pivot staleness - secondTimer.Reset(time.Second) + timer.Reset(time.Second) select { case <-sync.done: if sync.err != nil { @@ -1674,7 +1674,7 @@ func (d *Downloader) processSnapSyncContent() error { } oldPivot = nil - case <-secondTimer.C: + case <-timer.C: oldTail = afterP continue } diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 10cfd12d71aa..8fd027616aac 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -303,7 +303,6 @@ func (n *ExecNode) Stop() error { go func() { waitErr <- n.Cmd.Wait() }() - timer := time.NewTimer(5 * time.Second) defer timer.Stop() diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 1388ef503ff8..3e6998e94f45 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -65,10 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { if err != nil { panic("Could not startup node network for mocker") } - tick := time.NewTicker(10 * time.Second) - defer tick.Stop() - timer := time.NewTimer(3 * time.Second) + var ( + tick = time.NewTicker(10 * time.Second) + timer = time.NewTimer(3 * time.Second) + ) + + defer tick.Stop() defer timer.Stop() for { diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 9c6230d83291..95d7eda08a74 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -1028,14 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error { } } - snapshotLoadTimeoutTimer := time.NewTimer(snapshotLoadTimeout) - defer snapshotLoadTimeoutTimer.Stop() + timeout := time.NewTimer(snapshotLoadTimeout) + defer timeout.Stop() select { // Wait until all connections from the snapshot are established. case <-allConnected: // Make sure that we do not wait forever. - case <-snapshotLoadTimeoutTimer.C: + case <-timeout.C: return errors.New("snapshot connections not established") } return nil From beadd2b8f8db4369c301ff686732c0bd723a2aa4 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Mon, 18 Mar 2024 15:57:00 -0600 Subject: [PATCH 3/9] all: remove extra whitespace --- eth/downloader/downloader.go | 1 - ethstats/ethstats.go | 1 - p2p/simulations/mocker.go | 1 - 3 files changed, 3 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 8bbda9904b39..d48e5b26ef58 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1284,7 +1284,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode gotHeaders = false // Wait for batches of headers to process timer = time.NewTimer(time.Second) ) - defer timer.Stop() for { diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 0c334df7483f..c845db1164f5 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -544,7 +544,6 @@ func (s *Service) reportLatency(conn *connWrapper) error { return err } // Wait for the pong request to arrive back - timer := time.NewTimer(5 * time.Second) defer timer.Stop() diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 3e6998e94f45..5227e5f99613 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -70,7 +70,6 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { tick = time.NewTicker(10 * time.Second) timer = time.NewTimer(3 * time.Second) ) - defer tick.Stop() defer timer.Stop() From 22bf33dc5c47a7f4e2a30a3f10b18439096d7d14 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Mon, 18 Mar 2024 16:07:51 -0600 Subject: [PATCH 4/9] all: remove extra whitespace --- eth/downloader/beaconsync.go | 1 - p2p/simulations/mocker.go | 1 - 2 files changed, 2 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index ee6039f5bae0..7dfc419f4e9c 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -289,7 +289,6 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { localHeaders = d.readHeaderRange(tail, int(count)) log.Warn("Retrieved beacon headers from local", "from", from, "count", count) } - fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) defer fsHeaderContCheckTimer.Stop() diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 5227e5f99613..8763df67ef39 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -65,7 +65,6 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { if err != nil { panic("Could not startup node network for mocker") } - var ( tick = time.NewTicker(10 * time.Second) timer = time.NewTimer(3 * time.Second) From 244fe392588414e0ab5135f02ca194b7355d404f Mon Sep 17 00:00:00 2001 From: songzhibin97 <718428482@qq.com> Date: Tue, 19 Mar 2024 09:00:36 +0800 Subject: [PATCH 5/9] Update downloader.go --- eth/downloader/downloader.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d48e5b26ef58..ac3e941d8a64 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1015,13 +1015,13 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // Start pulling the header chain skeleton until all is done var ( - skeleton = true // Skeleton assembly phase or finishing up - pivoting = false // Whether the next request is pivot verification - ancestor = from - mode = d.getMode() + skeleton = true // Skeleton assembly phase or finishing up + pivoting = false // Whether the next request is pivot verification + ancestor = from + mode = d.getMode() + fsHeaderContCheckTimer = time.NewTimer(fsHeaderContCheck) ) - fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) defer fsHeaderContCheckTimer.Stop() for { From 1bdec68692cd771d912ba9a6daa6f53efcb40de4 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:01:36 -0600 Subject: [PATCH 6/9] eth/downloader: rm extra whitespace --- eth/downloader/downloader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index ac3e941d8a64..425ba9079a15 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1021,7 +1021,6 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e mode = d.getMode() fsHeaderContCheckTimer = time.NewTimer(fsHeaderContCheck) ) - defer fsHeaderContCheckTimer.Stop() for { From 5d63f79505c211fa39d5bb0e0c236e2acac91178 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:04:27 -0600 Subject: [PATCH 7/9] eth/downloader: add space --- eth/downloader/downloader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 425ba9079a15..7d3befe28482 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1579,6 +1579,7 @@ func (d *Downloader) processSnapSyncContent() error { timer = time.NewTimer(time.Second) ) defer timer.Stop() + for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to From c17bb93d7ad13f8b22f174703be4c30f51cea71e Mon Sep 17 00:00:00 2001 From: songzhibin97 <718428482@qq.com> Date: Wed, 20 Mar 2024 08:23:34 +0800 Subject: [PATCH 8/9] fix: ci --- eth/downloader/downloader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 7d3befe28482..8bdb0159aea2 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1579,7 +1579,7 @@ func (d *Downloader) processSnapSyncContent() error { timer = time.NewTimer(time.Second) ) defer timer.Stop() - + for { // Wait for the next batch of downloaded data to be available. If we have // not yet reached the pivot point, wait blockingly as there's no need to From 697a63675dd69e7da6055415e46fa84be78c11ff Mon Sep 17 00:00:00 2001 From: songzhibin97 <718428482@qq.com> Date: Wed, 20 Mar 2024 16:27:06 +0800 Subject: [PATCH 9/9] Update downloader.go --- eth/downloader/downloader.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 8bdb0159aea2..d1e175064503 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1015,14 +1015,11 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // Start pulling the header chain skeleton until all is done var ( - skeleton = true // Skeleton assembly phase or finishing up - pivoting = false // Whether the next request is pivot verification - ancestor = from - mode = d.getMode() - fsHeaderContCheckTimer = time.NewTimer(fsHeaderContCheck) + skeleton = true // Skeleton assembly phase or finishing up + pivoting = false // Whether the next request is pivot verification + ancestor = from + mode = d.getMode() ) - defer fsHeaderContCheckTimer.Stop() - for { // Pull the next batch of headers, it either: // - Pivot check to see if the chain moved too far @@ -1127,9 +1124,8 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // Don't abort header fetches while the pivot is downloading if !d.committed.Load() && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") - fsHeaderContCheckTimer.Reset(fsHeaderContCheck) select { - case <-fsHeaderContCheckTimer.C: + case <-time.After(fsHeaderContCheck): continue case <-d.cancelCh: return errCanceled @@ -1198,10 +1194,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // sleep a bit and retry. Take care with headers already consumed during // skeleton filling if len(headers) == 0 && !progressed { - fsHeaderContCheckTimer.Reset(fsHeaderContCheck) p.log.Trace("All headers delayed, waiting") select { - case <-fsHeaderContCheckTimer.C: + case <-time.After(fsHeaderContCheck): continue case <-d.cancelCh: return errCanceled