From e16f48feeddf27996793974159d803d72d9f4522 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 18 Sep 2025 08:15:56 +0800 Subject: [PATCH 1/5] feat(tx sender): add multiple write clients for more reliable tx submission --- rollup/conf/config.json | 5 + rollup/internal/config/relayer.go | 8 +- rollup/internal/controller/sender/sender.go | 117 +++++++++++++++++++- 3 files changed, 125 insertions(+), 5 deletions(-) diff --git a/rollup/conf/config.json b/rollup/conf/config.json index 8055008cd6..65578a34bb 100644 --- a/rollup/conf/config.json +++ b/rollup/conf/config.json @@ -40,6 +40,11 @@ "rollup_contract_address": "0x0000000000000000000000000000000000000000", "sender_config": { "endpoint": "https://rpc.ankr.com/eth", + "write_endpoints": [ + "https://rpc.ankr.com/eth", + "https://ethereum.publicnode.com", + "https://eth.llamarpc.com" + ], "escalate_blocks": 1, "confirmations": "0x0", "escalate_multiple_num": 2, diff --git a/rollup/internal/config/relayer.go b/rollup/internal/config/relayer.go index db2e274681..b67deb4dfd 100644 --- a/rollup/internal/config/relayer.go +++ b/rollup/internal/config/relayer.go @@ -7,8 +7,14 @@ import ( // SenderConfig The config for transaction sender type SenderConfig struct { - // The RPC endpoint of the ethereum or scroll public node. + // The RPC endpoint of the ethereum or scroll public node (for backward compatibility). + // If WriteEndpoints is specified, this endpoint will be used only for reading. + // If WriteEndpoints is empty, this endpoint will be used for both reading and writing. Endpoint string `json:"endpoint"` + // The RPC endpoints to send transactions to (optional). + // If specified, transactions will be sent to all these endpoints in parallel. + // If empty, transactions will be sent to the Endpoint. + WriteEndpoints []string `json:"write_endpoints,omitempty"` // The time to trigger check pending txs in sender. CheckPendingTime uint64 `json:"check_pending_time"` // The number of blocks to wait to escalate increase gas price of the transaction. diff --git a/rollup/internal/controller/sender/sender.go b/rollup/internal/controller/sender/sender.go index 8be721b097..10ea8e8305 100644 --- a/rollup/internal/controller/sender/sender.go +++ b/rollup/internal/controller/sender/sender.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "time" "github.com/holiman/uint256" @@ -67,7 +68,8 @@ type FeeData struct { type Sender struct { config *config.SenderConfig gethClient *gethclient.Client - client *ethclient.Client // The client to retrieve on chain data or send transaction. + client *ethclient.Client // The client to retrieve on chain data (read-only) + writeClients []*ethclient.Client // The clients to send transactions to (write operations) transactionSigner *TransactionSigner chainID *big.Int // The chain id of the endpoint ctx context.Context @@ -90,9 +92,10 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c return nil, fmt.Errorf("invalid params, EscalateMultipleNum; %v, EscalateMultipleDen: %v", config.EscalateMultipleNum, config.EscalateMultipleDen) } + // Initialize read client rpcClient, err := rpc.Dial(config.Endpoint) if err != nil { - return nil, fmt.Errorf("failed to dial eth client, err: %w", err) + return nil, fmt.Errorf("failed to dial read client, err: %w", err) } client := ethclient.NewClient(rpcClient) @@ -105,12 +108,42 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c return nil, fmt.Errorf("failed to create transaction signer, err: %w", err) } + // Initialize write clients + var writeClients []*ethclient.Client + if len(config.WriteEndpoints) > 0 { + // Use specified write endpoints + for i, endpoint := range config.WriteEndpoints { + writeRpcClient, err := rpc.Dial(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to dial write client %d (endpoint: %s), err: %w", i, endpoint, err) + } + writeClient := ethclient.NewClient(writeRpcClient) + + // Verify the write client is connected to the same chain + writeChainID, err := writeClient.ChainID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get chain ID from write client %d (endpoint: %s), err: %w", i, endpoint, err) + } + if writeChainID.Cmp(chainID) != 0 { + return nil, fmt.Errorf("write client %d (endpoint: %s) has different chain ID %s, expected %s", i, endpoint, writeChainID.String(), chainID.String()) + } + + writeClients = append(writeClients, writeClient) + } + log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", config.Endpoint, "writeEndpoints", config.WriteEndpoints) + } else { + // Use read client for writing (backward compatibility) + writeClients = append(writeClients, client) + log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", config.Endpoint) + } + // Create sender instance first and then initialize nonce sender := &Sender{ ctx: ctx, config: config, gethClient: gethclient.New(rpcClient), client: client, + writeClients: writeClients, chainID: chainID, transactionSigner: transactionSigner, db: db, @@ -169,6 +202,82 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy } } +// sendTransactionToMultipleClients sends a transaction to all write clients in parallel +// and returns success if at least one client succeeds +func (s *Sender) sendTransactionToMultipleClients(signedTx *gethTypes.Transaction) error { + ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second) + defer cancel() + + if len(s.writeClients) == 1 { + // Single client - use direct approach + return s.writeClients[0].SendTransaction(ctx, signedTx) + } + + // Multiple clients - send in parallel + type result struct { + endpoint string + err error + } + + resultChan := make(chan result, len(s.writeClients)) + var wg sync.WaitGroup + + // Send transaction to all write clients in parallel + for i, client := range s.writeClients { + wg.Add(1) + // Determine endpoint URL for this client + endpoint := s.config.WriteEndpoints[i] + + go func(ep string, writeClient *ethclient.Client) { + defer wg.Done() + err := writeClient.SendTransaction(ctx, signedTx) + resultChan <- result{endpoint: ep, err: err} + }(endpoint, client) + } + + // Wait for all goroutines to finish + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect results + var errs []error + for res := range resultChan { + if res.err != nil { + errs = append(errs, fmt.Errorf("%s: %w", res.endpoint, res.err)) + log.Warn("failed to send transaction to write client", + "endpoint", res.endpoint, + "txHash", signedTx.Hash().Hex(), + "nonce", signedTx.Nonce(), + "from", s.transactionSigner.GetAddr().String(), + "error", res.err) + } else { + log.Info("successfully sent transaction to write client", + "endpoint", res.endpoint, + "txHash", signedTx.Hash().Hex(), + "nonce", signedTx.Nonce(), + "from", s.transactionSigner.GetAddr().String()) + } + } + + // Check if at least one client succeeded + if len(errs) < len(s.writeClients) { + successCount := len(s.writeClients) - len(errs) + if len(errs) > 0 { + log.Info("transaction partially succeeded", + "txHash", signedTx.Hash().Hex(), + "successCount", successCount, + "totalClients", len(s.writeClients), + "failures", errors.Join(errs...)) + } + return nil + } + + // All clients failed + return fmt.Errorf("failed to send transaction to all %d write clients: %w", len(s.writeClients), errors.Join(errs...)) +} + // SendTransaction send a signed L2tL1 transaction. func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob) (common.Hash, uint64, error) { s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc() @@ -230,7 +339,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data return common.Hash{}, 0, fmt.Errorf("failed to insert transaction, err: %w", err) } - if err := s.client.SendTransaction(s.ctx, signedTx); err != nil { + if err := s.sendTransactionToMultipleClients(signedTx); err != nil { // Delete the transaction from the pending transaction table if it fails to send. if updateErr := s.pendingTransactionOrm.DeleteTransactionByTxHash(s.ctx, signedTx.Hash()); updateErr != nil { log.Error("failed to delete transaction", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", updateErr) @@ -645,7 +754,7 @@ func (s *Sender) checkPendingTransaction() { return } - if err := s.client.SendTransaction(s.ctx, newSignedTx); err != nil { + if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil { if strings.Contains(err.Error(), "nonce too low") { // When we receive a 'nonce too low' error but cannot find the transaction receipt, it indicates another transaction with this nonce has already been processed, so this transaction will never be mined and should be marked as failed. log.Warn("nonce too low detected, marking all non-confirmed transactions with same nonce as failed", "nonce", originalTx.Nonce(), "address", s.transactionSigner.GetAddr().Hex(), "txHash", originalTx.Hash().Hex(), "newTxHash", newSignedTx.Hash().Hex(), "err", err) From d79f9a174fef4c2aba14291a3d1fdbf5f011e8f7 Mon Sep 17 00:00:00 2001 From: jonastheis Date: Thu, 18 Sep 2025 00:19:25 +0000 Subject: [PATCH 2/5] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[bot?= =?UTF-8?q?]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/version/version.go b/common/version/version.go index df872f409f..592f969612 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.5.46" +var tag = "v4.5.47" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { From 2880939a0064a5f83e6594b94425f1a00c61e4dd Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 18 Sep 2025 08:45:02 +0800 Subject: [PATCH 3/5] fix tests --- rollup/internal/controller/relayer/relayer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rollup/internal/controller/relayer/relayer_test.go b/rollup/internal/controller/relayer/relayer_test.go index 48a65578cd..322a259aed 100644 --- a/rollup/internal/controller/relayer/relayer_test.go +++ b/rollup/internal/controller/relayer/relayer_test.go @@ -56,6 +56,7 @@ func setupEnv(t *testing.T) { cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetPoSL1EndPoint() assert.NoError(t, err) + cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, cfg.L2Config.RelayerConfig.SenderConfig.Endpoint} cfg.L1Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetL2GethEndPoint() assert.NoError(t, err) From 472b55cb9c1863498ba0f540f04da780697b2741 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 18 Sep 2025 09:24:13 +0800 Subject: [PATCH 4/5] fix tests --- rollup/conf/config.json | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rollup/conf/config.json b/rollup/conf/config.json index 65578a34bb..f8fac29fd5 100644 --- a/rollup/conf/config.json +++ b/rollup/conf/config.json @@ -40,11 +40,7 @@ "rollup_contract_address": "0x0000000000000000000000000000000000000000", "sender_config": { "endpoint": "https://rpc.ankr.com/eth", - "write_endpoints": [ - "https://rpc.ankr.com/eth", - "https://ethereum.publicnode.com", - "https://eth.llamarpc.com" - ], + "write_endpoints": [], "escalate_blocks": 1, "confirmations": "0x0", "escalate_multiple_num": 2, From 97a67b9de76a270e5865184375cc35c853646169 Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Thu, 18 Sep 2025 11:54:09 +0800 Subject: [PATCH 5/5] fix tests --- rollup/conf/config.json | 1 - 1 file changed, 1 deletion(-) diff --git a/rollup/conf/config.json b/rollup/conf/config.json index f8fac29fd5..8055008cd6 100644 --- a/rollup/conf/config.json +++ b/rollup/conf/config.json @@ -40,7 +40,6 @@ "rollup_contract_address": "0x0000000000000000000000000000000000000000", "sender_config": { "endpoint": "https://rpc.ankr.com/eth", - "write_endpoints": [], "escalate_blocks": 1, "confirmations": "0x0", "escalate_multiple_num": 2,