Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime/debug"
)

var tag = "v4.5.46"
var tag = "v4.5.47"

var commit = func() string {
if info, ok := debug.ReadBuildInfo(); ok {
Expand Down
8 changes: 7 additions & 1 deletion rollup/internal/config/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import (

// SenderConfig The config for transaction sender
type SenderConfig struct {
// The RPC endpoint of the ethereum or scroll public node.
// The RPC endpoint of the ethereum or scroll public node (for backward compatibility).
// If WriteEndpoints is specified, this endpoint will be used only for reading.
// If WriteEndpoints is empty, this endpoint will be used for both reading and writing.
Endpoint string `json:"endpoint"`
// The RPC endpoints to send transactions to (optional).
// If specified, transactions will be sent to all these endpoints in parallel.
// If empty, transactions will be sent to the Endpoint.
WriteEndpoints []string `json:"write_endpoints,omitempty"`
// The time to trigger check pending txs in sender.
CheckPendingTime uint64 `json:"check_pending_time"`
// The number of blocks to wait to escalate increase gas price of the transaction.
Expand Down
1 change: 1 addition & 0 deletions rollup/internal/controller/relayer/relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setupEnv(t *testing.T) {

cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetPoSL1EndPoint()
assert.NoError(t, err)
cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, cfg.L2Config.RelayerConfig.SenderConfig.Endpoint}
cfg.L1Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetL2GethEndPoint()
assert.NoError(t, err)

Expand Down
117 changes: 113 additions & 4 deletions rollup/internal/controller/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/holiman/uint256"
Expand Down Expand Up @@ -67,7 +68,8 @@ type FeeData struct {
type Sender struct {
config *config.SenderConfig
gethClient *gethclient.Client
client *ethclient.Client // The client to retrieve on chain data or send transaction.
client *ethclient.Client // The client to retrieve on chain data (read-only)
writeClients []*ethclient.Client // The clients to send transactions to (write operations)
transactionSigner *TransactionSigner
chainID *big.Int // The chain id of the endpoint
ctx context.Context
Expand All @@ -90,9 +92,10 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c
return nil, fmt.Errorf("invalid params, EscalateMultipleNum; %v, EscalateMultipleDen: %v", config.EscalateMultipleNum, config.EscalateMultipleDen)
}

// Initialize read client
rpcClient, err := rpc.Dial(config.Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to dial eth client, err: %w", err)
return nil, fmt.Errorf("failed to dial read client, err: %w", err)
}

client := ethclient.NewClient(rpcClient)
Expand All @@ -105,12 +108,42 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c
return nil, fmt.Errorf("failed to create transaction signer, err: %w", err)
}

// Initialize write clients
var writeClients []*ethclient.Client
if len(config.WriteEndpoints) > 0 {
// Use specified write endpoints
for i, endpoint := range config.WriteEndpoints {
writeRpcClient, err := rpc.Dial(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to dial write client %d (endpoint: %s), err: %w", i, endpoint, err)
}
writeClient := ethclient.NewClient(writeRpcClient)

// Verify the write client is connected to the same chain
writeChainID, err := writeClient.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get chain ID from write client %d (endpoint: %s), err: %w", i, endpoint, err)
}
if writeChainID.Cmp(chainID) != 0 {
return nil, fmt.Errorf("write client %d (endpoint: %s) has different chain ID %s, expected %s", i, endpoint, writeChainID.String(), chainID.String())
}

writeClients = append(writeClients, writeClient)
}
log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", config.Endpoint, "writeEndpoints", config.WriteEndpoints)
} else {
// Use read client for writing (backward compatibility)
writeClients = append(writeClients, client)
log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", config.Endpoint)
}

// Create sender instance first and then initialize nonce
sender := &Sender{
ctx: ctx,
config: config,
gethClient: gethclient.New(rpcClient),
client: client,
writeClients: writeClients,
chainID: chainID,
transactionSigner: transactionSigner,
db: db,
Expand Down Expand Up @@ -169,6 +202,82 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy
}
}

// sendTransactionToMultipleClients sends a transaction to all write clients in parallel
// and returns success if at least one client succeeds
func (s *Sender) sendTransactionToMultipleClients(signedTx *gethTypes.Transaction) error {
ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second)
defer cancel()

if len(s.writeClients) == 1 {
// Single client - use direct approach
return s.writeClients[0].SendTransaction(ctx, signedTx)
}

// Multiple clients - send in parallel
type result struct {
endpoint string
err error
}

resultChan := make(chan result, len(s.writeClients))
var wg sync.WaitGroup

// Send transaction to all write clients in parallel
for i, client := range s.writeClients {
wg.Add(1)
// Determine endpoint URL for this client
endpoint := s.config.WriteEndpoints[i]

go func(ep string, writeClient *ethclient.Client) {
defer wg.Done()
err := writeClient.SendTransaction(ctx, signedTx)
resultChan <- result{endpoint: ep, err: err}
}(endpoint, client)
}

// Wait for all goroutines to finish
go func() {
wg.Wait()
close(resultChan)
}()

// Collect results
var errs []error
for res := range resultChan {
if res.err != nil {
errs = append(errs, fmt.Errorf("%s: %w", res.endpoint, res.err))
log.Warn("failed to send transaction to write client",
"endpoint", res.endpoint,
"txHash", signedTx.Hash().Hex(),
"nonce", signedTx.Nonce(),
"from", s.transactionSigner.GetAddr().String(),
"error", res.err)
} else {
log.Info("successfully sent transaction to write client",
"endpoint", res.endpoint,
"txHash", signedTx.Hash().Hex(),
"nonce", signedTx.Nonce(),
"from", s.transactionSigner.GetAddr().String())
}
}

// Check if at least one client succeeded
if len(errs) < len(s.writeClients) {
successCount := len(s.writeClients) - len(errs)
if len(errs) > 0 {
log.Info("transaction partially succeeded",
"txHash", signedTx.Hash().Hex(),
"successCount", successCount,
"totalClients", len(s.writeClients),
"failures", errors.Join(errs...))
}
return nil
}

// All clients failed
return fmt.Errorf("failed to send transaction to all %d write clients: %w", len(s.writeClients), errors.Join(errs...))
}

// SendTransaction send a signed L2tL1 transaction.
func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob) (common.Hash, uint64, error) {
s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc()
Expand Down Expand Up @@ -230,7 +339,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data
return common.Hash{}, 0, fmt.Errorf("failed to insert transaction, err: %w", err)
}

if err := s.client.SendTransaction(s.ctx, signedTx); err != nil {
if err := s.sendTransactionToMultipleClients(signedTx); err != nil {
// Delete the transaction from the pending transaction table if it fails to send.
if updateErr := s.pendingTransactionOrm.DeleteTransactionByTxHash(s.ctx, signedTx.Hash()); updateErr != nil {
log.Error("failed to delete transaction", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", updateErr)
Expand Down Expand Up @@ -645,7 +754,7 @@ func (s *Sender) checkPendingTransaction() {
return
}

if err := s.client.SendTransaction(s.ctx, newSignedTx); err != nil {
if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil {
if strings.Contains(err.Error(), "nonce too low") {
// When we receive a 'nonce too low' error but cannot find the transaction receipt, it indicates another transaction with this nonce has already been processed, so this transaction will never be mined and should be marked as failed.
log.Warn("nonce too low detected, marking all non-confirmed transactions with same nonce as failed", "nonce", originalTx.Nonce(), "address", s.transactionSigner.GetAddr().Hex(), "txHash", originalTx.Hash().Hex(), "newTxHash", newSignedTx.Hash().Hex(), "err", err)
Expand Down