From 8a048dc5fba8658101916d2dab00356406298339 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 29 Mar 2022 18:56:46 +0200 Subject: [PATCH 1/4] core: make reorg use less memory --- core/blockchain.go | 53 ++++++++++++++++++++++++++------------- core/types/transaction.go | 18 +++++++++++++ 2 files changed, 53 insertions(+), 18 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 5ac12303cf73..d83f9678ae21 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1960,13 +1960,18 @@ func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log { // Note the new head block won't be processed here, callers need to handle it // externally. func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { + type block struct { + hash common.Hash + number uint64 + } + var ( - newChain types.Blocks + newChain []block oldChain types.Blocks commonBlock *types.Block - deletedTxs types.Transactions - addedTxs types.Transactions + deletedTxs []common.Hash + addedTxs []common.Hash deletedLogs [][]*types.Log rebirthLogs [][]*types.Log @@ -1976,7 +1981,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Old chain is longer, gather all transactions and logs as deleted ones for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) + for _, tx := range oldBlock.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } // Collect deleted logs for notification logs := bc.collectLogs(oldBlock.Hash(), true) @@ -1987,7 +1994,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } else { // New chain is longer, stash all blocks away for subsequent insertion for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) + newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) } } if oldBlock == nil { @@ -2006,14 +2013,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } // Remove an old block as well as stash away a new block oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) + for _, tx := range oldBlock.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } // Collect deleted logs for notification logs := bc.collectLogs(oldBlock.Hash(), true) if len(logs) > 0 { deletedLogs = append(deletedLogs, logs) } - newChain = append(newChain, newBlock) + newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) // Step back with both chains oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) @@ -2034,14 +2043,14 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { logFn = log.Warn } logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(), - "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) + "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].hash) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) blockReorgMeter.Mark(1) } else if len(newChain) > 0 { // Special case happens in the post merge stage that current head is // the ancestor of new head while these two blocks are not consecutive - log.Info("Extend chain", "add", len(newChain), "number", newChain[0].NumberU64(), "hash", newChain[0].Hash()) + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].number, "hash", newChain[0].hash) blockReorgAddMeter.Mark(int64(len(newChain))) } else { // len(newChain) == 0 && len(oldChain) > 0 @@ -2052,21 +2061,20 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - bc.writeHeadBlock(newChain[i]) + block := bc.GetBlock(newChain[i].hash, newChain[i].number) - // Collect reborn logs due to chain reorg - logs := bc.collectLogs(newChain[i].Hash(), false) - if len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs) - } + bc.writeHeadBlock(block) // Collect the new added transactions. - addedTxs = append(addedTxs, newChain[i].Transactions()...) + for _, tx := range block.Transactions() { + addedTxs = append(addedTxs, tx.Hash()) + } } + // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. indexesBatch := bc.db.NewBatch() - for _, tx := range types.TxDifference(deletedTxs, addedTxs) { - rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) + for _, tx := range types.TxDifferenceHash(deletedTxs, addedTxs) { + rawdb.DeleteTxLookupEntry(indexesBatch, tx) } // Delete any canonical number assignments above the new head number := bc.CurrentBlock().NumberU64() @@ -2080,6 +2088,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if err := indexesBatch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } + + // Collect the logs + for i := len(newChain) - 1; i >= 1; i-- { + // Collect reborn logs due to chain reorg + logs := bc.collectLogs(newChain[i].hash, false) + if len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs) + } + } // If any logs need to be fired, do it now. In theory we could avoid creating // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle diff --git a/core/types/transaction.go b/core/types/transaction.go index 29820a0d785f..2fb3da729798 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -432,6 +432,24 @@ func TxDifference(a, b Transactions) Transactions { return keep } +// TxDifferenceHash returns a new set which is the difference between a and b. +func TxDifferenceHash(a, b []common.Hash) []common.Hash { + keep := make([]common.Hash, 0, len(a)) + + remove := make(map[common.Hash]struct{}) + for _, tx := range b { + remove[tx] = struct{}{} + } + + for _, tx := range a { + if _, ok := remove[tx]; !ok { + keep = append(keep, tx) + } + } + + return keep +} + // TxByNonce implements the sort interface to allow sorting a list of transactions // by their nonces. This is usually only useful for sorting transactions from a // single account, otherwise a nonce comparison doesn't make much sense. From c5b0990686ed209f8050e44d90a4a5e74e8dc244 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 23 May 2022 11:52:38 +0200 Subject: [PATCH 2/4] core: revert block changes, add introspection --- core/blockchain.go | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index d83f9678ae21..0ed22309bf01 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "math/big" + "runtime" "sort" "sync" "sync/atomic" @@ -1960,13 +1961,8 @@ func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log { // Note the new head block won't be processed here, callers need to handle it // externally. func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { - type block struct { - hash common.Hash - number uint64 - } - var ( - newChain []block + newChain types.Blocks oldChain types.Blocks commonBlock *types.Block @@ -1994,7 +1990,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } else { // New chain is longer, stash all blocks away for subsequent insertion for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) + newChain = append(newChain, newBlock) } } if oldBlock == nil { @@ -2022,7 +2018,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if len(logs) > 0 { deletedLogs = append(deletedLogs, logs) } - newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) + newChain = append(newChain, newBlock) // Step back with both chains oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) @@ -2034,6 +2030,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return fmt.Errorf("invalid new chain") } } + PrintMemUsage() // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { logFn := log.Info @@ -2043,14 +2040,14 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { logFn = log.Warn } logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(), - "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].hash) + "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) blockReorgMeter.Mark(1) } else if len(newChain) > 0 { // Special case happens in the post merge stage that current head is // the ancestor of new head while these two blocks are not consecutive - log.Info("Extend chain", "add", len(newChain), "number", newChain[0].number, "hash", newChain[0].hash) + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash()) blockReorgAddMeter.Mark(int64(len(newChain))) } else { // len(newChain) == 0 && len(oldChain) > 0 @@ -2061,15 +2058,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - block := bc.GetBlock(newChain[i].hash, newChain[i].number) + bc.writeHeadBlock(newChain[i]) - bc.writeHeadBlock(block) // Collect the new added transactions. - for _, tx := range block.Transactions() { + for _, tx := range newChain[i].Transactions() { addedTxs = append(addedTxs, tx.Hash()) } } + PrintMemUsage() // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. indexesBatch := bc.db.NewBatch() @@ -2092,7 +2089,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Collect the logs for i := len(newChain) - 1; i >= 1; i-- { // Collect reborn logs due to chain reorg - logs := bc.collectLogs(newChain[i].hash, false) + logs := bc.collectLogs(newChain[i].Hash(), false) if len(logs) > 0 { rebirthLogs = append(rebirthLogs, logs) } @@ -2101,6 +2098,8 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle // networks where performance is not an issue either way. + + PrintMemUsage() if len(deletedLogs) > 0 { bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) } @@ -2115,6 +2114,20 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } +func PrintMemUsage() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + // InsertBlockWithoutSetHead executes the block, runs the necessary verification // upon it and then persist the block and the associate state into the database. // The key difference between the InsertChain is it won't do the canonical chain From 52f6397138f7df6a0223be00a3faa01e8550341b Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 23 May 2022 12:23:25 +0200 Subject: [PATCH 3/4] core: removed introspection --- core/blockchain.go | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 0ed22309bf01..cb96e2f787aa 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "math/big" - "runtime" "sort" "sync" "sync/atomic" @@ -2030,7 +2029,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return fmt.Errorf("invalid new chain") } } - PrintMemUsage() + // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { logFn := log.Info @@ -2066,7 +2065,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } } - PrintMemUsage() // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. indexesBatch := bc.db.NewBatch() @@ -2098,8 +2096,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle // networks where performance is not an issue either way. - - PrintMemUsage() if len(deletedLogs) > 0 { bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) } @@ -2114,20 +2110,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -func PrintMemUsage() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - // For info on each, see: https://golang.org/pkg/runtime/#MemStats - fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) - fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) - fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) - fmt.Printf("\tNumGC = %v\n", m.NumGC) -} - -func bToMb(b uint64) uint64 { - return b / 1024 / 1024 -} - // InsertBlockWithoutSetHead executes the block, runs the necessary verification // upon it and then persist the block and the associate state into the database. // The key difference between the InsertChain is it won't do the canonical chain From ecea243ad132284b026b8f2bde88d97aa736e2d6 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 23 May 2022 20:14:55 +0200 Subject: [PATCH 4/4] core: nit --- core/blockchain.go | 2 +- core/types/transaction.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index cb96e2f787aa..4ea949787f42 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2068,7 +2068,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. indexesBatch := bc.db.NewBatch() - for _, tx := range types.TxDifferenceHash(deletedTxs, addedTxs) { + for _, tx := range types.HashDifference(deletedTxs, addedTxs) { rawdb.DeleteTxLookupEntry(indexesBatch, tx) } // Delete any canonical number assignments above the new head diff --git a/core/types/transaction.go b/core/types/transaction.go index 2fb3da729798..715ede15db2e 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -432,18 +432,18 @@ func TxDifference(a, b Transactions) Transactions { return keep } -// TxDifferenceHash returns a new set which is the difference between a and b. -func TxDifferenceHash(a, b []common.Hash) []common.Hash { +// HashDifference returns a new set which is the difference between a and b. +func HashDifference(a, b []common.Hash) []common.Hash { keep := make([]common.Hash, 0, len(a)) remove := make(map[common.Hash]struct{}) - for _, tx := range b { - remove[tx] = struct{}{} + for _, hash := range b { + remove[hash] = struct{}{} } - for _, tx := range a { - if _, ok := remove[tx]; !ok { - keep = append(keep, tx) + for _, hash := range a { + if _, ok := remove[hash]; !ok { + keep = append(keep, hash) } }