Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ type ClientConfig struct {
// MaxStaticAddrHtlcFeePercentage since it serves the server as backup
// transaction in case of fee spikes.
MaxStaticAddrHtlcBackupFeePercentage float64

// HtlcConfirmedNtfn is an optional notifier that provides HTLC
// confirmed notifications from the server.
HtlcConfirmedNtfn HtlcConfirmedNotifier
}

// NewClient returns a new instance to initiate swaps with.
Expand All @@ -201,8 +205,9 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
CreateExpiryTimer: func(d time.Duration) <-chan time.Time {
return time.NewTimer(d).C
},
AssetClient: cfg.AssetClient,
LoopOutMaxParts: cfg.LoopOutMaxParts,
AssetClient: cfg.AssetClient,
LoopOutMaxParts: cfg.LoopOutMaxParts,
HtlcConfirmedNtfn: cfg.HtlcConfirmedNtfn,
}

sweeper := &sweep.Sweeper{
Expand Down Expand Up @@ -488,7 +493,7 @@ func (s *Client) resumeSwaps(ctx context.Context,

swapCfg := newSwapConfig(
s.lndServices, s.Store, s.Server, s.AssetClient,
clock.NewDefaultClock(),
clock.NewDefaultClock(), s.HtlcConfirmedNtfn,
)

for _, pend := range loopOutSwaps {
Expand Down Expand Up @@ -582,7 +587,7 @@ func (s *Client) LoopOut(globalCtx context.Context,
// Create a new swap object for this swap.
swapCfg := newSwapConfig(
s.lndServices, s.Store, s.Server, s.AssetClient,
clock.NewDefaultClock(),
clock.NewDefaultClock(), s.HtlcConfirmedNtfn,
)

initResult, err := newLoopOutSwap(
Expand Down Expand Up @@ -766,7 +771,7 @@ func (s *Client) LoopIn(globalCtx context.Context,
initiationHeight := s.executor.height()
swapCfg := newSwapConfig(
s.lndServices, s.Store, s.Server, s.AssetClient,
clock.NewDefaultClock(),
clock.NewDefaultClock(), s.HtlcConfirmedNtfn,
)
initResult, err := newLoopInSwap(
globalCtx, swapCfg, initiationHeight, request,
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type clientConfig struct {
L402Store l402.Store
CreateExpiryTimer func(expiry time.Duration) <-chan time.Time
LoopOutMaxParts uint32
HtlcConfirmedNtfn HtlcConfirmedNotifier
}
5 changes: 5 additions & 0 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
notificationManager := notifications.NewManager(notificationCfg)

// Set the HTLC confirmed notifier on the swap client so that
// individual swaps can subscribe to server-side HTLC confirmed
// notifications.
swapClient.HtlcConfirmedNtfn = notificationManager

d.wg.Go(func() {
infof("Starting notification manager")
err := notificationManager.Run(d.mainCtx)
Expand Down
8 changes: 4 additions & 4 deletions loopin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func testLoopInSuccess(t *testing.T) {

cfg := newSwapConfig(
&ctx.lnd.LndServices, ctx.store, ctx.server, nil,
clock.NewTestClock(startTime),
clock.NewTestClock(startTime), nil,
)

expectedLastHop := &route.Vertex{0x02}
Expand Down Expand Up @@ -287,7 +287,7 @@ func testLoopInTimeout(t *testing.T, externalValue int64) {

cfg := newSwapConfig(
&ctx.lnd.LndServices, ctx.store, ctx.server, nil,
clock.NewTestClock(time.Unix(123, 0)),
clock.NewTestClock(time.Unix(123, 0)), nil,
)

req := testLoopInRequest
Expand Down Expand Up @@ -501,7 +501,7 @@ func testLoopInResume(t *testing.T, state loopdb.SwapState, expired bool,
ctx := newLoopInTestContext(t)
cfg := newSwapConfig(
&ctx.lnd.LndServices, ctx.store, ctx.server, nil,
clock.NewTestClock(time.Unix(123, 0)),
clock.NewTestClock(time.Unix(123, 0)), nil,
)

// Create sender and receiver keys.
Expand Down Expand Up @@ -859,7 +859,7 @@ func startNewLoopIn(t *testing.T, ctx *loopInTestContext, height int32) (

cfg := newSwapConfig(
&ctx.lnd.LndServices, ctx.store, ctx.server, nil,
clock.NewTestClock(time.Unix(123, 0)),
clock.NewTestClock(time.Unix(123, 0)), nil,
)

req := &testLoopInRequest
Expand Down
234 changes: 234 additions & 0 deletions loopout.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loop

import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
Expand All @@ -9,6 +10,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -74,6 +76,25 @@ const (
urgentSweepConfTargetFactor = 1.1
)

// htlcConfNtfnState is a small FSM describing the current state of HTLC
// notifications.
type htlcConfNtfnState uint32

const (
// htlcConfNtfnStateIdle means that no notification is currently being
// processed.
htlcConfNtfnStateIdle htlcConfNtfnState = iota

// htlcConfNtfnStateInFlight means that a matching notification is
// currently being handled.
htlcConfNtfnStateInFlight

// htlcConfNtfnStateRegistered means that a notification has already
// registered and delivered a confirmation, so later retries should be
// ignored.
htlcConfNtfnStateRegistered
)

// loopOutSwap contains all the in-memory state related to a pending loop out
// swap.
type loopOutSwap struct {
Expand All @@ -97,6 +118,8 @@ type loopOutSwap struct {
swapPaymentChan chan paymentResult
prePaymentChan chan paymentResult

htlcConfNtfnState atomic.Uint32

wg sync.WaitGroup
}

Expand Down Expand Up @@ -1019,6 +1042,13 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
return nil, err
}

// Start a parallel goroutine that listens for server-side HTLC
// confirmed notifications. If a matching notification arrives, it
// registers for confirmations on the specific UTXO and forwards
// the result to ntfnConfChan.
ntfnConfChan := make(chan *chainntnfs.TxConfirmation, 1)
go s.listenForHtlcConfirmedNtfn(ctx, ntfnConfChan)

var txConf *chainntnfs.TxConfirmation
if s.state == loopdb.StateInitiated {
// Check if it is already too late to start this swap. If we
Expand Down Expand Up @@ -1110,6 +1140,11 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
txConf = htlcConfNtfn
break loop

// Htlc got confirmed via server notification.
case htlcConfNtfn := <-ntfnConfChan:
txConf = htlcConfNtfn
break loop

// New block is received. Recheck max reveal height.
case notification := <-s.blockEpochChan:
s.height = notification.(int32)
Expand All @@ -1134,6 +1169,8 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
return nil, err
case htlcConfNtfn := <-htlcConfChan:
txConf = htlcConfNtfn
case htlcConfNtfn := <-ntfnConfChan:
txConf = htlcConfNtfn
case <-globalCtx.Done():
return nil, globalCtx.Err()
}
Expand All @@ -1147,6 +1184,203 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
return txConf, nil
}

// listenForHtlcConfirmedNtfn listens for server-side HTLC confirmed
// notifications and registers for confirmations when a matching
// notification arrives. This runs in parallel with the existing
// chain notifier path. All errors are non-fatal and logged at Info.
//
// We only allow one matching notification to be processed at a time and stop
// processing once one notification has successfully registered and delivered a
// confirmation. This deduplicates retry notifications from the server.
func (s *loopOutSwap) listenForHtlcConfirmedNtfn(ctx context.Context,
ntfnConfChan chan<- *chainntnfs.TxConfirmation) {

if s.htlcConfirmedNtfn == nil {
return
}

ntfnChan := s.htlcConfirmedNtfn.SubscribeHtlcConfirmed(ctx)

for {
select {
case ntfn, ok := <-ntfnChan:
if !ok {
return
}

if ntfn == nil {
s.log.Infof("Ignoring nil htlc confirmed ntfn")
continue
}

ntfnHash, err := lntypes.MakeHash(ntfn.SwapHash)
if err != nil {
s.log.Infof("Ignoring htlc confirmed ntfn, bad "+
"hash: %v", err)
continue
}

if ntfnHash != s.hash {
continue
}

if !s.tryStartHtlcConfirmedNtfn() {
s.log.Infof("Ignoring duplicate htlc " +
"confirmed ntfn")
continue
}

// Spawn a goroutine so we don't block the subscription
// channel while waiting for chain confirmation.
go func(ntfn *swapserverrpc.ServerHtlcConfirmedNotification) {
registered := s.handleHtlcConfirmedNtfn(
ctx, ntfn, ntfnConfChan,
)
s.finishHtlcConfirmedNtfn(registered)
}(ntfn)

case <-ctx.Done():
return
}
}
}

// tryStartHtlcConfirmedNtfn atomically transitions the notification gate from
// idle to in-flight. It returns true only for the first matching notification
// while no other notification is being processed and no successful
// notification-based confirmation has been registered yet.
func (s *loopOutSwap) tryStartHtlcConfirmedNtfn() bool {
return s.htlcConfNtfnState.CompareAndSwap(
uint32(htlcConfNtfnStateIdle),
uint32(htlcConfNtfnStateInFlight),
)
}

// finishHtlcConfirmedNtfn atomically clears an in-flight notification attempt.
// Failed attempts transition back to idle so later server retries may be
// processed, while successful attempts transition to registered so all future
// retries are ignored.
func (s *loopOutSwap) finishHtlcConfirmedNtfn(registered bool) {
nextState := htlcConfNtfnStateIdle
if registered {
nextState = htlcConfNtfnStateRegistered
}

s.htlcConfNtfnState.CompareAndSwap(
uint32(htlcConfNtfnStateInFlight),
uint32(nextState),
)
}

// handleHtlcConfirmedNtfn processes a single HTLC confirmed
// notification from the server. It checks the swap hash, parses
// the outpoint, registers for confirmations, and forwards the
// result. All errors are logged at Info and swallowed.
func (s *loopOutSwap) handleHtlcConfirmedNtfn(ctx context.Context,
ntfn *swapserverrpc.ServerHtlcConfirmedNotification,
ntfnConfChan chan<- *chainntnfs.TxConfirmation) bool {

if ntfn == nil {
s.log.Infof("Ignoring nil htlc confirmed ntfn")
return false
}

if s.lnd == nil || s.lnd.ChainNotifier == nil {
s.log.Infof("Ignoring htlc confirmed ntfn, chain " +
"notifier unavailable")
return false
}

if s.htlc == nil || len(s.htlc.PkScript) == 0 {
s.log.Infof("Ignoring htlc confirmed ntfn, htlc " +
"details unavailable")
return false
}

// Check if the notification is for this swap.
ntfnHash, err := lntypes.MakeHash(ntfn.SwapHash)
if err != nil {
s.log.Infof("Ignoring htlc confirmed ntfn, bad "+
"hash: %v", err)
return false
}

if ntfnHash != s.hash {
return false
}

s.log.Infof("Received htlc confirmed notification for "+
"outpoint %v", ntfn.HtlcOutpoint)

// Parse the outpoint string "txid:index".
htlcOutpoint, err := wire.NewOutPointFromString(ntfn.HtlcOutpoint)
if err != nil {
s.log.Infof("Ignoring htlc confirmed ntfn, bad "+
"outpoint: %v", err)
return false
}

// Register for confirmations on the specific UTXO.
confChan, errChan, err :=
s.lnd.ChainNotifier.RegisterConfirmationsNtfn(
ctx, &htlcOutpoint.Hash, s.htlc.PkScript,
int32(s.HtlcConfirmations),
s.InitiationHeight,
)
if err != nil {
s.log.Infof("Failed to register conf ntfn from "+
"server notification: %v", err)
return false
}

// Wait for the confirmation or context cancellation.
select {
case conf := <-confChan:
if conf == nil || conf.Tx == nil {
s.log.Infof("Ignoring htlc confirmed ntfn, " +
"empty confirmation")
return false
}

// Verify that the confirmed transaction actually
// has our HTLC output at the expected index.
if uint64(len(conf.Tx.TxOut)) <= uint64(htlcOutpoint.Index) {
s.log.Infof("Ignoring htlc confirmed "+
"ntfn, output index %d out of "+
"range (tx has %d outputs)",
htlcOutpoint.Index, len(conf.Tx.TxOut))
return false
}

txOut := conf.Tx.TxOut[htlcOutpoint.Index]
if !bytes.Equal(txOut.PkScript, s.htlc.PkScript) {
s.log.Infof("Ignoring htlc confirmed "+
"ntfn, output at index %d has "+
"wrong pkscript", htlcOutpoint.Index)
return false
}

s.log.Infof("HTLC confirmed via server "+
"notification at height %v",
conf.BlockHeight)

select {
case ntfnConfChan <- conf:
return true
case <-ctx.Done():
return false
}

case err := <-errChan:
s.log.Infof("Error waiting for conf from server "+
"notification: %v", err)
return false

case <-ctx.Done():
return false
}
}

// waitForHtlcSpendConfirmedV2 waits for the htlc to be spent either by our own
// sweep or a server revocation tx.
func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
Expand Down
Loading
Loading