1 change: 1 addition & 0 deletions lightning-background-processor/src/lib.rs
@@ -2517,6 +2517,7 @@ mod tests {
Arc::clone(&tx_broadcaster),
None,
None,
None,
)
.unwrap(),
);
30 changes: 27 additions & 3 deletions lightning-liquidity/src/lsps2/event.rs
@@ -49,7 +49,27 @@ pub enum LSPS2ClientEvent {
/// When the invoice is paid, the LSP will open a channel with the previously agreed upon
/// parameters to you.
///
/// **Note: ** This event will *not* be persisted across restarts.
/// ## BOLT11
/// For BOLT11 invoices, use `intercept_scid` and `cltv_expiry_delta` in a route hint
/// pointing to the LSP (`counterparty_node_id`).
///
/// ## BOLT12
/// For BOLT12 invoices, the same parameters are used to construct blinded payment paths
/// through the LSP:
/// - `counterparty_node_id` is the introduction node (LSP) of the blinded payment path
/// - `intercept_scid` is used as `ForwardTlvs::short_channel_id` in the blinded path
/// - `cltv_expiry_delta` is used as `PaymentRelay::cltv_expiry_delta` in the blinded path
/// - Fee parameters should be set to zero (fees are taken via fee skimming in LSPS2)
///
/// Use [`OffersMessageFlow::create_blinded_payment_paths_for_intercept_scid`] to construct
/// the blinded payment paths, and
/// [`OffersMessageFlow::create_invoice_builder_from_invoice_request_with_custom_payment_paths`]
/// to build the BOLT12 invoice with those paths.
///
/// [`OffersMessageFlow::create_blinded_payment_paths_for_intercept_scid`]: lightning::offers::flow::OffersMessageFlow::create_blinded_payment_paths_for_intercept_scid
/// [`OffersMessageFlow::create_invoice_builder_from_invoice_request_with_custom_payment_paths`]: lightning::offers::flow::OffersMessageFlow::create_invoice_builder_from_invoice_request_with_custom_payment_paths
///
/// **Note:** This event will *not* be persisted across restarts.
InvoiceParametersReady {
/// The identifier of the issued bLIP-52 / LSPS2 `buy` request, as returned by
/// [`LSPS2ClientHandler::select_opening_params`].
@@ -59,10 +79,14 @@
/// [`LSPS2ClientHandler::select_opening_params`]: crate::lsps2::client::LSPS2ClientHandler::select_opening_params
request_id: LSPSRequestId,
/// The node id of the LSP.
///
/// For BOLT12, this is used as the introduction node of the blinded payment path.
counterparty_node_id: PublicKey,
/// The intercept short channel id to use in the route hint.
/// The intercept short channel id to use in the route hint (BOLT11) or as the
/// `ForwardTlvs::short_channel_id` in a blinded payment path (BOLT12).
intercept_scid: u64,
/// The `cltv_expiry_delta` to use in the route hint.
/// The `cltv_expiry_delta` to use in the route hint (BOLT11) or as the
/// `PaymentRelay::cltv_expiry_delta` in a blinded payment path (BOLT12).
cltv_expiry_delta: u32,
/// The initial payment size you specified.
payment_size_msat: Option<u64>,
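To make the new BOLT12 flow concrete, here is a minimal sketch of a client-side handler for `InvoiceParametersReady`. The two `OffersMessageFlow` helpers are the ones the docs above link to, but their parameter lists are assumed here, as are the `event`, `flow`, and `invoice_request` bindings, so treat this as an illustration of the wiring rather than the crate's literal API:

use lightning_liquidity::lsps2::event::LSPS2ClientEvent;

// Sketch only: `flow` is an `OffersMessageFlow` handle and `invoice_request`
// is a previously received BOLT12 invoice request; the helper signatures
// below are assumptions, not the crate's verified API.
if let LSPS2ClientEvent::InvoiceParametersReady {
	counterparty_node_id, // introduction node of the blinded path (the LSP)
	intercept_scid,       // becomes `ForwardTlvs::short_channel_id`
	cltv_expiry_delta,    // becomes `PaymentRelay::cltv_expiry_delta`
	..
} = event
{
	// Build blinded payment paths through the LSP. Fee parameters stay at
	// zero because LSPS2 collects its fee via skimming instead.
	let payment_paths = flow
		.create_blinded_payment_paths_for_intercept_scid(
			counterparty_node_id,
			intercept_scid,
			cltv_expiry_delta,
		)
		.expect("failed to construct blinded payment paths");

	// Build the BOLT12 invoice around those custom paths and hand it back
	// to the payer over onion messages.
	let invoice_builder = flow
		.create_invoice_builder_from_invoice_request_with_custom_payment_paths(
			&invoice_request,
			payment_paths,
		)
		.expect("failed to create invoice builder");
	// ... sign the invoice and enqueue the response ...
}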
101 changes: 95 additions & 6 deletions lightning-liquidity/src/lsps2/service.rs
@@ -45,6 +45,7 @@ use lightning::events::HTLCHandlingFailureType;
use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId};
use lightning::ln::msgs::{ErrorAction, LightningError};
use lightning::ln::types::ChannelId;
use lightning::onion_message::messenger::OnionMessageInterceptor;
use lightning::util::errors::APIError;
use lightning::util::logger::Level;
use lightning::util::ser::Writeable;
@@ -631,17 +632,20 @@ impl PeerState {
});
}

fn prune_expired_request_state(&mut self) {
fn prune_expired_request_state(&mut self) -> Vec<u64> {
let mut pruned_scids = Vec::new();
self.outbound_channels_by_intercept_scid.retain(|intercept_scid, entry| {
if entry.is_prunable() {
// We abort the flow, and prune any data kept.
self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
self.needs_persist |= true;
pruned_scids.push(*intercept_scid);
return false;
}
true
});
pruned_scids
}

fn pending_requests_and_channels(&self) -> usize {
@@ -717,6 +721,7 @@ where
total_pending_requests: AtomicUsize,
config: LSPS2ServiceConfig,
persistence_in_flight: AtomicUsize,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
}

impl<CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone> LSPS2ServiceHandler<CM, K, T>
@@ -728,6 +733,7 @@ where
per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K, tx_broadcaster: T,
config: LSPS2ServiceConfig,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
) -> Result<Self, lightning::io::Error> {
let mut peer_by_intercept_scid = new_hash_map();
let mut peer_by_channel_id = new_hash_map();
@@ -756,6 +762,14 @@ where
}
}

// Register all peers with active intercept SCIDs for onion message interception,
// so that messages for offline peers are held rather than dropped.
if let Some(ref interceptor) = onion_message_interceptor {
for node_id in peer_by_intercept_scid.values() {
interceptor.register_peer_for_interception(*node_id);
}
}

Ok(Self {
pending_messages,
pending_events,
@@ -768,6 +782,7 @@ where
kv_store,
tx_broadcaster,
config,
onion_message_interceptor,
})
}

@@ -776,6 +791,29 @@ where
&self.config
}

/// Cleans up `peer_by_intercept_scid` entries for the given SCIDs, and deregisters the peer
/// from onion message interception if they have no remaining active intercept SCIDs.
fn cleanup_intercept_scids(
&self, counterparty_node_id: &PublicKey, pruned_scids: &[u64], has_remaining_channels: bool,
) {
if pruned_scids.is_empty() {
return;
}

{
let mut peer_by_intercept_scid = self.peer_by_intercept_scid.write().unwrap();
for scid in pruned_scids {
peer_by_intercept_scid.remove(scid);
}
}

if !has_remaining_channels {
if let Some(ref interceptor) = self.onion_message_interceptor {
interceptor.deregister_peer_for_interception(counterparty_node_id);
}
}
}

/// Returns whether the peer has any active LSPS2 requests.
pub(crate) fn has_active_requests(&self, counterparty_node_id: &PublicKey) -> bool {
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -921,6 +959,10 @@ where
peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id);
}

if let Some(ref interceptor) = self.onion_message_interceptor {
interceptor.register_peer_for_interception(*counterparty_node_id);
}

let outbound_jit_channel = OutboundJITChannel::new(
buy_request.payment_size_msat,
buy_request.opening_fee_params,
@@ -1051,7 +1093,15 @@ where
peer_state
.outbound_channels_by_intercept_scid
.remove(&intercept_scid);
// TODO: cleanup peer_by_intercept_scid
let has_remaining =
!peer_state.outbound_channels_by_intercept_scid.is_empty();
drop(peer_state);
drop(outer_state_lock);
self.cleanup_intercept_scids(
counterparty_node_id,
&[intercept_scid],
has_remaining,
);
return Err(APIError::APIMisuseError { err: e.err });
},
}
@@ -1270,7 +1320,7 @@ where
pub async fn channel_open_abandoned(
&self, counterparty_node_id: &PublicKey, user_channel_id: u128,
) -> Result<(), APIError> {
{
let (intercept_scid, has_remaining) = {
let outer_state_lock = self.per_peer_state.read().unwrap();
let inner_state_lock = outer_state_lock.get(counterparty_node_id).ok_or_else(|| {
APIError::APIMisuseError {
@@ -1317,7 +1367,11 @@ where
peer_state.outbound_channels_by_intercept_scid.remove(&intercept_scid);
peer_state.intercept_scid_by_channel_id.retain(|_, &mut scid| scid != intercept_scid);
peer_state.needs_persist |= true;
}
let has_remaining = !peer_state.outbound_channels_by_intercept_scid.is_empty();
(intercept_scid, has_remaining)
};

self.cleanup_intercept_scids(counterparty_node_id, &[intercept_scid], has_remaining);

self.persist_peer_state(*counterparty_node_id).await.map_err(|e| {
APIError::APIMisuseError {
@@ -1801,17 +1855,32 @@ where
{
// First build a list of peers to persist and prune with the read lock. This allows
// us to avoid the write lock unless we actually need to remove a node.
let mut all_pruned_scids = Vec::new();
let outer_state_lock = self.per_peer_state.read().unwrap();
for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
peer_state_lock.prune_expired_request_state();
let pruned_scids = peer_state_lock.prune_expired_request_state();
if !pruned_scids.is_empty() {
let has_remaining =
!peer_state_lock.outbound_channels_by_intercept_scid.is_empty();
all_pruned_scids.push((*counterparty_node_id, pruned_scids, has_remaining));
}
let is_prunable = peer_state_lock.is_prunable();
if is_prunable {
need_remove.push(*counterparty_node_id);
} else if peer_state_lock.needs_persist {
need_persist.push(*counterparty_node_id);
}
}
drop(outer_state_lock);

for (counterparty_node_id, pruned_scids, has_remaining) in all_pruned_scids {
self.cleanup_intercept_scids(
&counterparty_node_id,
&pruned_scids,
has_remaining,
);
}
}

for counterparty_node_id in need_persist.into_iter() {
@@ -1822,6 +1891,7 @@

for counterparty_node_id in need_remove {
let mut future_opt = None;
let mut was_removed = false;
{
// We need to take the `per_peer_state` write lock to remove an entry, but also
// have to hold it until after the `remove` call returns (but not through
@@ -1833,6 +1903,7 @@
let state = entry.get_mut().get_mut().unwrap();
if state.is_prunable() {
entry.remove();
was_removed = true;
let key = counterparty_node_id.to_string();
future_opt = Some(self.kv_store.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1850,6 +1921,20 @@
debug_assert!(false);
}
}
if was_removed {
// Clean up handler-level maps for the removed peer.
self.peer_by_intercept_scid
.write()
.unwrap()
.retain(|_, node_id| *node_id != counterparty_node_id);
self.peer_by_channel_id
.write()
.unwrap()
.retain(|_, node_id| *node_id != counterparty_node_id);
if let Some(ref interceptor) = self.onion_message_interceptor {
interceptor.deregister_peer_for_interception(&counterparty_node_id);
}
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
@@ -1877,7 +1962,11 @@ where
// We clean up the peer state, but leave removing the peer entry to the prune logic in
// `persist` which removes it from the store.
peer_state_lock.prune_pending_requests();
peer_state_lock.prune_expired_request_state();
let pruned_scids = peer_state_lock.prune_expired_request_state();
let has_remaining = !peer_state_lock.outbound_channels_by_intercept_scid.is_empty();
drop(peer_state_lock);
drop(outer_state_lock);
self.cleanup_intercept_scids(&counterparty_node_id, &pruned_scids, has_remaining);
}
}

Expand Down
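Taken together, the service-side changes maintain one invariant: a peer stays registered for onion message interception while it has at least one active intercept SCID (registration happens on a successful `buy` request and, at startup, for peers restored from persistence), and is deregistered once its last SCID is pruned, its channel open fails or is abandoned, or its whole state entry is removed. The trait shape below is inferred from these call sites only; the actual definition lives in `lightning::onion_message::messenger` and may differ:

use bitcoin::secp256k1::PublicKey;

// Inferred from the call sites in this diff, not copied from the crate:
// registration takes the node id by value, deregistration by reference,
// and both are invoked through an `Arc<dyn OnionMessageInterceptor + Send + Sync>`.
pub trait OnionMessageInterceptor {
	/// Begin holding onion messages destined for this (possibly offline)
	/// peer instead of dropping them.
	fn register_peer_for_interception(&self, peer_node_id: PublicKey);
	/// Stop holding messages once the peer has no active intercept SCIDs.
	fn deregister_peer_for_interception(&self, peer_node_id: &PublicKey);
}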
9 changes: 9 additions & 0 deletions lightning-liquidity/src/manager.rs
@@ -48,6 +48,7 @@ use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
use lightning::ln::msgs::{ErrorAction, LightningError};
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::wire::CustomMessageReader;
use lightning::onion_message::messenger::OnionMessageInterceptor;
use lightning::sign::{EntropySource, NodeSigner};
use lightning::util::logger::Level;
use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
@@ -330,6 +331,7 @@ where
chain_params: Option<ChainParameters>, kv_store: K, transaction_broadcaster: T,
service_config: Option<LiquidityServiceConfig>,
client_config: Option<LiquidityClientConfig>,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
) -> Result<Self, lightning::io::Error> {
Self::new_with_custom_time_provider(
entropy_source,
@@ -342,6 +344,7 @@ where
service_config,
client_config,
DefaultTimeProvider,
onion_message_interceptor,
)
.await
}
@@ -373,6 +376,7 @@ where
chain_source: Option<C>, chain_params: Option<ChainParameters>, kv_store: K,
service_config: Option<LiquidityServiceConfig>,
client_config: Option<LiquidityClientConfig>, time_provider: TP,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
) -> Result<Self, lightning::io::Error> {
let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new());
let pending_messages =
@@ -415,6 +419,7 @@ where
kv_store.clone(),
transaction_broadcaster.clone(),
lsps2_service_config.clone(),
onion_message_interceptor.clone(),
)?)
} else {
None
@@ -1044,6 +1049,7 @@ where
chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
service_config: Option<LiquidityServiceConfig>,
client_config: Option<LiquidityClientConfig>,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
) -> Result<Self, lightning::io::Error> {
let kv_store = KVStoreSyncWrapper(kv_store_sync);

@@ -1057,6 +1063,7 @@ where
transaction_broadcaster,
service_config,
client_config,
onion_message_interceptor,
));

let mut waker = dummy_waker();
@@ -1094,6 +1101,7 @@ where
chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
service_config: Option<LiquidityServiceConfig>,
client_config: Option<LiquidityClientConfig>, time_provider: TP,
onion_message_interceptor: Option<Arc<dyn OnionMessageInterceptor + Send + Sync>>,
) -> Result<Self, lightning::io::Error> {
let kv_store = KVStoreSyncWrapper(kv_store_sync);
let mut fut = pin!(LiquidityManager::new_with_custom_time_provider(
@@ -1107,6 +1115,7 @@ where
service_config,
client_config,
time_provider,
onion_message_interceptor,
));

let mut waker = dummy_waker();
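Every `LiquidityManager` constructor, sync and async, gains a trailing `onion_message_interceptor` parameter. A minimal sketch of threading it through the async constructor follows; `MyInterceptor` is a hypothetical implementation of the trait, and the elided leading arguments stand in for an existing node setup:

use std::sync::Arc;
use lightning::onion_message::messenger::OnionMessageInterceptor;

// `MyInterceptor` is a hypothetical `OnionMessageInterceptor` impl.
let interceptor: Arc<dyn OnionMessageInterceptor + Send + Sync> =
	Arc::new(MyInterceptor::new());

let liquidity_manager = LiquidityManager::new(
	entropy_source,
	// ... remaining node plumbing elided ...
	chain_params,
	kv_store,
	transaction_broadcaster,
	Some(service_config), // act as an LSPS2 service
	None,                 // no client-side config
	Some(interceptor),    // hold onion messages for offline JIT peers
)
.await?;

Callers that do not need interception simply pass `None`, which is exactly what the updated tests below and the `lightning-background-processor` test above do.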
2 changes: 2 additions & 0 deletions lightning-liquidity/tests/common/mod.rs
@@ -47,6 +47,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
Some(service_config),
None,
Arc::clone(&time_provider),
None,
)
.unwrap();

@@ -61,6 +62,7 @@ fn build_service_and_client_nodes<'a, 'b, 'c>(
None,
Some(client_config),
time_provider,
None,
)
.unwrap();

Expand Down