Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,33 @@ func (c *ChannelCommitment) extractTlvData() commitTlvData {
return auxData
}

// copy returns a deep copy of the channel commitment.
func (c *ChannelCommitment) copy() ChannelCommitment {
c2 := *c
if c.CommitTx != nil {
c2.CommitTx = c.CommitTx.Copy()
}
if len(c.CommitSig) > 0 {
c2.CommitSig = make([]byte, len(c.CommitSig))
copy(c2.CommitSig, c.CommitSig)
}

c.CustomBlob.WhenSome(func(blob tlv.Blob) {
blobCopy := make([]byte, len(blob))
copy(blobCopy, blob)
c2.CustomBlob = fn.Some(blobCopy)
})

if len(c.Htlcs) > 0 {
c2.Htlcs = make([]HTLC, len(c.Htlcs))
for i, h := range c.Htlcs {
c2.Htlcs[i] = h.Copy()
}
}

return c2
}

// ChannelStatus is a bit vector used to indicate whether an OpenChannel is in
// the default usable state, or a state where it shouldn't be used.
type ChannelStatus uint64
Expand Down Expand Up @@ -4066,6 +4093,78 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot {
return snapshot
}

// Copy returns a deep copy of the channel state.
func (c *OpenChannel) Copy() *OpenChannel {
c.RLock()
defer c.RUnlock()

clone := &OpenChannel{
ChanType: c.ChanType,
ChainHash: c.ChainHash,
FundingOutpoint: c.FundingOutpoint,
ShortChannelID: c.ShortChannelID,
IsPending: c.IsPending,
IsInitiator: c.IsInitiator,
chanStatus: c.chanStatus,
FundingBroadcastHeight: c.FundingBroadcastHeight,
ConfirmationHeight: c.ConfirmationHeight,
NumConfsRequired: c.NumConfsRequired,
ChannelFlags: c.ChannelFlags,
IdentityPub: c.IdentityPub,
Capacity: c.Capacity,
TotalMSatSent: c.TotalMSatSent,
TotalMSatReceived: c.TotalMSatReceived,
InitialLocalBalance: c.InitialLocalBalance,
InitialRemoteBalance: c.InitialRemoteBalance,
LocalChanCfg: c.LocalChanCfg,
RemoteChanCfg: c.RemoteChanCfg,
LocalCommitment: c.LocalCommitment.copy(),
RemoteCommitment: c.RemoteCommitment.copy(),
RemoteCurrentRevocation: c.RemoteCurrentRevocation,
RemoteNextRevocation: c.RemoteNextRevocation,
RevocationProducer: c.RevocationProducer,
RevocationStore: c.RevocationStore,
Packager: c.Packager,
ThawHeight: c.ThawHeight,
LastWasRevoke: c.LastWasRevoke,
RevocationKeyLocator: c.RevocationKeyLocator,
confirmedScid: c.confirmedScid,
TapscriptRoot: c.TapscriptRoot,
}

if c.FundingTxn != nil {
clone.FundingTxn = c.FundingTxn.Copy()
}

if len(c.LocalShutdownScript) > 0 {
clone.LocalShutdownScript = make(
lnwire.DeliveryAddress,
len(c.LocalShutdownScript),
)
copy(clone.LocalShutdownScript, c.LocalShutdownScript)
}
if len(c.RemoteShutdownScript) > 0 {
clone.RemoteShutdownScript = make(
lnwire.DeliveryAddress,
len(c.RemoteShutdownScript),
)
copy(clone.RemoteShutdownScript, c.RemoteShutdownScript)
}

if len(c.Memo) > 0 {
clone.Memo = make([]byte, len(c.Memo))
copy(clone.Memo, c.Memo)
}

c.CustomBlob.WhenSome(func(blob tlv.Blob) {
blobCopy := make([]byte, len(blob))
copy(blobCopy, blob)
clone.CustomBlob = fn.Some(blobCopy)
})

return clone
}

// LatestCommitments returns the two latest commitments for both the local and
// remote party. These commitments are read from disk to ensure that only the
// latest fully committed state is returned. The first commitment returned is
Expand Down
17 changes: 17 additions & 0 deletions channelnotifier/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type ClosedChannelEvent struct {
CloseSummary *channeldb.ChannelCloseSummary
}

// ChannelUpdateEvent represents a new event where a channel's state is updated.
type ChannelUpdateEvent struct {
// Channel is the channel that has been updated.
Channel *channeldb.OpenChannel
}

// FullyResolvedChannelEvent represents a new event where a channel becomes
// fully resolved.
type FullyResolvedChannelEvent struct {
Expand Down Expand Up @@ -238,3 +244,14 @@ func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
log.Warnf("Unable to send inactive channel update: %v", err)
}
}

// NotifyChannelUpdateEvent notifies subscribers that a channel's state has been
// updated.
func (c *ChannelNotifier) NotifyChannelUpdateEvent(
channel *channeldb.OpenChannel) {

event := ChannelUpdateEvent{Channel: channel}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send channel update: %v", err)
}
}
47 changes: 47 additions & 0 deletions channelnotifier/channelnotifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package channelnotifier

import (
"testing"
"time"

"github.com/lightningnetwork/lnd/channeldb"
"github.com/stretchr/testify/require"
)

// TestChannelUpdateEvent tests that channel update events are properly
// notified to subscribers.
func TestChannelUpdateEvent(t *testing.T) {
// Initialize the notification server.
ntfnServer := New(nil)
if err := ntfnServer.Start(); err != nil {
t.Fatalf("unable to start notification server: %v", err)
}
defer func() {
err := ntfnServer.Stop()
if err != nil {
t.Logf("unable to stop notification server: %v", err)
}
}()

// Subscribe to channel events.
sub, err := ntfnServer.SubscribeChannelEvents()
require.NoError(t, err)
defer sub.Cancel()

// Create a mock channel state.
channel := &channeldb.OpenChannel{}

// Notify the server of a channel update event.
ntfnServer.NotifyChannelUpdateEvent(channel)

// Consume the event.
select {
case event := <-sub.Updates():
updateEvent, ok := event.(ChannelUpdateEvent)
require.True(t, ok)
require.Equal(t, channel, updateEvent.Channel)

case <-time.After(time.Second):
t.Fatalf("expected to receive channel update event")
}
}
5 changes: 5 additions & 0 deletions docs/release-notes/release-notes-0.21.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@
* routerrpc HTLC event subscribers now receive specific failure details for
invoice-level validation failures, avoiding ambiguous `UNKNOWN` results. [#10520](https://github.com/lightningnetwork/lnd/pull/10520)

* SubscribeChannelEvents [now emits channel update
events](https://github.com/lightningnetwork/lnd/pull/10543) to be able to
subscribe to state changes.

## lncli Updates

## Breaking Changes
Expand Down Expand Up @@ -158,6 +162,7 @@

# Contributors (Alphabetical Order)

* bitromortac
* Boris Nagaev
* Elle Mouton
* Erick Cestari
Expand Down
11 changes: 11 additions & 0 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ type ChannelLinkConfig struct {
// ChannelNotifier when a channel link become inactive.
NotifyInactiveLinkEvent func(wire.OutPoint)

// NotifyChannelUpdate allows the link to tell the ChannelNotifier when
// a channel's state has been updated.
NotifyChannelUpdate func(*channeldb.OpenChannel)

// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
// events through.
HtlcNotifier htlcNotifier
Expand Down Expand Up @@ -1813,6 +1817,13 @@ func (l *channelLink) handleUpstreamMsg(ctx context.Context,
case *lnwire.CommitSig:
err = l.processRemoteCommitSig(ctx, msg)

// At this point our local commitment state has been irrevokably
// committed to and our balances are updated. We notify our
// subscribers that the channel state has been updated.
if err == nil {
l.cfg.NotifyChannelUpdate(l.channel.ChannelState())
}

case *lnwire.RevokeAndAck:
err = l.processRemoteRevokeAndAck(ctx, msg)

Expand Down
2 changes: 2 additions & 0 deletions htlcswitch/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveLink: func(wire.OutPoint) {},
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyChannelUpdate: func(*channeldb.OpenChannel) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
HtlcNotifier: aliceSwitch.cfg.HtlcNotifier,
Expand Down Expand Up @@ -4931,6 +4932,7 @@ func (h *persistentLinkHarness) restartLink(
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
NotifyChannelUpdate: func(*channeldb.OpenChannel) {},
HtlcNotifier: h.hSwitch.cfg.HtlcNotifier,
SyncStates: syncStates,
GetAliases: getAliases,
Expand Down
1 change: 1 addition & 0 deletions htlcswitch/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
NotifyChannelUpdate: func(*channeldb.OpenChannel) {},
HtlcNotifier: server.htlcSwitch.cfg.HtlcNotifier,
GetAliases: getAliases,
ShouldFwdExpAccountability: func() bool { return true },
Expand Down
4 changes: 4 additions & 0 deletions itest/list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ var allTestCases = []*lntest.TestCase{
Name: "invoice update subscription",
TestFunc: testInvoiceSubscriptions,
},
{
Name: "channel update subscription",
TestFunc: testChannelUpdateNotifications,
},
{
Name: "streaming channel backup update",
TestFunc: testChannelBackupUpdates,
Expand Down
99 changes: 99 additions & 0 deletions itest/lnd_open_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package itest
import (
"fmt"
"strings"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -1342,3 +1343,101 @@ func testOpenChannelWithShutdownAddr(ht *lntest.HarnessTest) {
require.Equal(ht, bobShutdownAddr.Address, bobUTXOConfirmed.Address)
require.Equal(ht, paymentAmount, bobUTXOConfirmed.AmountSat)
}

// testChannelUpdateNotifications checks that clients subscribed to channel
// events receive real-time updates when the channel state changes.
func testChannelUpdateNotifications(ht *lntest.HarnessTest) {
// We'll start by creating two nodes, Alice and Bob, and a channel
// between them.
alice := ht.NewNodeWithCoins("Alice", nil)
bob := ht.NewNode("Bob", nil)

ht.EnsureConnected(alice, bob)

// We'll subscribe to channel events for both nodes.
aliceSub := alice.RPC.SubscribeChannelEvents()
bobSub := bob.RPC.SubscribeChannelEvents()

// We'll then open a channel between Alice and Bob.
chanPoint := ht.OpenChannel(
alice, bob, lntest.OpenChannelParams{
Amt: 1000000,
PushAmt: 500000,
},
)

// We'll wait for the channel to be active. We expect to receive one
// pending, one open, and one active notification.
ht.AssertChannelActive(alice, chanPoint)
ht.AssertChannelActive(bob, chanPoint)

ht.AssertChannelEventType(
aliceSub, lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL,
)
ht.AssertChannelEventType(
aliceSub, lnrpc.ChannelEventUpdate_OPEN_CHANNEL,
)
ht.AssertChannelEventType(
aliceSub, lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL,
)

ht.AssertChannelEventType(
bobSub, lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL,
)
ht.AssertChannelEventType(
bobSub, lnrpc.ChannelEventUpdate_OPEN_CHANNEL,
)
ht.AssertChannelEventType(
bobSub, lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL,
)

// We'll now make a payment from Alice to Bob to trigger a channel
// update.
payReqs, _, _ := ht.CreatePayReqs(bob, btcutil.Amount(1000), 1)
ht.CompletePaymentRequests(alice, payReqs)

// assertUpdates is a helper function to assert the number of commitment
// updates received by a node.
assertUpdates := func(sub rpc.ChannelEventsClient, numUpdates int) {
event := ht.AssertChannelEventType(
sub, lnrpc.ChannelEventUpdate_CHANNEL_UPDATE,
)
require.IsType(
ht, &lnrpc.ChannelEventUpdate_UpdatedChannel{},
event.Channel,
)
channel := event.GetUpdatedChannel().Channel
require.EqualValues(ht, numUpdates, channel.NumUpdates)
}

// expectNoMoreUpdates is a helper function to assert that no more
// channel updates are received by a node.
expectNoMoreUpdates := func(sub rpc.ChannelEventsClient) {
updates := make(chan struct{})
go func() {
_, err := sub.Recv()
// Only signal if we successfully received an update.
// If Recv fails (e.g., context canceled during test
// cleanup), that's fine - it means no update arrived.
if err == nil {
close(updates)
}
}()

select {
case <-updates:
ht.Fatalf("expected no more updates")
case <-time.After(defaultTimeout):
}
}

// We expect to see two updates from each node's point of view. One for
// the addition of the HTLC, and a second for the settlement.
assertUpdates(aliceSub, 1)
assertUpdates(aliceSub, 2)
expectNoMoreUpdates(aliceSub)

assertUpdates(bobSub, 1)
assertUpdates(bobSub, 2)
expectNoMoreUpdates(bobSub)
}
Loading
Loading