Skip to content

Commit b80e59a

Browse files
author
colinlyguo
committed
refactor
1 parent 9571c89 commit b80e59a

File tree

2 files changed

+176
-123
lines changed

2 files changed

+176
-123
lines changed

rollup/cmd/proposer_tool/app/app.go

Lines changed: 8 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,18 @@ package app
33
import (
44
"context"
55
"fmt"
6-
"math/big"
76
"os"
87
"os/signal"
9-
"time"
108

11-
"github.com/prometheus/client_golang/prometheus"
129
"github.com/scroll-tech/da-codec/encoding"
13-
"github.com/scroll-tech/go-ethereum/common"
14-
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
15-
"github.com/scroll-tech/go-ethereum/ethclient"
1610
"github.com/scroll-tech/go-ethereum/log"
1711
"github.com/urfave/cli/v2"
1812

19-
"scroll-tech/common/database"
2013
"scroll-tech/common/utils"
2114
"scroll-tech/common/version"
22-
"scroll-tech/database/migrate"
2315

2416
"scroll-tech/rollup/internal/config"
2517
"scroll-tech/rollup/internal/controller/watcher"
26-
"scroll-tech/rollup/internal/orm"
27-
rutils "scroll-tech/rollup/internal/utils"
2818
)
2919

3020
var app *cli.App
@@ -54,130 +44,22 @@ func action(ctx *cli.Context) error {
5444
}
5545

5646
subCtx, cancel := context.WithCancel(ctx.Context)
57-
// Init db connection
58-
db, err := database.InitDB(cfg.DBConfig)
59-
if err != nil {
60-
log.Crit("failed to init db connection", "err", err)
61-
}
62-
sqlDB, err := db.DB()
63-
if err != nil {
64-
log.Crit("failed to get db connection", "error", err)
65-
}
66-
if err = migrate.ResetDB(sqlDB); err != nil {
67-
log.Crit("failed to reset db", "error", err)
68-
}
69-
log.Info("successfully reset db")
70-
defer func() {
71-
cancel()
72-
if err = database.CloseDB(db); err != nil {
73-
log.Crit("failed to close db connection", "error", err)
74-
}
75-
}()
76-
77-
// Init dbForReplay connection
78-
dbForReplay, err := database.InitDB(cfg.DBConfigForReplay)
79-
if err != nil {
80-
log.Crit("failed to init dbForReplay connection", "err", err)
81-
}
82-
defer func() {
83-
cancel()
84-
if err = database.CloseDB(dbForReplay); err != nil {
85-
log.Crit("failed to close dbForReplay connection", "error", err)
86-
}
87-
}()
88-
89-
// Init l2geth connection
90-
l2Client, err := ethclient.Dial(cfg.L2Config.Endpoint)
91-
if err != nil {
92-
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
93-
}
9447

9548
startL2BlockHeight := ctx.Uint64(utils.StartL2BlockFlag.Name)
9649

97-
prevChunk, err := orm.NewChunk(dbForReplay).GetParentChunkByBlockNumber(subCtx, startL2BlockHeight)
98-
if err != nil {
99-
log.Crit("failed to get previous chunk", "error", err)
100-
}
101-
102-
var startQueueIndex uint64
103-
if prevChunk != nil {
104-
startQueueIndex = prevChunk.TotalL1MessagesPoppedBefore + prevChunk.TotalL1MessagesPoppedInChunk
105-
}
106-
107-
startBlock := uint64(0)
108-
if prevChunk != nil {
109-
startBlock = prevChunk.EndBlockNumber + 1
110-
}
111-
112-
var chunk *encoding.Chunk
113-
for blockNum := startBlock; blockNum <= startL2BlockHeight; blockNum++ {
114-
block, err := l2Client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum)))
115-
if err != nil {
116-
log.Crit("failed to get block", "block number", blockNum, "error", err)
117-
}
118-
119-
for _, tx := range block.Transactions() {
120-
if tx.Type() == gethTypes.L1MessageTxType {
121-
startQueueIndex++
122-
}
123-
}
124-
125-
if blockNum == startL2BlockHeight {
126-
chunk = &encoding.Chunk{Blocks: []*encoding.Block{{Header: block.Header()}}}
127-
}
128-
}
129-
130-
// Setting empty hash as the post_l1_message_queue_hash of the first chunk,
131-
// i.e., treating the first L1 message after this chunk as the first L1 message in message queue v2.
132-
// Though this setting is different from mainnet, it's simple yet sufficient for data analysis usage.
133-
_, err = orm.NewChunk(db).InsertTestChunkForProposerTool(subCtx, chunk, encoding.CodecV0, startQueueIndex)
134-
if err != nil {
135-
log.Crit("failed to insert chunk", "error", err)
136-
}
137-
138-
batch := &encoding.Batch{
139-
Index: 0,
140-
TotalL1MessagePoppedBefore: 0,
141-
ParentBatchHash: common.Hash{},
142-
Chunks: []*encoding.Chunk{chunk},
143-
}
144-
145-
var dbBatch *orm.Batch
146-
dbBatch, err = orm.NewBatch(db).InsertBatch(subCtx, batch, encoding.CodecV0, rutils.BatchMetrics{})
147-
if err != nil {
148-
log.Crit("failed to insert batch", "error", err)
149-
}
150-
151-
if err = orm.NewChunk(db).UpdateBatchHashInRange(subCtx, 0, 0, dbBatch.Hash); err != nil {
152-
log.Crit("failed to update batch hash for chunks", "error", err)
153-
}
154-
155-
registry := prometheus.DefaultRegisterer
156-
15750
genesisPath := ctx.String(utils.Genesis.Name)
15851
genesis, err := utils.ReadGenesis(genesisPath)
15952
if err != nil {
16053
log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err)
16154
}
16255

163-
// sanity check config
164-
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
165-
log.Crit("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
166-
}
167-
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
168-
log.Crit("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
169-
}
170-
17156
minCodecVersion := encoding.CodecVersion(ctx.Uint(utils.MinCodecVersionFlag.Name))
172-
chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, genesis.Config, db, registry)
173-
chunkProposer.SetReplayDB(dbForReplay)
174-
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, minCodecVersion, genesis.Config, db, registry)
175-
batchProposer.SetReplayDB(dbForReplay)
176-
bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, minCodecVersion, genesis.Config, db, registry)
17757

178-
go utils.Loop(subCtx, 100*time.Millisecond, chunkProposer.TryProposeChunk)
179-
go utils.Loop(subCtx, 100*time.Millisecond, batchProposer.TryProposeBatch)
180-
go utils.Loop(subCtx, 100*time.Millisecond, bundleProposer.TryProposeBundle)
58+
proposerTool, err := watcher.NewProposerTool(subCtx, cancel, cfg, startL2BlockHeight, minCodecVersion, genesis.Config)
59+
if err != nil {
60+
log.Crit("failed to create proposer tool", "startL2BlockHeight", startL2BlockHeight, "minCodecVersion", minCodecVersion, "error", err)
61+
}
62+
proposerTool.Start()
18163

18264
// Finish start all proposer tool functions.
18365
log.Info("Start proposer-tool successfully", "version", version.Version)
@@ -189,6 +71,9 @@ func action(ctx *cli.Context) error {
18971
// Wait until the interrupt signal is received from an OS signal.
19072
<-interrupt
19173

74+
cancel()
75+
proposerTool.Stop()
76+
19277
return nil
19378
}
19479

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package watcher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/big"
7+
"time"
8+
9+
"github.com/scroll-tech/da-codec/encoding"
10+
"github.com/scroll-tech/go-ethereum/common"
11+
gethTypes "github.com/scroll-tech/go-ethereum/core/types"
12+
"github.com/scroll-tech/go-ethereum/ethclient"
13+
"github.com/scroll-tech/go-ethereum/log"
14+
"github.com/scroll-tech/go-ethereum/params"
15+
"gorm.io/gorm"
16+
17+
"scroll-tech/common/database"
18+
"scroll-tech/common/utils"
19+
"scroll-tech/database/migrate"
20+
21+
"scroll-tech/rollup/internal/config"
22+
"scroll-tech/rollup/internal/orm"
23+
rutils "scroll-tech/rollup/internal/utils"
24+
)
25+
26+
// ProposerTool is a tool for proposing chunks and bundles to the L1 chain.
27+
type ProposerTool struct {
28+
ctx context.Context
29+
cancel context.CancelFunc
30+
31+
db *gorm.DB
32+
dbForReplay *gorm.DB
33+
client *ethclient.Client
34+
35+
chunkProposer *ChunkProposer
36+
batchProposer *BatchProposer
37+
bundleProposer *BundleProposer
38+
}
39+
40+
// NewProposerTool creates a new ProposerTool instance.
41+
func NewProposerTool(ctx context.Context, cancel context.CancelFunc, cfg *config.ConfigForReplay, startL2BlockHeight uint64, minCodecVersion encoding.CodecVersion, chainCfg *params.ChainConfig) (*ProposerTool, error) {
42+
// Init db connection
43+
db, err := database.InitDB(cfg.DBConfig)
44+
if err != nil {
45+
return nil, fmt.Errorf("failed to init db connection: %w", err)
46+
}
47+
sqlDB, err := db.DB()
48+
if err != nil {
49+
return nil, fmt.Errorf("failed to get db connection: %w", err)
50+
}
51+
if err = migrate.ResetDB(sqlDB); err != nil {
52+
return nil, fmt.Errorf("failed to reset db: %w", err)
53+
}
54+
log.Info("successfully reset db")
55+
56+
// Init dbForReplay connection
57+
dbForReplay, err := database.InitDB(cfg.DBConfigForReplay)
58+
if err != nil {
59+
return nil, fmt.Errorf("failed to init dbForReplay connection: %w", err)
60+
}
61+
62+
client, err := ethclient.Dial(cfg.L2Config.Endpoint)
63+
if err != nil {
64+
return nil, fmt.Errorf("failed to connect to L2 geth, endpoint: %s, err: %w", cfg.L2Config.Endpoint, err)
65+
}
66+
67+
prevChunk, err := orm.NewChunk(dbForReplay).GetParentChunkByBlockNumber(ctx, startL2BlockHeight)
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to get previous chunk: %w", err)
70+
}
71+
72+
var startQueueIndex uint64
73+
if prevChunk != nil {
74+
startQueueIndex = prevChunk.TotalL1MessagesPoppedBefore + prevChunk.TotalL1MessagesPoppedInChunk
75+
}
76+
77+
startBlock := uint64(0)
78+
if prevChunk != nil {
79+
startBlock = prevChunk.EndBlockNumber + 1
80+
}
81+
82+
var chunk *encoding.Chunk
83+
for blockNum := startBlock; blockNum <= startL2BlockHeight; blockNum++ {
84+
block, err := client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNum))
85+
if err != nil {
86+
return nil, fmt.Errorf("failed to get block %d: %w", blockNum, err)
87+
}
88+
89+
for _, tx := range block.Transactions() {
90+
if tx.Type() == gethTypes.L1MessageTxType {
91+
startQueueIndex++
92+
}
93+
}
94+
95+
if blockNum == startL2BlockHeight {
96+
chunk = &encoding.Chunk{Blocks: []*encoding.Block{{Header: block.Header()}}}
97+
}
98+
}
99+
100+
// Setting empty hash as the post_l1_message_queue_hash of the first chunk,
101+
// i.e., treating the first L1 message after this chunk as the first L1 message in message queue v2.
102+
// Though this setting is different from mainnet, it's simple yet sufficient for data analysis usage.
103+
_, err = orm.NewChunk(db).InsertTestChunkForProposerTool(ctx, chunk, minCodecVersion, startQueueIndex)
104+
if err != nil {
105+
return nil, fmt.Errorf("failed to insert chunk, minCodecVersion: %d, startQueueIndex: %d, err: %w", minCodecVersion, startQueueIndex, err)
106+
}
107+
108+
batch := &encoding.Batch{
109+
Index: 0,
110+
TotalL1MessagePoppedBefore: 0,
111+
ParentBatchHash: common.Hash{},
112+
Chunks: []*encoding.Chunk{chunk},
113+
}
114+
115+
var dbBatch *orm.Batch
116+
dbBatch, err = orm.NewBatch(db).InsertBatch(ctx, batch, encoding.CodecV0, rutils.BatchMetrics{})
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to insert batch: %w", err)
119+
}
120+
121+
if err = orm.NewChunk(db).UpdateBatchHashInRange(ctx, 0, 0, dbBatch.Hash); err != nil {
122+
return nil, fmt.Errorf("failed to update batch hash for chunks: %w", err)
123+
}
124+
125+
// sanity check config
126+
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
127+
return nil, fmt.Errorf("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
128+
}
129+
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
130+
return nil, fmt.Errorf("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
131+
}
132+
133+
chunkProposer := NewChunkProposer(ctx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, chainCfg, db, nil)
134+
chunkProposer.SetReplayDB(dbForReplay)
135+
batchProposer := NewBatchProposer(ctx, cfg.L2Config.BatchProposerConfig, minCodecVersion, chainCfg, db, nil)
136+
batchProposer.SetReplayDB(dbForReplay)
137+
bundleProposer := NewBundleProposer(ctx, cfg.L2Config.BundleProposerConfig, minCodecVersion, chainCfg, db, nil)
138+
139+
return &ProposerTool{
140+
ctx: ctx,
141+
cancel: cancel,
142+
143+
db: db,
144+
dbForReplay: dbForReplay,
145+
client: client,
146+
147+
chunkProposer: chunkProposer,
148+
batchProposer: batchProposer,
149+
bundleProposer: bundleProposer,
150+
}, nil
151+
}
152+
153+
func (p *ProposerTool) Start() {
154+
go utils.Loop(p.ctx, 100*time.Millisecond, p.chunkProposer.TryProposeChunk)
155+
go utils.Loop(p.ctx, 100*time.Millisecond, p.batchProposer.TryProposeBatch)
156+
go utils.Loop(p.ctx, 100*time.Millisecond, p.bundleProposer.TryProposeBundle)
157+
}
158+
159+
func (p *ProposerTool) Stop() {
160+
p.cancel()
161+
if err := database.CloseDB(p.db); err != nil {
162+
log.Error("failed to close db connection", "error", err)
163+
}
164+
if err := database.CloseDB(p.dbForReplay); err != nil {
165+
log.Error("failed to close dbForReplay connection", "error", err)
166+
}
167+
p.client.Close()
168+
}

0 commit comments

Comments
 (0)