Skip to content

Commit 54d8236

Browse files
authored
fix(rollup-relayer): graceful restart (#1564)
Co-authored-by: colinlyguo <[email protected]>
1 parent e3cf2cb commit 54d8236

File tree

3 files changed

+101
-73
lines changed

3 files changed

+101
-73
lines changed

common/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"runtime/debug"
66
)
77

8-
var tag = "v4.4.74"
8+
var tag = "v4.4.75"
99

1010
var commit = func() string {
1111
if info, ok := debug.ReadBuildInfo(); ok {

rollup/internal/controller/sender/sender.go

Lines changed: 73 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data
175175
s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc()
176176
var (
177177
feeData *FeeData
178-
tx *gethTypes.Transaction
179178
sidecar *gethTypes.BlobTxSidecar
180179
err error
181180
)
@@ -217,20 +216,35 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data
217216
return common.Hash{}, fmt.Errorf("failed to get fee data, err: %w", err)
218217
}
219218

220-
if tx, err = s.createAndSendTx(feeData, target, data, sidecar, nil); err != nil {
219+
signedTx, err := s.createTx(feeData, target, data, sidecar, nil)
220+
if err != nil {
221221
s.metrics.sendTransactionFailureSendTx.WithLabelValues(s.service, s.name).Inc()
222-
log.Error("failed to create and send tx (non-resubmit case)", "from", s.transactionSigner.GetAddr().String(), "nonce", s.transactionSigner.GetNonce(), "err", err)
223-
return common.Hash{}, fmt.Errorf("failed to create and send transaction, err: %w", err)
222+
log.Error("failed to create signed tx (non-resubmit case)", "from", s.transactionSigner.GetAddr().String(), "nonce", s.transactionSigner.GetNonce(), "err", err)
223+
return common.Hash{}, fmt.Errorf("failed to create signed transaction, err: %w", err)
224224
}
225225

226-
if err = s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, contextID, s.getSenderMeta(), tx, blockNumber); err != nil {
226+
// Insert the transaction into the pending transaction table.
227+
// A corner case is that the transaction is inserted into the table but not sent to the chain, because the server is stopped in the middle.
228+
// This case will be handled by the checkPendingTransaction function.
229+
if err = s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, contextID, s.getSenderMeta(), signedTx, blockNumber); err != nil {
227230
log.Error("failed to insert transaction", "from", s.transactionSigner.GetAddr().String(), "nonce", s.transactionSigner.GetNonce(), "err", err)
228231
return common.Hash{}, fmt.Errorf("failed to insert transaction, err: %w", err)
229232
}
230-
return tx.Hash(), nil
233+
234+
if err := s.client.SendTransaction(s.ctx, signedTx); err != nil {
235+
log.Error("failed to send tx", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", err)
236+
// Check if contain nonce, and reset nonce
237+
// only reset nonce when it is not from resubmit
238+
if strings.Contains(err.Error(), "nonce too low") {
239+
s.resetNonce(context.Background())
240+
}
241+
return common.Hash{}, fmt.Errorf("failed to send transaction, err: %w", err)
242+
}
243+
244+
return signedTx.Hash(), nil
231245
}
232246

233-
func (s *Sender) createAndSendTx(feeData *FeeData, target *common.Address, data []byte, sidecar *gethTypes.BlobTxSidecar, overrideNonce *uint64) (*gethTypes.Transaction, error) {
247+
func (s *Sender) createTx(feeData *FeeData, target *common.Address, data []byte, sidecar *gethTypes.BlobTxSidecar, overrideNonce *uint64) (*gethTypes.Transaction, error) {
234248
var (
235249
nonce = s.transactionSigner.GetNonce()
236250
txData gethTypes.TxData
@@ -292,14 +306,9 @@ func (s *Sender) createAndSendTx(feeData *FeeData, target *common.Address, data
292306
return nil, err
293307
}
294308

295-
if err = s.client.SendTransaction(s.ctx, signedTx); err != nil {
296-
log.Error("failed to send tx", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", err)
297-
// Check if contain nonce, and reset nonce
298-
// only reset nonce when it is not from resubmit
299-
if strings.Contains(err.Error(), "nonce too low") && overrideNonce == nil {
300-
s.resetNonce(context.Background())
301-
}
302-
return nil, err
309+
// update nonce when it is not from resubmit
310+
if overrideNonce == nil {
311+
s.transactionSigner.SetNonce(nonce + 1)
303312
}
304313

305314
if feeData.gasTipCap != nil {
@@ -320,10 +329,6 @@ func (s *Sender) createAndSendTx(feeData *FeeData, target *common.Address, data
320329

321330
s.metrics.currentGasLimit.WithLabelValues(s.service, s.name).Set(float64(feeData.gasLimit))
322331

323-
// update nonce when it is not from resubmit
324-
if overrideNonce == nil {
325-
s.transactionSigner.SetNonce(nonce + 1)
326-
}
327332
return signedTx, nil
328333
}
329334

@@ -337,7 +342,7 @@ func (s *Sender) resetNonce(ctx context.Context) {
337342
s.transactionSigner.SetNonce(nonce)
338343
}
339344

340-
func (s *Sender) resubmitTransaction(tx *gethTypes.Transaction, baseFee, blobBaseFee uint64) (*gethTypes.Transaction, error) {
345+
func (s *Sender) createReplacingTransaction(tx *gethTypes.Transaction, baseFee, blobBaseFee uint64) (*gethTypes.Transaction, error) {
341346
escalateMultipleNum := new(big.Int).SetUint64(s.config.EscalateMultipleNum)
342347
escalateMultipleDen := new(big.Int).SetUint64(s.config.EscalateMultipleDen)
343348
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)
@@ -468,12 +473,12 @@ func (s *Sender) resubmitTransaction(tx *gethTypes.Transaction, baseFee, blobBas
468473

469474
nonce := tx.Nonce()
470475
s.metrics.resubmitTransactionTotal.WithLabelValues(s.service, s.name).Inc()
471-
tx, err := s.createAndSendTx(&feeData, tx.To(), tx.Data(), tx.BlobTxSidecar(), &nonce)
476+
signedTx, err := s.createTx(&feeData, tx.To(), tx.Data(), tx.BlobTxSidecar(), &nonce)
472477
if err != nil {
473-
log.Error("failed to create and send tx (resubmit case)", "from", s.transactionSigner.GetAddr().String(), "nonce", nonce, "err", err)
478+
log.Error("failed to create signed tx (resubmit case)", "from", s.transactionSigner.GetAddr().String(), "nonce", nonce, "err", err)
474479
return nil, err
475480
}
476-
return tx, nil
481+
return signedTx, nil
477482
}
478483

479484
// checkPendingTransaction checks the confirmation status of pending transactions against the latest confirmed block number.
@@ -500,38 +505,37 @@ func (s *Sender) checkPendingTransaction() {
500505
}
501506

502507
for _, txnToCheck := range transactionsToCheck {
503-
tx := new(gethTypes.Transaction)
504-
if err := tx.DecodeRLP(rlp.NewStream(bytes.NewReader(txnToCheck.RLPEncoding), 0)); err != nil {
508+
originalTx := new(gethTypes.Transaction)
509+
if err := originalTx.DecodeRLP(rlp.NewStream(bytes.NewReader(txnToCheck.RLPEncoding), 0)); err != nil {
505510
log.Error("failed to decode RLP", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "err", err)
506511
continue
507512
}
508513

509-
receipt, err := s.client.TransactionReceipt(s.ctx, tx.Hash())
514+
receipt, err := s.client.TransactionReceipt(s.ctx, originalTx.Hash())
510515
if err == nil { // tx confirmed.
511516
if receipt.BlockNumber.Uint64() <= confirmed {
512-
err := s.db.Transaction(func(dbTX *gorm.DB) error {
517+
if dbTxErr := s.db.Transaction(func(dbTX *gorm.DB) error {
513518
// Update the status of the transaction to TxStatusConfirmed.
514-
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash(), types.TxStatusConfirmed, dbTX); err != nil {
515-
log.Error("failed to update transaction status by tx hash", "hash", tx.Hash().String(), "sender meta", s.getSenderMeta(), "from", s.transactionSigner.GetAddr().String(), "nonce", tx.Nonce(), "err", err)
516-
return err
519+
if updateErr := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, originalTx.Hash(), types.TxStatusConfirmed, dbTX); updateErr != nil {
520+
log.Error("failed to update transaction status by tx hash", "hash", originalTx.Hash().String(), "sender meta", s.getSenderMeta(), "from", s.transactionSigner.GetAddr().String(), "nonce", originalTx.Nonce(), "err", updateErr)
521+
return updateErr
517522
}
518523
// Update other transactions with the same nonce and sender address as failed.
519-
if err := s.pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(s.ctx, txnToCheck.SenderAddress, tx.Nonce(), tx.Hash(), dbTX); err != nil {
520-
log.Error("failed to update other transactions as failed by nonce", "senderAddress", txnToCheck.SenderAddress, "nonce", tx.Nonce(), "excludedTxHash", tx.Hash(), "err", err)
521-
return err
524+
if updateErr := s.pendingTransactionOrm.UpdateOtherTransactionsAsFailedByNonce(s.ctx, txnToCheck.SenderAddress, originalTx.Nonce(), originalTx.Hash(), dbTX); updateErr != nil {
525+
log.Error("failed to update other transactions as failed by nonce", "senderAddress", txnToCheck.SenderAddress, "nonce", originalTx.Nonce(), "excludedTxHash", originalTx.Hash(), "err", updateErr)
526+
return updateErr
522527
}
523528
return nil
524-
})
525-
if err != nil {
526-
log.Error("db transaction failed after receiving confirmation", "err", err)
529+
}); dbTxErr != nil {
530+
log.Error("db transaction failed after receiving confirmation", "err", dbTxErr)
527531
return
528532
}
529533

530534
// send confirm message
531535
s.confirmCh <- &Confirmation{
532536
ContextID: txnToCheck.ContextID,
533537
IsSuccessful: receipt.Status == gethTypes.ReceiptStatusSuccessful,
534-
TxHash: tx.Hash(),
538+
TxHash: originalTx.Hash(),
535539
SenderType: s.senderType,
536540
}
537541
}
@@ -548,52 +552,60 @@ func (s *Sender) checkPendingTransaction() {
548552

549553
// early return if the previous transaction has not been confirmed yet.
550554
// currentNonce is already the confirmed nonce + 1.
551-
if tx.Nonce() > currentNonce {
552-
log.Debug("previous transaction not yet confirmed, skip bumping gas price", "address", txnToCheck.SenderAddress, "currentNonce", currentNonce, "txNonce", tx.Nonce())
555+
if originalTx.Nonce() > currentNonce {
556+
log.Debug("previous transaction not yet confirmed, skip bumping gas price", "address", txnToCheck.SenderAddress, "currentNonce", currentNonce, "txNonce", originalTx.Nonce())
553557
continue
554558
}
555559

556560
// It's possible that the pending transaction was marked as failed earlier in this loop (e.g., if one of its replacements has already been confirmed).
557561
// Therefore, we fetch the current transaction status again for accuracy before proceeding.
558-
status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(s.ctx, tx.Hash())
562+
status, err := s.pendingTransactionOrm.GetTxStatusByTxHash(s.ctx, originalTx.Hash())
559563
if err != nil {
560-
log.Error("failed to get transaction status by tx hash", "hash", tx.Hash().String(), "err", err)
564+
log.Error("failed to get transaction status by tx hash", "hash", originalTx.Hash().String(), "err", err)
561565
return
562566
}
563567
if status == types.TxStatusConfirmedFailed {
564-
log.Warn("transaction already marked as failed, skipping resubmission", "hash", tx.Hash().String())
568+
log.Warn("transaction already marked as failed, skipping resubmission", "hash", originalTx.Hash().String())
565569
continue
566570
}
567571

568572
log.Info("resubmit transaction",
569573
"service", s.service,
570574
"name", s.name,
571-
"hash", tx.Hash().String(),
575+
"hash", originalTx.Hash().String(),
572576
"from", s.transactionSigner.GetAddr().String(),
573-
"nonce", tx.Nonce(),
577+
"nonce", originalTx.Nonce(),
574578
"submitBlockNumber", txnToCheck.SubmitBlockNumber,
575579
"currentBlockNumber", blockNumber,
576580
"escalateBlocks", s.config.EscalateBlocks)
577581

578-
if newTx, err := s.resubmitTransaction(tx, baseFee, blobBaseFee); err != nil {
582+
newSignedTx, err := s.createReplacingTransaction(originalTx, baseFee, blobBaseFee)
583+
if err != nil {
579584
s.metrics.resubmitTransactionFailedTotal.WithLabelValues(s.service, s.name).Inc()
580-
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.transactionSigner.GetAddr().String(), "nonce", tx.Nonce(), "err", err)
581-
} else {
582-
err := s.db.Transaction(func(dbTX *gorm.DB) error {
583-
// Update the status of the original transaction as replaced, while still checking its confirmation status.
584-
if err := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, tx.Hash(), types.TxStatusReplaced, dbTX); err != nil {
585-
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", tx.Hash().String(), err)
586-
}
587-
// Record the new transaction that has replaced the original one.
588-
if err := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newTx, blockNumber, dbTX); err != nil {
589-
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, previous block number: %v, current block number: %v, err: %w", txnToCheck.ContextID, newTx.Nonce(), newTx.Hash().String(), txnToCheck.SubmitBlockNumber, blockNumber, err)
590-
}
591-
return nil
592-
})
593-
if err != nil {
594-
log.Error("db transaction failed after resubmitting", "err", err)
595-
return
585+
log.Error("failed to resubmit transaction", "context ID", txnToCheck.ContextID, "sender meta", s.getSenderMeta(), "from", s.transactionSigner.GetAddr().String(), "nonce", originalTx.Nonce(), "err", err)
586+
return
587+
}
588+
589+
// Update the status of the original transaction as replaced, while still checking its confirmation status.
590+
// Insert the new transaction that has replaced the original one, and set the status as pending.
591+
// A corner case is that the transaction is inserted into the table but not sent to the chain, because the server is stopped in the middle.
592+
// This case will be handled by the checkPendingTransaction function.
593+
if dbTxErr := s.db.Transaction(func(dbTX *gorm.DB) error {
594+
if updateErr := s.pendingTransactionOrm.UpdatePendingTransactionStatusByTxHash(s.ctx, originalTx.Hash(), types.TxStatusReplaced, dbTX); updateErr != nil {
595+
return fmt.Errorf("failed to update status of transaction with hash %s to TxStatusReplaced, err: %w", newSignedTx.Hash().String(), updateErr)
596+
}
597+
if updateErr := s.pendingTransactionOrm.InsertPendingTransaction(s.ctx, txnToCheck.ContextID, s.getSenderMeta(), newSignedTx, blockNumber, dbTX); updateErr != nil {
598+
return fmt.Errorf("failed to insert new pending transaction with context ID: %s, nonce: %d, hash: %v, previous block number: %v, current block number: %v, err: %w", txnToCheck.ContextID, newSignedTx.Nonce(), newSignedTx.Hash().String(), txnToCheck.SubmitBlockNumber, blockNumber, updateErr)
596599
}
600+
return nil
601+
}); dbTxErr != nil {
602+
log.Error("db transaction failed after resubmitting", "err", dbTxErr)
603+
return
604+
}
605+
606+
if err := s.client.SendTransaction(s.ctx, newSignedTx); err != nil {
607+
log.Error("failed to send replacing tx", "tx hash", newSignedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", newSignedTx.Nonce(), "err", err)
608+
return
597609
}
598610
}
599611
}

rollup/internal/controller/sender/sender_test.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,17 @@ func testResubmitZeroGasPriceTransaction(t *testing.T) {
282282
gasFeeCap: big.NewInt(0),
283283
gasLimit: 50000,
284284
}
285-
tx, err := s.createAndSendTx(feeData, &common.Address{}, nil, nil, nil)
285+
tx, err := s.createTx(feeData, &common.Address{}, nil, nil, nil)
286286
assert.NoError(t, err)
287287
assert.NotNil(t, tx)
288+
err = s.client.SendTransaction(s.ctx, tx)
289+
assert.NoError(t, err)
288290
// Increase at least 1 wei in gas price, gas tip cap and gas fee cap.
289291
// Bumping the fees enough times to let the transaction be included in a block.
290292
for i := 0; i < 30; i++ {
291-
tx, err = s.resubmitTransaction(tx, 0, 0)
293+
tx, err = s.createReplacingTransaction(tx, 0, 0)
294+
assert.NoError(t, err)
295+
err = s.client.SendTransaction(s.ctx, tx)
292296
assert.NoError(t, err)
293297
}
294298

@@ -369,10 +373,14 @@ func testResubmitNonZeroGasPriceTransaction(t *testing.T) {
369373
sidecar, err = makeSidecar(txBlob[i])
370374
assert.NoError(t, err)
371375
}
372-
tx, err := s.createAndSendTx(feeData, &common.Address{}, nil, sidecar, nil)
376+
tx, err := s.createTx(feeData, &common.Address{}, nil, sidecar, nil)
373377
assert.NoError(t, err)
374378
assert.NotNil(t, tx)
375-
resubmittedTx, err := s.resubmitTransaction(tx, 0, 0)
379+
err = s.client.SendTransaction(s.ctx, tx)
380+
assert.NoError(t, err)
381+
resubmittedTx, err := s.createReplacingTransaction(tx, 0, 0)
382+
assert.NoError(t, err)
383+
err = s.client.SendTransaction(s.ctx, resubmittedTx)
376384
assert.NoError(t, err)
377385

378386
assert.Eventually(t, func() bool {
@@ -412,10 +420,14 @@ func testResubmitUnderpricedTransaction(t *testing.T) {
412420
gasFeeCap: big.NewInt(1000000000),
413421
gasLimit: 50000,
414422
}
415-
tx, err := s.createAndSendTx(feeData, &common.Address{}, nil, nil, nil)
423+
tx, err := s.createTx(feeData, &common.Address{}, nil, nil, nil)
416424
assert.NoError(t, err)
417425
assert.NotNil(t, tx)
418-
_, err = s.resubmitTransaction(tx, 0, 0)
426+
err = s.client.SendTransaction(s.ctx, tx)
427+
assert.NoError(t, err)
428+
resubmittedTx, err := s.createReplacingTransaction(tx, 0, 0)
429+
assert.NoError(t, err)
430+
err = s.client.SendTransaction(s.ctx, resubmittedTx)
419431
assert.Error(t, err, "replacement transaction underpriced")
420432

421433
assert.Eventually(t, func() bool {
@@ -462,7 +474,9 @@ func testResubmitDynamicFeeTransactionWithRisingBaseFee(t *testing.T) {
462474
// bump the basefee by 10x
463475
baseFeePerGas *= 10
464476
// resubmit and check that the gas fee has been adjusted accordingly
465-
newTx, err := s.resubmitTransaction(tx, baseFeePerGas, 0)
477+
resubmittedTx, err := s.createReplacingTransaction(tx, baseFeePerGas, 0)
478+
assert.NoError(t, err)
479+
err = s.client.SendTransaction(s.ctx, resubmittedTx)
466480
assert.NoError(t, err)
467481

468482
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)
@@ -471,7 +485,7 @@ func testResubmitDynamicFeeTransactionWithRisingBaseFee(t *testing.T) {
471485
expectedGasFeeCap = maxGasPrice
472486
}
473487

474-
assert.Equal(t, expectedGasFeeCap.Uint64(), newTx.GasFeeCap().Uint64())
488+
assert.Equal(t, expectedGasFeeCap.Uint64(), resubmittedTx.GasFeeCap().Uint64())
475489
s.Stop()
476490
}
477491

@@ -511,7 +525,9 @@ func testResubmitBlobTransactionWithRisingBaseFeeAndBlobBaseFee(t *testing.T) {
511525
baseFeePerGas *= 10
512526
blobBaseFeePerGas *= 10
513527
// resubmit and check that the gas fee has been adjusted accordingly
514-
newTx, err := s.resubmitTransaction(tx, baseFeePerGas, blobBaseFeePerGas)
528+
resubmittedTx, err := s.createReplacingTransaction(tx, baseFeePerGas, blobBaseFeePerGas)
529+
assert.NoError(t, err)
530+
err = s.client.SendTransaction(s.ctx, resubmittedTx)
515531
assert.NoError(t, err)
516532

517533
maxGasPrice := new(big.Int).SetUint64(s.config.MaxGasPrice)
@@ -526,8 +542,8 @@ func testResubmitBlobTransactionWithRisingBaseFeeAndBlobBaseFee(t *testing.T) {
526542
expectedBlobGasFeeCap = maxBlobGasPrice
527543
}
528544

529-
assert.Equal(t, expectedGasFeeCap.Uint64(), newTx.GasFeeCap().Uint64())
530-
assert.Equal(t, expectedBlobGasFeeCap.Uint64(), newTx.BlobGasFeeCap().Uint64())
545+
assert.Equal(t, expectedGasFeeCap.Uint64(), resubmittedTx.GasFeeCap().Uint64())
546+
assert.Equal(t, expectedBlobGasFeeCap.Uint64(), resubmittedTx.BlobGasFeeCap().Uint64())
531547
s.Stop()
532548
}
533549

0 commit comments

Comments
 (0)