From d2c168945b62bbb8d6a186e1c255138f3f31a5fe Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 28 Apr 2025 02:07:40 -0300 Subject: [PATCH 01/15] test/chainnotifier: send to specific spend reg This is needed to have multiple spending registrations running and to send a notification to a specific spending registration. --- test/chainnotifier_mock.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index 96e72f23f..a4fae0e77 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -33,10 +33,11 @@ func (c *mockChainNotifier) RawClientWithMacAuth( // SpendRegistration contains registration details. type SpendRegistration struct { - Outpoint *wire.OutPoint - PkScript []byte - HeightHint int32 - ErrChan chan<- error + Outpoint *wire.OutPoint + PkScript []byte + HeightHint int32 + SpendChannel chan<- *chainntnfs.SpendDetail + ErrChan chan<- error } // ConfRegistration contains registration details. @@ -53,13 +54,15 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( chan *chainntnfs.SpendDetail, chan error, error) { + spendChan0 := make(chan *chainntnfs.SpendDetail) spendErrChan := make(chan error, 1) reg := &SpendRegistration{ - HeightHint: heightHint, - Outpoint: outpoint, - PkScript: pkScript, - ErrChan: spendErrChan, + HeightHint: heightHint, + Outpoint: outpoint, + PkScript: pkScript, + SpendChannel: spendChan0, + ErrChan: spendErrChan, } c.lnd.RegisterSpendChannel <- reg @@ -78,6 +81,12 @@ func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, case <-ctx.Done(): } + case m := <-spendChan0: + select { + case spendChan <- m: + case <-ctx.Done(): + } + case err := <-spendErrChan: select { case errChan <- err: From 650cf20fe9aec4c9aa29123397c501dd45a6de87 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sat, 26 Apr 2025 01:01:39 -0300 Subject: [PATCH 02/15] sweepbatcher: re-add sweeps after fully confirmed In case of a reorg sweeps should not go to another batch but stay in the current batch until it is reorg-safely confirmed. Only after that the remaining sweeps are re-added to another batch. Field sweep.completed is now set to true only for reorg-safely confirmed sweeps. In handleConf we now use batch.persist() (i.e. store.UpdateSweepBatch) instead of ConfirmBatch, because we set not only Confirmed flag, but also batchTxid. --- sweepbatcher/store.go | 4 +- sweepbatcher/sweep_batch.go | 245 ++++++++++--------- sweepbatcher/sweep_batcher.go | 46 ++-- sweepbatcher/sweep_batcher_presigned_test.go | 98 +++++++- sweepbatcher/sweep_batcher_test.go | 15 +- 5 files changed, 249 insertions(+), 159 deletions(-) diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 643d3219f..861882d2b 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -201,7 +201,7 @@ type dbBatch struct { // ID is the unique identifier of the batch. ID int32 - // Confirmed is set when the batch is fully confirmed. + // Confirmed is set when the batch is reorg-safely confirmed. Confirmed bool // BatchTxid is the txid of the batch transaction. @@ -236,7 +236,7 @@ type dbSweep struct { // Amount is the amount of the sweep. Amount btcutil.Amount - // Completed indicates whether this sweep is completed. + // Completed indicates whether this sweep is fully-confirmed. Completed bool } diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 583c6e41a..70f9a30df 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1943,7 +1943,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { var ( txHash = spendTx.TxHash() - purgeList = make([]SweepRequest, 0, len(b.sweeps)) notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash @@ -1953,7 +1952,105 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.Warnf("transaction %v has no outputs", txHash) } - // Determine if we should use presigned mode for the batch. + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + + // As a previous version of the batch transaction may get confirmed, + // which does not contain the latest sweeps, we need to detect which + // sweeps are in the transaction to correctly calculate fee portions + // and notify proper sweeps. + var ( + totalSweptAmt btcutil.Amount + confirmedSweeps = []wire.OutPoint{} + ) + for _, sweep := range b.sweeps { + // Skip sweeps that were not included into the confirmed tx. + _, found := confirmedSet[sweep.outpoint] + if !found { + continue + } + + totalSweptAmt += sweep.value + notifyList = append(notifyList, sweep) + confirmedSweeps = append(confirmedSweeps, sweep.outpoint) + } + + // Calculate the fee portion that each sweep should pay for the batch. + feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( + spendTx, len(notifyList), totalSweptAmt, + ) + + for _, sweep := range notifyList { + // If the sweep's notifier is empty then this means that a swap + // is not waiting to read an update from it, so we can skip + // the notification part. + if sweep.notifier == nil || + *sweep.notifier == (SpendNotifier{}) { + + continue + } + + spendDetail := SpendDetail{ + Tx: spendTx, + OnChainFeePortion: getFeePortionPaidBySweep( + spendTx, feePortionPaidPerSweep, + roundingDifference, &sweep, + ), + } + + // Dispatch the sweep notifier, we don't care about the outcome + // of this action so we don't wait for it. + go func() { + // Make sure this context doesn't expire so we + // successfully notify the caller. + ctx := context.WithoutCancel(ctx) + + sweep.notifySweepSpend(ctx, &spendDetail) + }() + } + + b.Infof("spent, confirmed sweeps: %v", confirmedSweeps) + + // We are no longer able to accept new sweeps, so we mark the batch as + // closed and persist on storage. + b.state = Closed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + if err := b.monitorConfirmations(ctx); err != nil { + return fmt.Errorf("monitorConfirmations failed: %w", err) + } + + return nil +} + +// handleConf handles a confirmation notification. This is the final step of the +// batch. Here we signal to the batcher that this batch was completed. +func (b *batch) handleConf(ctx context.Context, + conf *chainntnfs.TxConfirmation) error { + + spendTx := conf.Tx + txHash := spendTx.TxHash() + if b.batchTxid == nil || *b.batchTxid != txHash { + b.Warnf("Mismatch of batch txid: tx in spend notification had "+ + "txid %v, but confirmation notification has txif %v. "+ + "Using the later.", b.batchTxid, txHash) + } + b.batchTxid = &txHash + + b.Infof("confirmed in txid %s", b.batchTxid) + b.state = Confirmed + + if err := b.persist(ctx); err != nil { + return fmt.Errorf("saving batch failed: %w", err) + } + + // If the batch is in presigned mode, cleanup presignedHelper. presigned, err := b.isPresigned() if err != nil { return fmt.Errorf("failed to determine if the batch %d uses "+ @@ -1971,40 +2068,46 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { b.id, err) } + // Make a set of confirmed sweeps. + confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn)) + for _, txIn := range spendTx.TxIn { + confirmedSet[txIn.PreviousOutPoint] = struct{}{} + } + // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the // sweeps that did not make it to the confirmed transaction and feed // them back to the batcher. This will ensure that the sweeps will enter // a new batch instead of remaining dangling. var ( - totalSweptAmt btcutil.Amount confirmedSweeps = []wire.OutPoint{} - purgedSweeps = []wire.OutPoint{} - purgedSwaps = []lntypes.Hash{} + purgeList = make([]SweepRequest, 0, len(b.sweeps)) + totalSweptAmt btcutil.Amount ) for _, sweep := range allSweeps { - found := false - - for _, txIn := range spendTx.TxIn { - if txIn.PreviousOutPoint == sweep.outpoint { - found = true - totalSweptAmt += sweep.value - notifyList = append(notifyList, sweep) - confirmedSweeps = append( - confirmedSweeps, sweep.outpoint, - ) - - break + _, found := confirmedSet[sweep.outpoint] + if found { + // Save the sweep as completed. Note that sweeps are + // marked completed after the batch is marked confirmed + // because the check in handleSweeps checks sweep's + // status first and then checks the batch status. + err := b.persistSweep(ctx, sweep, true) + if err != nil { + return err } + + confirmedSweeps = append( + confirmedSweeps, sweep.outpoint, + ) + + totalSweptAmt += sweep.value + + continue } // If the sweep's outpoint was not found in the transaction's // inputs this means it was left out. So we delete it from this // batch and feed it back to the batcher. - if found { - continue - } - newSweep := sweep delete(b.sweeps, sweep.outpoint) @@ -2036,6 +2139,10 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { }) } } + var ( + purgedSweeps = []wire.OutPoint{} + purgedSwaps = []lntypes.Hash{} + ) for _, sweepReq := range purgeList { purgedSwaps = append(purgedSwaps, sweepReq.SwapHash) for _, input := range sweepReq.Inputs { @@ -2043,45 +2150,8 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } } - // Calculate the fee portion that each sweep should pay for the batch. - feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( - spendTx, len(notifyList), totalSweptAmt, - ) - - for _, sweep := range notifyList { - // Save the sweep as completed. - err := b.persistSweep(ctx, sweep, true) - if err != nil { - return err - } - - // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. - if sweep.notifier == nil || - *sweep.notifier == (SpendNotifier{}) { - - continue - } - - spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), - } - - // Dispatch the sweep notifier, we don't care about the outcome - // of this action so we don't wait for it. - go func() { - // Make sure this context doesn't expire so we - // successfully notify the caller. - ctx := context.WithoutCancel(ctx) - - sweep.notifySweepSpend(ctx, &spendDetail) - }() - } + b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+ + "purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps) // Proceed with purging the sweeps. This will feed the sweeps that // didn't make it to the confirmed batch transaction back to the batcher @@ -2103,49 +2173,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { } }() - b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+ - "purged swaps: %v, purged groups: %v", confirmedSweeps, - purgedSweeps, purgedSwaps, len(purgeList)) - - // We are no longer able to accept new sweeps, so we mark the batch as - // closed and persist on storage. - b.state = Closed - - if err = b.persist(ctx); err != nil { - return fmt.Errorf("saving batch failed: %w", err) - } - - if err = b.monitorConfirmations(ctx); err != nil { - return fmt.Errorf("monitorConfirmations failed: %w", err) - } - - return nil -} - -// handleConf handles a confirmation notification. This is the final step of the -// batch. Here we signal to the batcher that this batch was completed. We also -// cleanup up presigned transactions whose primarySweepID is one of the sweeps -// that were spent and fully confirmed: such a transaction can't be broadcasted -// since it is either in a block or double-spends one of spent outputs. -func (b *batch) handleConf(ctx context.Context, - conf *chainntnfs.TxConfirmation) error { - - spendTx := conf.Tx - txHash := spendTx.TxHash() - if b.batchTxid == nil || *b.batchTxid != txHash { - b.Warnf("Mismatch of batch txid: tx in spend notification had "+ - "txid %v, but confirmation notification has txif %v. "+ - "Using the later.", b.batchTxid, txHash) - } - b.batchTxid = &txHash - - // If the batch is in presigned mode, cleanup presignedHelper. - presigned, err := b.isPresigned() - if err != nil { - return fmt.Errorf("failed to determine if the batch %d uses "+ - "presigned mode: %w", b.id, err) - } - if presigned { b.Infof("Cleaning up presigned store") @@ -2161,19 +2188,7 @@ func (b *batch) handleConf(ctx context.Context, } } - b.Infof("confirmed in txid %s", b.batchTxid) - b.state = Confirmed - - if err := b.store.ConfirmBatch(ctx, b.id); err != nil { - return fmt.Errorf("failed to store confirmed state: %w", err) - } - // Calculate the fee portion that each sweep should pay for the batch. - // TODO: make sure spendTx matches b.sweeps. - var totalSweptAmt btcutil.Amount - for _, s := range b.sweeps { - totalSweptAmt += s.value - } feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( spendTx, len(b.sweeps), totalSweptAmt, ) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index e669e0ccb..96d88dad2 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -192,7 +192,8 @@ type PresignedHelper interface { loadOnly bool) (*wire.MsgTx, error) // CleanupTransactions removes all transactions related to any of the - // outpoints. Should be called after sweep batch tx is fully confirmed. + // outpoints. Should be called after sweep batch tx is reorg-safely + // confirmed. CleanupTransactions(ctx context.Context, inputs []wire.OutPoint) error } @@ -274,7 +275,7 @@ type addSweepsRequest struct { notifier *SpendNotifier // completed is set if the sweep is spent and the spending transaction - // is confirmed. + // is reorg-safely confirmed. completed bool // parentBatch is the parent batch of this sweep. It is loaded ony if @@ -296,7 +297,7 @@ type SpendDetail struct { } // ConfDetail is a notification that is send to the user of sweepbatcher when -// a batch is fully confirmed, i.e. gets batchConfHeight confirmations. +// a batch is reorg-safely confirmed, i.e. gets batchConfHeight confirmations. type ConfDetail struct { // TxConfirmation has data about the confirmation of the transaction. *chainntnfs.TxConfirmation @@ -808,8 +809,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { } // If this is a presigned mode, make sure PresignSweepsGroup was called. - // We skip the check for fully confirmed sweeps, because their presigned - // transactions were already cleaned up from the store. + // We skip the check for reorg-safely confirmed sweeps, because their + // presigned transactions were already cleaned up from the store. if sweep.presigned && !fullyConfirmed { err := ensurePresigned( ctx, sweeps, b.presignedHelper, b.chainParams, @@ -822,8 +823,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error { } infof("Batcher adding sweep group of %d sweeps with primarySweep %x, "+ - "presigned=%v, completed=%v", len(sweeps), sweep.swapHash[:6], - sweep.presigned, completed) + "presigned=%v, fully_confirmed=%v", len(sweeps), + sweep.swapHash[:6], sweep.presigned, completed) req := &addSweepsRequest{ sweeps: sweeps, @@ -883,14 +884,10 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // If the sweep has already been completed in a confirmed batch then we // can't attach its notifier to the batch as that is no longer running. // Instead we directly detect and return the spend here. - if completed && *notifier != (SpendNotifier{}) { - // The parent batch is indeed confirmed, meaning it is complete - // and we won't be able to attach this sweep to it. - if parentBatch.Confirmed { - return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, - ) - } + if completed && parentBatch.Confirmed { + return b.monitorSpendAndNotify( + ctx, sweep, parentBatch.ID, notifier, + ) } sweep.notifier = notifier @@ -1158,11 +1155,11 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, } else { // We don't store Closed state separately in DB. // If the batch is closed (included into a block, but - // not fully confirmed), it is now considered Open - // again. It will receive a spending notification as - // soon as it starts, so it is not an issue. If a sweep - // manages to be added during this time, it will be - // detected as missing when analyzing the spend + // not reorg-safely confirmed), it is now considered + // Open again. It will receive a spending notification + // as soon as it starts, so it is not an issue. If a + // sweep manages to be added during this time, it will + // be detected as missing when analyzing the spend // notification and will be added to new batch. batch.state = Open } @@ -1192,6 +1189,11 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, parentBatchID int32, notifier *SpendNotifier) error { + // If the caller has not provided a notifier, stop. + if notifier == nil || *notifier == (SpendNotifier{}) { + return nil + } + spendCtx, cancel := context.WithCancel(ctx) // Then we get the total amount that was swept by the batch. @@ -1301,8 +1303,8 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, // monitorConfAndNotify monitors the confirmation of a specific transaction and // writes the response back to the response channel. It is called if the batch -// is fully confirmed and we just need to deliver the data back to the caller -// though SpendNotifier. +// is reorg-safely confirmed and we just need to deliver the data back to the +// caller though SpendNotifier. func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep, notifier *SpendNotifier, spendTx *wire.MsgTx, onChainFeePortion btcutil.Amount) error { diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 86f626cbc..8a7fc856f 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1360,7 +1360,12 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, require.LessOrEqual(t, numConfirmedSwaps, numSwaps) - const sweepsPerSwap = 2 + const ( + sweepsPerSwap = 2 + feeRate = chainfee.SatPerKWeight(10_000) + swapAmount = 3_000_001 + ) + sweepAmounts := []btcutil.Amount{1_000_001, 2_000_000} lnd := test.NewMockLnd() @@ -1370,7 +1375,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, customFeeRate := func(_ context.Context, _ lntypes.Hash) (chainfee.SatPerKWeight, error) { - return chainfee.SatPerKWeight(10_000), nil + return feeRate, nil } presignedHelper := newMockPresignedHelper() @@ -1388,12 +1393,17 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, checkBatcherError(t, err) }() + swapHashes := make([]lntypes.Hash, numSwaps) + groups := make([][]Input, numSwaps) txs := make([]*wire.MsgTx, numSwaps) allOps := make([]wire.OutPoint, 0, numSwaps*sweepsPerSwap) + spendChans := make([]<-chan *SpendDetail, numSwaps) + confChans := make([]<-chan *ConfDetail, numSwaps) for i := range numSwaps { // Create a swap of sweepsPerSwap sweeps. swapHash := lntypes.Hash{byte(i + 1)} + swapHashes[i] = swapHash ops := make([]wire.OutPoint, sweepsPerSwap) group := make([]Input, sweepsPerSwap) for j := range sweepsPerSwap { @@ -1405,15 +1415,16 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, group[j] = Input{ Outpoint: ops[j], - Value: btcutil.Amount(1_000_000 * (j + 1)), + Value: sweepAmounts[j], } } + groups[i] = group // Create a swap in DB. swap := &loopdb.LoopOutContract{ SwapContract: loopdb.SwapContract{ CltvExpiry: 111, - AmountRequested: 3_000_000, + AmountRequested: swapAmount, ProtocolVersion: loopdb.ProtocolVersionMuSig2, HtlcKeys: htlcKeys, @@ -1440,11 +1451,24 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, ) require.NoError(t, err) + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + spendChans[i] = spendChan + confChan := make(chan *ConfDetail, 1) + confChans[i] = confChan + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + // Add the sweep, triggering the publish attempt. require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ SwapHash: swapHash, Inputs: group, - Notifier: &dummyNotifier, + Notifier: notifier, })) // For the first group it should register for the sweep's spend @@ -1543,6 +1567,13 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, SpendingHeight: int32(601 + numSwaps + 1), } lnd.SpendChannel <- spendDetail + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + spend := <-spendChans[i] + require.Equal(t, txHash, spend.Tx.TxHash()) + } + <-lnd.RegisterConfChannel require.NoError(t, lnd.NotifyHeight( int32(601+numSwaps+1+batchConfHeight), @@ -1554,12 +1585,18 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, // CleanupTransactions is called here. <-presignedHelper.cleanupCalled - // If all the swaps were confirmed, stop. - if numConfirmedSwaps == numSwaps { - return + // Increasing block height caused the second batch to re-publish. + if online && numConfirmedSwaps < numSwaps { + <-lnd.TxPublishChannel + } + + // Make sure that notifiers of confirmed sweeps received notifications. + for i := range numConfirmedSwaps { + conf := <-confChans[i] + require.Equal(t, txHash, conf.Tx.TxHash()) } - if !online { + if !online && numConfirmedSwaps != numSwaps { // If the sweeps are offline, the missing sweeps in the // confirmed transaction should be re-added to the batcher as // new batch. The groups are added incrementally, so we need @@ -1568,6 +1605,47 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, <-lnd.TxPublishChannel } + // Now make sure that a correct spend and conf contification is sent if + // AddSweep is called after confirming the sweeps. + for i := range numConfirmedSwaps { + // Create a spending notification channel. + spendChan := make(chan *SpendDetail, 1) + confChan := make(chan *ConfDetail) + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: make(chan error, 1), + ConfChan: confChan, + ConfErrChan: make(chan error, 1), + QuitChan: make(chan bool, 1), + } + + // Add the sweep, triggering the publish attempt. + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHashes[i], + Inputs: groups[i], + Notifier: notifier, + })) + + spendReg := <-lnd.RegisterSpendChannel + spendReg.SpendChannel <- spendDetail + + spend := <-spendChan + require.Equal(t, txHash, spend.Tx.TxHash()) + + <-lnd.RegisterConfChannel + lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + Tx: tx, + } + + conf := <-confChan + require.Equal(t, tx.TxHash(), conf.Tx.TxHash()) + } + + // If all the swaps were confirmed, stop. + if numConfirmedSwaps == numSwaps { + return + } + // Wait to new batch to appear and to have the expected size. wantSize := (numSwaps - numConfirmedSwaps) * sweepsPerSwap if online { @@ -1675,11 +1753,13 @@ func TestPresigned(t *testing.T) { testPurging(3, 1, false) testPurging(3, 2, false) testPurging(5, 2, false) + testPurging(5, 3, false) // Test cases in which the sweeps are online. testPurging(2, 1, true) testPurging(3, 1, true) testPurging(3, 2, true) testPurging(5, 2, true) + testPurging(5, 3, true) }) } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 3e398fe12..93cba3752 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -2457,22 +2457,15 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, return b.state == Closed }, test.Timeout, eventuallyCheckFrequency) - // Since second batch was created we check that it registered for its - // primary sweep's spend. - <-lnd.RegisterSpendChannel - - // While handling the spend notification the batch should detect that - // some sweeps did not appear in the spending tx, therefore it redirects - // them back to the batcher and the batcher inserts them in a new batch. - require.Eventually(t, func() bool { - return batcher.numBatches(ctx) == 2 - }, test.Timeout, eventuallyCheckFrequency) - // We mock the confirmation notification. lnd.ConfChannel <- &chainntnfs.TxConfirmation{ Tx: spendingTx, } + // Since second batch was created we check that it registered for its + // primary sweep's spend. + <-lnd.RegisterSpendChannel + // Wait for tx to be published. // Here is a race condition, which is unlikely to cause a crash: if we // wait for publish tx before sending a conf notification (previous From b961e18fa06f9a5785a00cbaa2f5fe9a8e2aa412 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Sun, 27 Apr 2025 01:08:41 -0300 Subject: [PATCH 03/15] sweepbatcher: fix OnChainFeePortion values There were two mistakes. In case of a swap with multiple sweeps only the fee of the first sweep of a swap was accounted. Rounding diff (the remainder) was attributed to all the sweeps rather than to the first (primary) sweep of the batch. The sweep to attribute the remainder was chosen by comparing SignatureScript which is always empty. New approach is to find the primary sweep and to compare its outpoint directly. --- sweepbatcher/sweep_batch.go | 71 +++++++++++++++----- sweepbatcher/sweep_batcher.go | 34 +++++++--- sweepbatcher/sweep_batcher_presigned_test.go | 24 +++++++ 3 files changed, 105 insertions(+), 24 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 70f9a30df..049b2cf5a 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1927,12 +1927,12 @@ func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, } // getFeePortionPaidBySweep returns the fee portion that the sweep should pay -// for the batch transaction. If the sweep is the first sweep in the batch, it +// for the batch transaction. If the sweep is the primary sweep in the batch, it // pays the rounding difference. -func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, - roundingDiff btcutil.Amount, sweep *sweep) btcutil.Amount { +func getFeePortionPaidBySweep(feePortionPerSweep, roundingDiff btcutil.Amount, + primary bool) btcutil.Amount { - if bytes.Equal(spendTx.TxIn[0].SignatureScript, sweep.htlc.SigScript) { + if primary { return feePortionPerSweep + roundingDiff } @@ -1983,22 +1983,42 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { spendTx, len(notifyList), totalSweptAmt, ) + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range notifyList { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + + // Now send notifications to notifiers. for _, sweep := range notifyList { // If the sweep's notifier is empty then this means that a swap - // is not waiting to read an update from it, so we can skip - // the notification part. + // is not waiting to read an update from it or this is not the + // first sweep in a swap, so we can skip the notification part. if sweep.notifier == nil || *sweep.notifier == (SpendNotifier{}) { continue } + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[sweep.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + sweep.swapHash) + } + delete(swap2fee, sweep.swapHash) + spendDetail := SpendDetail{ - Tx: spendTx, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &sweep, - ), + Tx: spendTx, + OnChainFeePortion: fee, } // Dispatch the sweep notifier, we don't care about the outcome @@ -2193,6 +2213,18 @@ func (b *batch) handleConf(ctx context.Context, spendTx, len(b.sweeps), totalSweptAmt, ) + // Calculate fees per swaps. Only the first sweep in a swap has a + // notifier, so we calculate total fee per swap and send it to a sweep + // having that swap and a notifier. + swap2fee := make(map[lntypes.Hash]btcutil.Amount) + for _, sweep := range b.sweeps { + primary := sweep.outpoint == b.primarySweepID + + swap2fee[sweep.swapHash] += getFeePortionPaidBySweep( + feePortionPaidPerSweep, roundingDifference, primary, + ) + } + // Send the confirmation to all the notifiers. for _, s := range b.sweeps { // If the sweep's notifier is empty then this means that @@ -2202,12 +2234,19 @@ func (b *batch) handleConf(ctx context.Context, continue } + // Make sure there is only one sweep with a notifier per swap + // hash, otherwise our fee calculation is incorrect. + fee, has := swap2fee[s.swapHash] + if !has { + return fmt.Errorf("no fee for swap %v; maybe "+ + "multiple sweeps with a notifier per swap?", + s.swapHash) + } + delete(swap2fee, s.swapHash) + confDetail := &ConfDetail{ - TxConfirmation: conf, - OnChainFeePortion: getFeePortionPaidBySweep( - spendTx, feePortionPaidPerSweep, - roundingDifference, &s, - ), + TxConfirmation: conf, + OnChainFeePortion: fee, } // Notify the caller in a goroutine to avoid possible dead-lock. diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 96d88dad2..fa457e02a 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -886,7 +886,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // Instead we directly detect and return the spend here. if completed && parentBatch.Confirmed { return b.monitorSpendAndNotify( - ctx, sweep, parentBatch.ID, notifier, + ctx, sweeps, parentBatch.ID, notifier, ) } @@ -1186,7 +1186,7 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch, // the response back to the response channel. It is called if the batch is fully // confirmed and we just need to deliver the data back to the caller though // SpendNotifier. -func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, +func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweeps []*sweep, parentBatchID int32, notifier *SpendNotifier) error { // If the caller has not provided a notifier, stop. @@ -1204,6 +1204,17 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, return err } + // Find the primarySweepID. + dbSweeps, err := b.store.FetchBatchSweeps(ctx, parentBatchID) + if err != nil { + cancel() + + return err + } + primarySweepID := dbSweeps[0].Outpoint + + sweep := sweeps[0] + spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( spendCtx, &sweep.outpoint, sweep.htlc.PkScript, sweep.initiationHeight, @@ -1224,6 +1235,7 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, select { case spend := <-spendChan: spendTx := spend.SpendingTx + // Calculate the fee portion that each sweep should pay // for the batch. feePortionPerSweep, roundingDifference := @@ -1232,17 +1244,23 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, totalSwept, ) - onChainFeePortion := getFeePortionPaidBySweep( - spendTx, feePortionPerSweep, - roundingDifference, sweep, - ) + // Sum onchain fee across all the sweeps of the swap. + var fee btcutil.Amount + for _, s := range sweeps { + isFirst := s.outpoint == primarySweepID + + fee += getFeePortionPaidBySweep( + feePortionPerSweep, roundingDifference, + isFirst, + ) + } // Notify the requester of the spend with the spend // details, including the fee portion for this // particular sweep. spendDetail := &SpendDetail{ Tx: spendTx, - OnChainFeePortion: onChainFeePortion, + OnChainFeePortion: fee, } select { @@ -1250,7 +1268,7 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep, case notifier.SpendChan <- spendDetail: err := b.monitorConfAndNotify( ctx, sweep, notifier, spendTx, - onChainFeePortion, + fee, ) if err != nil { b.writeToErrChan( diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 8a7fc856f..eee97457b 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1568,10 +1568,31 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } lnd.SpendChannel <- spendDetail + // Calculate the expected on-chain fee of the swap. + wantFee := make([]btcutil.Amount, numConfirmedSwaps) + for i := range numConfirmedSwaps { + batchAmount := swapAmount * btcutil.Amount(numConfirmedSwaps) + txFee := batchAmount - btcutil.Amount(tx.TxOut[0].Value) + numConfirmedSweeps := numConfirmedSwaps * sweepsPerSwap + feePerSweep := txFee / btcutil.Amount(numConfirmedSweeps) + roundingDiff := txFee - feePerSweep*btcutil.Amount( + numConfirmedSweeps, + ) + swapFee := feePerSweep * 2 + + // Add rounding difference to the first swap. + if i == 0 { + swapFee += roundingDiff + } + + wantFee[i] = swapFee + } + // Make sure that notifiers of confirmed sweeps received notifications. for i := range numConfirmedSwaps { spend := <-spendChans[i] require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) } <-lnd.RegisterConfChannel @@ -1594,6 +1615,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, for i := range numConfirmedSwaps { conf := <-confChans[i] require.Equal(t, txHash, conf.Tx.TxHash()) + require.Equal(t, wantFee[i], conf.OnChainFeePortion) } if !online && numConfirmedSwaps != numSwaps { @@ -1631,6 +1653,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, spend := <-spendChan require.Equal(t, txHash, spend.Tx.TxHash()) + require.Equal(t, wantFee[i], spend.OnChainFeePortion) <-lnd.RegisterConfChannel lnd.ConfChannel <- &chainntnfs.TxConfirmation{ @@ -1639,6 +1662,7 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, conf := <-confChan require.Equal(t, tx.TxHash(), conf.Tx.TxHash()) + require.Equal(t, wantFee[i], conf.OnChainFeePortion) } // If all the swaps were confirmed, stop. From 7ebe2e5b309ce272f4053a9615f4a71ce35fde9a Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 30 Apr 2025 00:07:04 -0300 Subject: [PATCH 04/15] loopout: close sweepbatcher quitChan This is needed because sweepbatcher can use this channel in multiple select statements to unblock itself if the caller cancels. --- loopout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loopout.go b/loopout.go index f7da0de67..6fc740916 100644 --- a/loopout.go +++ b/loopout.go @@ -1147,7 +1147,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, quitChan := make(chan bool, 1) defer func() { - quitChan <- true + close(quitChan) }() notifier := sweepbatcher.SpendNotifier{ From 8c784ef7613954285a00e3a8dea0cf7a56c251ff Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 5 May 2025 23:28:23 -0300 Subject: [PATCH 05/15] sweepbatcher: pass utxo to fee provider --- loopout_feerate.go | 4 ++- sweepbatcher/sweep_batcher.go | 6 ++-- sweepbatcher/sweep_batcher_presigned_test.go | 32 ++++++++++---------- sweepbatcher/sweep_batcher_test.go | 12 ++++---- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/loopout_feerate.go b/loopout_feerate.go index 4cebd1e32..4540644c1 100644 --- a/loopout_feerate.go +++ b/loopout_feerate.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/utils" @@ -71,7 +72,8 @@ func newLoopOutSweepFeerateProvider(sweeper sweeper, // GetMinFeeRate returns minimum required feerate for a sweep by swap hash. func (p *loopOutSweepFeerateProvider) GetMinFeeRate(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { _, feeRate, err := p.GetConfTargetAndFeeRate(ctx, swapHash) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index fa457e02a..e8d482aa1 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -203,8 +203,8 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error // FeeRateProvider is a function that returns min fee rate of a batch sweeping // the UTXO of the swap. -type FeeRateProvider func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) +type FeeRateProvider func(ctx context.Context, swapHash lntypes.Hash, + utxo wire.OutPoint) (chainfee.SatPerKWeight, error) // InitialDelayProvider returns the duration after which a newly created batch // is first published. It allows to customize the duration based on total value @@ -1519,7 +1519,7 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash) + minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) if err != nil { return nil, fmt.Errorf("failed to fetch min fee rate "+ "for %x: %w", swapHash[:6], err) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index eee97457b..d03581f32 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -254,8 +254,8 @@ func testPresigned_forgotten_presign(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -330,8 +330,8 @@ func testPresigned_input1_offline_then_input2(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -511,8 +511,8 @@ func testPresigned_two_inputs_one_goes_offline(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -647,8 +647,8 @@ func testPresigned_first_publish_fails(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -770,8 +770,8 @@ func testPresigned_locktime(t *testing.T, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -854,8 +854,8 @@ func testPresigned_presigned_group(t *testing.T, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return chainfee.SatPerKWeight(10_000), nil } @@ -1091,8 +1091,8 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, setFeeRate := func(feeRate chainfee.SatPerKWeight) { currentFeeRate = feeRate } - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return currentFeeRate, nil } @@ -1372,8 +1372,8 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - customFeeRate := func(_ context.Context, - _ lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { return feeRate, nil } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 93cba3752..590d83ae4 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -408,8 +408,8 @@ func testFeeBumping(t *testing.T, store testStore, // Disable fee bumping, if requested. var opts []BatcherOption if noFeeBumping { - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return test.DefaultMockFee, nil @@ -3844,8 +3844,8 @@ func testSweepFetcher(t *testing.T, store testStore, require.NoError(t, err) store.AssertLoopOutStored() - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { // Always provide the same value, no bumping. return feeRate, nil @@ -4691,8 +4691,8 @@ func testFeeRateGrows(t *testing.T, store testStore, swap2feeRate[swapHash] = rate } - customFeeRate := func(ctx context.Context, - swapHash lntypes.Hash) (chainfee.SatPerKWeight, error) { + customFeeRate := func(_ context.Context, swapHash lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { swap2feeRateMu.Lock() defer swap2feeRateMu.Unlock() From 8a071078119a6cc026294b5c3ac3abdde5f66ba2 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:21:39 -0300 Subject: [PATCH 06/15] sweepbatcher: make sure dest pkscript is filled --- sweepbatcher/sweep_batch.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 049b2cf5a..f559d9f84 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -1285,6 +1285,10 @@ func constructUnsignedTx(sweeps []sweep, address btcutil.Address, return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript "+ "failed: %w", err) } + if len(batchPkScript) == 0 { + return nil, 0, 0, 0, fmt.Errorf("txscript.PayToAddrScript " + + "returned an empty pkScript") + } // Add the output to weight estimates. err = sweeppkg.AddOutputEstimate(&weightEstimate, address) From d75f17df470fccc8cc748a4c0e221333a3a67b8b Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:25:21 -0300 Subject: [PATCH 07/15] sweepbatcher: simplify presigned/purging test It doesn't need loopdb, so remove that code. --- sweepbatcher/sweep_batcher_presigned_test.go | 58 +++----------------- 1 file changed, 7 insertions(+), 51 deletions(-) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index d03581f32..e4f0a148d 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -1354,7 +1354,7 @@ func testPresigned_presigned_and_regular_sweeps(t *testing.T, store testStore, // to another online batch. In offline case they must are added to a new batch // having valid presigned transactions. func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, - store testStore, batcherStore testBatcherStore, online bool) { + batcherStore testBatcherStore, online bool) { defer test.Guard(t)() @@ -1420,33 +1420,13 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, } groups[i] = group - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: swapAmount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{byte(i + 1)}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable all the sweeps. for _, op := range ops { presignedHelper.SetOutpointOnline(op, true) } // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1506,31 +1486,11 @@ func testPresigned_purging(t *testing.T, numSwaps, numConfirmedSwaps int, }, } - // Create a swap in DB. - swap := &loopdb.LoopOutContract{ - SwapContract: loopdb.SwapContract{ - CltvExpiry: 111, - AmountRequested: amount, - ProtocolVersion: loopdb.ProtocolVersionMuSig2, - HtlcKeys: htlcKeys, - - // Make preimage unique to pass SQL constraints. - Preimage: lntypes.Preimage{1, 2, 3}, - }, - - DestAddr: destAddr, - SwapInvoice: swapInvoice, - SweepConfTarget: 111, - } - err := store.CreateLoopOut(ctx, swapHash, swap) - require.NoError(t, err) - store.AssertLoopOutStored() - // Enable the sweep. presignedHelper.SetOutpointOnline(opx, true) // An attempt to presign must succeed. - err = batcher.PresignSweepsGroup( + err := batcher.PresignSweepsGroup( ctx, group, sweepTimeout, destAddr, ) require.NoError(t, err) @@ -1759,14 +1719,10 @@ func TestPresigned(t *testing.T) { } t.Run(name, func(t *testing.T) { - runTests(t, func(t *testing.T, store testStore, - batcherStore testBatcherStore) { - - testPresigned_purging( - t, numSwaps, numConfirmedSwaps, - store, batcherStore, online, - ) - }) + testPresigned_purging( + t, numSwaps, numConfirmedSwaps, + NewStoreMock(), online, + ) }) } From 7735bdfe2717fa2827d459dc755a3ba8452552da Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 6 May 2025 14:26:06 -0300 Subject: [PATCH 08/15] sweepbatcher: make sure HTLC.PkScript is filled --- sweepbatcher/sweep_batcher.go | 7 +++++++ sweepbatcher/sweep_batcher_presigned_test.go | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index e8d482aa1..921286db4 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1515,6 +1515,13 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, swapHash[:6], err) } + // Make sure that PkScript of the coin is filled. Otherwise + // RegisterSpendNtfn fails. + if len(s.HTLC.PkScript) == 0 { + return nil, fmt.Errorf("sweep data for %x doesn't have "+ + "HTLC.PkScript set", swapHash[:6]) + } + // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. var minFeeRate chainfee.SatPerKWeight diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index e4f0a148d..33036afee 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -14,6 +14,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lntypes" @@ -232,13 +233,18 @@ func (h *mockPresignedHelper) FetchSweep(_ context.Context, h.mu.Lock() defer h.mu.Unlock() - _, has := h.onlineOutpoints[utxo] + // Find IsPresigned. + _, isPresigned := h.onlineOutpoints[utxo] return &SweepInfo{ // Set Timeout to prevent warning messages about timeout=0. Timeout: sweepTimeout, - IsPresigned: has, + IsPresigned: isPresigned, + + HTLC: swap.Htlc{ + PkScript: []byte{10, 11, 12}, + }, }, nil } From f19f9f5288eba23bfc867752283b36954cccee94 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 8 May 2025 00:12:30 -0300 Subject: [PATCH 09/15] sweepbatcher/presigned: minRelayFee edge cases Make sure that broadcasted tx has feeRate >= minRelayFee. Make sure that feeRate of broadcasted tx doesn't decrease. --- sweepbatcher/presigned.go | 19 +-- sweepbatcher/sweep_batcher_presigned_test.go | 121 +++++++++++++++++++ 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 7ca325938..730325117 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -407,9 +407,16 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, } } + // Determine the current minimum relay fee based on our chain backend. + minRelayFee, err := b.wallet.MinRelayFee(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get minRelayFee: %w", err), + false + } + // Cache current height and desired feerate of the batch. currentHeight := b.currentHeight - feeRate := b.rbfCache.FeeRate + feeRate := max(b.rbfCache.FeeRate, minRelayFee) // Append this sweep to an array of sweeps. This is needed to keep the // order of sweeps stored, as iterating the sweeps map does not @@ -447,13 +454,6 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, batchAmt += sweep.value } - // Determine the current minimum relay fee based on our chain backend. - minRelayFee, err := b.wallet.MinRelayFee(ctx) - if err != nil { - return 0, fmt.Errorf("failed to get minRelayFee: %w", err), - false - } - // Get a pre-signed transaction. const loadOnly = false signedTx, err := b.cfg.presignedHelper.SignTx( @@ -508,6 +508,9 @@ func (b *batch) publishPresigned(ctx context.Context) (btcutil.Amount, error, b.batchTxid = &txHash b.batchPkScript = tx.TxOut[0].PkScript + // Update cached FeeRate not to broadcast a tx with lower feeRate. + b.rbfCache.FeeRate = max(b.rbfCache.FeeRate, signedFeeRate) + return fee, nil, true } diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 33036afee..73aee2199 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -159,6 +159,11 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, h.mu.Lock() defer h.mu.Unlock() + if feeRate < minRelayFee { + return nil, fmt.Errorf("feeRate (%v) is below minRelayFee (%v)", + feeRate, minRelayFee) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -492,6 +497,118 @@ func testPresigned_input1_offline_then_input2(t *testing.T, require.NoError(t, err) } +// testPresigned_min_relay_fee tests that online and presigned transactions +// comply with min_relay_fee. +func testPresigned_min_relay_fee(t *testing.T, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const inputAmt = 1_000_000 + + customFeeRate := func(_ context.Context, _ lntypes.Hash, + _ wire.OutPoint) (chainfee.SatPerKWeight, error) { + + return chainfee.FeePerKwFloor, nil + } + + presignedHelper := newMockPresignedHelper() + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, presignedHelper, + WithCustomFeeRate(customFeeRate), + WithPresignedHelper(presignedHelper)) + go func() { + err := batcher.Run(ctx) + checkBatcherError(t, err) + }() + + // Set high min_relay_fee. + lnd.SetMinRelayFee(400) + + // Create the first sweep. + swapHash1 := lntypes.Hash{1, 1, 1} + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + sweepReq1 := SweepRequest{ + SwapHash: swapHash1, + Inputs: []Input{{ + Value: inputAmt, + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + } + + // Enable the input and presign. + presignedHelper.SetOutpointOnline(op1, true) + err := batcher.PresignSweepsGroup( + ctx, []Input{{Outpoint: op1, Value: inputAmt}}, + sweepTimeout, destAddr, + ) + require.NoError(t, err) + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) + + // Since a batch was created we check that it registered for its primary + // sweep's spend. + <-lnd.RegisterSpendChannel + + // Wait for a transactions to be published. + tx := <-lnd.TxPublishChannel + gotFeeRate := presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // The only difference of tx2 is a higher lock_time. + lnd.SetMinRelayFee(300) + require.NoError(t, lnd.NotifyHeight(601)) + tx2 := <-lnd.TxPublishChannel + require.Equal(t, tx.TxOut[0].Value, tx2.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(402), gotFeeRate) + require.Equal(t, uint32(601), tx2.LockTime) + + // Set a higher min_relay_fee, turn off the client and try presigned tx. + lnd.SetMinRelayFee(500) + presignedHelper.SetOutpointOnline(op1, false) + + // Check fee rate of the presigned tx broadcasted. + require.NoError(t, lnd.NotifyHeight(602)) + tx = <-lnd.TxPublishChannel + gotFeeRate = presignedHelper.getTxFeerate(tx, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx.LockTime) + + // Now decrease min_relay_fee and make sure fee rate doesn't decrease. + // It should re-broadcast the same presigned tx. + lnd.SetMinRelayFee(450) + require.NoError(t, lnd.NotifyHeight(603)) + tx2 = <-lnd.TxPublishChannel + require.Equal(t, tx.TxHash(), tx2.TxHash()) + gotFeeRate = presignedHelper.getTxFeerate(tx2, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + // LockTime of a presigned tx is 0. + require.Equal(t, uint32(0), tx2.LockTime) + + // Even if the client is back online, fee rate doesn't decrease. + presignedHelper.SetOutpointOnline(op1, true) + require.NoError(t, lnd.NotifyHeight(604)) + tx3 := <-lnd.TxPublishChannel + require.Equal(t, tx2.TxOut[0].Value, tx3.TxOut[0].Value) + gotFeeRate = presignedHelper.getTxFeerate(tx3, inputAmt) + require.Equal(t, chainfee.SatPerKWeight(523), gotFeeRate) + require.Equal(t, uint32(604), tx3.LockTime) +} + // testPresigned_two_inputs_one_goes_offline tests presigned mode for the // following scenario: two online inputs are added, then one of them goes // offline, then feerate grows and a presigned transaction is used. @@ -1692,6 +1809,10 @@ func TestPresigned(t *testing.T) { testPresigned_input1_offline_then_input2(t, NewStoreMock()) }) + t.Run("min_relay_fee", func(t *testing.T) { + testPresigned_min_relay_fee(t, NewStoreMock()) + }) + t.Run("two_inputs_one_goes_offline", func(t *testing.T) { testPresigned_two_inputs_one_goes_offline(t, NewStoreMock()) }) From e7d5172a7983f0d84743f5fc2a0082255560bd6a Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 21:40:42 -0300 Subject: [PATCH 10/15] sweepbatcher: remove method Presign Method Presign is not as reliable as SignTx, because it checks transaction by txid and can miss for example if LockTime is different. SignTx can do everything Presign was used for. --- sweepbatcher/presigned.go | 28 ++++++------ sweepbatcher/presigned_test.go | 20 ++++++--- sweepbatcher/sweep_batch.go | 2 +- sweepbatcher/sweep_batcher.go | 11 +---- sweepbatcher/sweep_batcher_presigned_test.go | 46 ++++---------------- 5 files changed, 39 insertions(+), 68 deletions(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 730325117..3dcb926b8 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -36,13 +36,7 @@ func (b *batch) ensurePresigned(ctx context.Context, newSweeps []*sweep, // presignedTxChecker has methods to check if the inputs are presigned. type presignedTxChecker interface { destPkScripter - - // SignTx signs an unsigned transaction or returns a pre-signed tx. - // It is only called with loadOnly=true by ensurePresigned. - SignTx(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount, - minRelayFee, feeRate chainfee.SatPerKWeight, - loadOnly bool) (*wire.MsgTx, error) + presigner } // ensurePresigned checks that there is a presigned transaction spending the @@ -289,11 +283,12 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // presigner tries to presign a batch transaction. type presigner interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error + // SignTx signs an unsigned transaction or returns a pre-signed tx. + // It is only called with loadOnly=true by ensurePresigned. + SignTx(ctx context.Context, primarySweepID wire.OutPoint, + tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) } // presign tries to presign batch sweep transactions of the sweeps. It signs @@ -372,7 +367,14 @@ func presign(ctx context.Context, presigner presigner, destAddr btcutil.Address, } // Try to presign this transaction. - err = presigner.Presign(ctx, primarySweepID, tx, batchAmt) + const ( + loadOnly = false + minRelayFee = chainfee.AbsoluteFeePerKwFloor + ) + _, err = presigner.SignTx( + ctx, primarySweepID, tx, batchAmt, minRelayFee, fr, + loadOnly, + ) if err != nil { return fmt.Errorf("failed to presign unsigned tx %v "+ "for feeRate %v: %w", tx.TxHash(), fr, err) diff --git a/sweepbatcher/presigned_test.go b/sweepbatcher/presigned_test.go index 915038889..629ff1de4 100644 --- a/sweepbatcher/presigned_test.go +++ b/sweepbatcher/presigned_test.go @@ -612,24 +612,30 @@ type mockPresigner struct { failAt int } -// Presign memorizes the value of the output and fails if the number of +// SignTx memorizes the value of the output and fails if the number of // calls previously made is failAt. -func (p *mockPresigner) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { +func (p *mockPresigner) SignTx(ctx context.Context, + primarySweepID wire.OutPoint, tx *wire.MsgTx, inputAmt btcutil.Amount, + minRelayFee, feeRate chainfee.SatPerKWeight, + loadOnly bool) (*wire.MsgTx, error) { + + if ctx.Err() != nil { + return nil, ctx.Err() + } if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) } if len(p.outputs)+1 == p.failAt { - return fmt.Errorf("test error in Presign") + return nil, fmt.Errorf("test error in SignTx") } p.outputs = append(p.outputs, btcutil.Amount(tx.TxOut[0].Value)) p.lockTimes = append(p.lockTimes, tx.LockTime) - return nil + return tx, nil } // TestPresign checks that function presign presigns correct set of transactions diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index f559d9f84..08ed82990 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -485,7 +485,7 @@ func (b *batch) Errorf(format string, params ...interface{}) { // checkSweepToAdd checks if a sweep can be added or updated in the batch. The // caller must lock the event loop using scheduleNextCall. The function returns // if the sweep already exists in the batch. If presigned mode is enabled, the -// result depends on the outcome of the method presignedHelper.Presign for a +// result depends on the outcome of the method presignedHelper.SignTx for a // non-empty batch. For an empty batch, the input needs to pass // PresignSweepsGroup. func (b *batch) checkSweepToAdd(_ context.Context, sweep *sweep) (bool, error) { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 921286db4..16ee335f3 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -159,18 +159,11 @@ type SignMuSig2 func(ctx context.Context, muSig2Version input.MuSig2Version, // fails (e.g. because one of the inputs is offline), an input can't be added to // a batch. type PresignedHelper interface { - // Presign tries to presign a batch transaction. If the method returns - // nil, it is guaranteed that future calls to SignTx on this set of - // sweeps return valid signed transactions. The implementation should - // first check if this transaction already exists in the store to skip - // cosigning if possible. - Presign(ctx context.Context, primarySweepID wire.OutPoint, - tx *wire.MsgTx, inputAmt btcutil.Amount) error - // DestPkScript returns destination pkScript used by the sweep batch // with the primary outpoint specified. Returns an error, if such tx // doesn't exist. If there are many such transactions, returns any of // pkScript's; all of them should have the same destination pkScript. + // TODO: embed this data into SweepInfo. DestPkScript(ctx context.Context, primarySweepID wire.OutPoint) ([]byte, error) @@ -951,7 +944,7 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, // spinUpNewBatch creates new batch, starts it and adds the sweeps to it. If // presigned mode is enabled, the result also depends on outcome of -// presignedHelper.Presign. +// presignedHelper.SignTx. func (b *Batcher) spinUpNewBatch(ctx context.Context, sweeps []*sweep) error { // Spin up a fresh batch. newBatch, err := b.spinUpBatch(ctx) diff --git a/sweepbatcher/sweep_batcher_presigned_test.go b/sweepbatcher/sweep_batcher_presigned_test.go index 73aee2199..36f11f750 100644 --- a/sweepbatcher/sweep_batcher_presigned_test.go +++ b/sweepbatcher/sweep_batcher_presigned_test.go @@ -96,42 +96,6 @@ func (h *mockPresignedHelper) getTxFeerate(tx *wire.MsgTx, return chainfee.NewSatPerKWeight(fee, weight) } -// Presign tries to presign the transaction. It succeeds if all the inputs -// are online. In case of success it adds the transaction to presignedBatches. -func (h *mockPresignedHelper) Presign(ctx context.Context, - primarySweepID wire.OutPoint, tx *wire.MsgTx, - inputAmt btcutil.Amount) error { - - h.mu.Lock() - defer h.mu.Unlock() - - // Check if such a transaction already exists. This is not only an - // optimization, but also enables re-adding multiple groups if sweeps - // are offline. - wantTxHash := tx.TxHash() - for _, candidate := range h.presignedBatches[primarySweepID] { - if candidate.TxHash() == wantTxHash { - return nil - } - } - - if !hasInput(tx, primarySweepID) { - return fmt.Errorf("primarySweepID %v not in tx", primarySweepID) - } - - if offline := h.offlineInputs(tx); len(offline) != 0 { - return fmt.Errorf("some inputs of tx are offline: %v", offline) - } - - tx = tx.Copy() - h.sign(tx) - h.presignedBatches[primarySweepID] = append( - h.presignedBatches[primarySweepID], tx, - ) - - return nil -} - // DestPkScript returns destination pkScript used in presigned tx sweeping // these inputs. func (h *mockPresignedHelper) DestPkScript(ctx context.Context, @@ -164,6 +128,11 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, feeRate, minRelayFee) } + if !hasInput(tx, primarySweepID) { + return nil, fmt.Errorf("primarySweepID %v not in tx", + primarySweepID) + } + // If all the inputs are online and loadOnly is not set, sign this exact // transaction. if offline := h.offlineInputs(tx); len(offline) == 0 && !loadOnly { @@ -205,7 +174,8 @@ func (h *mockPresignedHelper) SignTx(ctx context.Context, } if bestTx == nil { - return nil, fmt.Errorf("no such presigned tx found") + return nil, fmt.Errorf("some outpoint is offline and no " + + "suitable presigned tx found") } return bestTx.Copy(), nil @@ -1025,7 +995,7 @@ func testPresigned_presigned_group(t *testing.T, // An attempt to presign must fail. err = batcher.PresignSweepsGroup(ctx, group1, sweepTimeout, destAddr) - require.ErrorContains(t, err, "some inputs of tx are offline") + require.ErrorContains(t, err, "some outpoint is offline") // Enable both outpoints. presignedHelper.SetOutpointOnline(op2, true) From 59a3af31cc32bb140dc9adcb7b82dd610c64bd25 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 23:31:07 -0300 Subject: [PATCH 11/15] sweepbatcher: format pkscript as hex --- sweepbatcher/presigned.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 3dcb926b8..e813cd69f 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -604,7 +604,7 @@ func CheckSignedTx(unsignedTx, signedTx *wire.MsgTx, inputAmt btcutil.Amount, unsignedOut := unsignedTx.TxOut[0] signedOut := signedTx.TxOut[0] if !bytes.Equal(unsignedOut.PkScript, signedOut.PkScript) { - return fmt.Errorf("mismatch of output pkScript: %v, %v", + return fmt.Errorf("mismatch of output pkScript: %x, %x", unsignedOut.PkScript, signedOut.PkScript) } From 64a24177aeaeedc3d154db28c725d1d1e35d8eea Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 14 May 2025 23:36:15 -0300 Subject: [PATCH 12/15] sweepbatcher: more logging in PresignSweepsGroup --- sweepbatcher/sweep_batcher.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 16ee335f3..7fbbcda49 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/btcsuite/btcwallet/chain" @@ -729,7 +730,14 @@ func (b *Batcher) PresignSweepsGroup(ctx context.Context, inputs []Input, if err != nil { return fmt.Errorf("failed to get nextBlockFeeRate: %w", err) } - infof("PresignSweepsGroup: nextBlockFeeRate is %v", nextBlockFeeRate) + destPkscript, err := txscript.PayToAddrScript(destAddress) + if err != nil { + return fmt.Errorf("txscript.PayToAddrScript failed: %w", err) + } + infof("PresignSweepsGroup: nextBlockFeeRate is %v, inputs: %v, "+ + "destAddress: %v, destPkscript: %x sweepTimeout: %d", + nextBlockFeeRate, inputs, destAddress, destPkscript, + sweepTimeout) sweeps := make([]sweep, len(inputs)) for i, input := range inputs { From 72caafa14e8f2dfdc9afb3aaea1b9dd54d890f92 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 15 May 2025 00:37:13 -0300 Subject: [PATCH 13/15] sweepbatcher: fix a bug in dest address selection For presigned possible remaining groups, the destination address of the current batch was used instead of the destination address of an expected future batch. TODO: reproduce in unit test "purged". For this, each swap should have a separate destination address. --- sweepbatcher/presigned.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index e813cd69f..385f34cf9 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -247,7 +247,7 @@ func (b *batch) presign(ctx context.Context, newSweeps []*sweep) error { // Cache the destination address. destAddr, err := getPresignedSweepsDestAddr( - ctx, b.cfg.presignedHelper, b.primarySweepID, + ctx, b.cfg.presignedHelper, primarySweepID, b.cfg.chainParams, ) if err != nil { From 1794aa21e5d08f562eaeeb5ec4e85571dc7280e9 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Thu, 12 Jun 2025 13:29:48 -0300 Subject: [PATCH 14/15] loopdb: remove unused method ConfirmBatch --- loopdb/sqlc/batch.sql.go | 14 -------------- loopdb/sqlc/querier.go | 1 - loopdb/sqlc/queries/batch.sql | 8 -------- sweepbatcher/store.go | 8 -------- sweepbatcher/store_mock.go | 16 ---------------- sweepbatcher/sweep_batcher.go | 3 --- 6 files changed, 50 deletions(-) diff --git a/loopdb/sqlc/batch.sql.go b/loopdb/sqlc/batch.sql.go index 875473fe7..ce127f90b 100644 --- a/loopdb/sqlc/batch.sql.go +++ b/loopdb/sqlc/batch.sql.go @@ -21,20 +21,6 @@ func (q *Queries) CancelBatch(ctx context.Context, id int32) error { return err } -const confirmBatch = `-- name: ConfirmBatch :exec -UPDATE - sweep_batches -SET - confirmed = TRUE -WHERE - id = $1 -` - -func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error { - _, err := q.db.ExecContext(ctx, confirmBatch, id) - return err -} - const getBatchSweeps = `-- name: GetBatchSweeps :many SELECT id, swap_hash, batch_id, outpoint, amt, completed diff --git a/loopdb/sqlc/querier.go b/loopdb/sqlc/querier.go index c293deb98..15b2e388f 100644 --- a/loopdb/sqlc/querier.go +++ b/loopdb/sqlc/querier.go @@ -13,7 +13,6 @@ type Querier interface { AllDeposits(ctx context.Context) ([]Deposit, error) AllStaticAddresses(ctx context.Context) ([]StaticAddress, error) CancelBatch(ctx context.Context, id int32) error - ConfirmBatch(ctx context.Context, id int32) error CreateDeposit(ctx context.Context, arg CreateDepositParams) error CreateReservation(ctx context.Context, arg CreateReservationParams) error CreateStaticAddress(ctx context.Context, arg CreateStaticAddressParams) error diff --git a/loopdb/sqlc/queries/batch.sql b/loopdb/sqlc/queries/batch.sql index 2a8696072..e1d42be4f 100644 --- a/loopdb/sqlc/queries/batch.sql +++ b/loopdb/sqlc/queries/batch.sql @@ -37,14 +37,6 @@ UPDATE sweep_batches SET last_rbf_sat_per_kw = $6 WHERE id = $1; --- name: ConfirmBatch :exec -UPDATE - sweep_batches -SET - confirmed = TRUE -WHERE - id = $1; - -- name: UpsertSweep :exec INSERT INTO sweeps ( swap_hash, diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 861882d2b..3af26bf0a 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -16,9 +16,6 @@ import ( // Querier is the interface that contains all the queries generated // by sqlc for sweep batcher. type Querier interface { - // ConfirmBatch confirms a batch by setting the state to confirmed. - ConfirmBatch(ctx context.Context, id int32) error - // GetBatchSweeps fetches all the sweeps that are part a batch. GetBatchSweeps(ctx context.Context, batchID int32) ( []sqlc.Sweep, error) @@ -124,11 +121,6 @@ func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error { return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch)) } -// ConfirmBatch confirms a batch by setting the state to confirmed. -func (s *SQLStore) ConfirmBatch(ctx context.Context, id int32) error { - return s.baseDb.ConfirmBatch(ctx, id) -} - // FetchBatchSweeps fetches all the sweeps that are part a batch. func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) ( []*dbSweep, error) { diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index 23e15f9ed..2e28a603b 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -77,22 +77,6 @@ func (s *StoreMock) UpdateSweepBatch(ctx context.Context, return nil } -// ConfirmBatch confirms a batch. -func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error { - s.mu.Lock() - defer s.mu.Unlock() - - batch, ok := s.batches[id] - if !ok { - return errors.New("batch not found") - } - - batch.Confirmed = true - s.batches[batch.ID] = batch - - return nil -} - // FetchBatchSweeps fetches all the sweeps that belong to a batch. func (s *StoreMock) FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error) { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 7fbbcda49..6bab035e7 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -59,9 +59,6 @@ type BatcherStore interface { // UpdateSweepBatch updates a batch in the database. UpdateSweepBatch(ctx context.Context, batch *dbBatch) error - // ConfirmBatch confirms a batch by setting its state to confirmed. - ConfirmBatch(ctx context.Context, id int32) error - // FetchBatchSweeps fetches all the sweeps that belong to a batch. FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error) From 0dc5d9eb69e49d1990339f908b0240da1abec307 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 24 Jun 2025 13:06:58 -0300 Subject: [PATCH 15/15] sweepbatcher: check of conf notification has tx --- sweepbatcher/sweep_batch.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 08ed82990..933077fcf 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -2059,6 +2059,10 @@ func (b *batch) handleConf(ctx context.Context, conf *chainntnfs.TxConfirmation) error { spendTx := conf.Tx + if spendTx == nil { + return fmt.Errorf("confirmation doesn't have spendTx, "+ + "height=%d, TxIndex=%d", conf.BlockHeight, conf.TxIndex) + } txHash := spendTx.TxHash() if b.batchTxid == nil || *b.batchTxid != txHash { b.Warnf("Mismatch of batch txid: tx in spend notification had "+