From c88bca9a3afa968a022fffd1c34c6b4dda3c2331 Mon Sep 17 00:00:00 2001 From: unldenis Date: Mon, 6 Oct 2025 15:38:07 +0200 Subject: [PATCH 1/6] New QueuedEvent struct --- src/router/actor.rs | 60 ++++++++++++++++++++++++++++--------------- src/router/channel.rs | 20 +++++++-------- 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/src/router/actor.rs b/src/router/actor.rs index 4c8b74d..54f3e12 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -4,10 +4,7 @@ use std::{ }; use nostr::{ - event::{Event, EventBuilder, Kind}, - filter::{Filter, MatchEventOptions}, - message::{RelayMessage, SubscriptionId}, - nips::nip44, + event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, hashes::hash160::Hash, message::{RelayMessage, SubscriptionId}, nips::nip44 }; use nostr_relay_pool::RelayPoolNotification; use serde::{Serialize, de::DeserializeOwned}; @@ -362,6 +359,7 @@ pub struct MessageRouterActorState { keypair: LocalKeypair, /// All conversation states conversations: HashMap, + queue_events: HashSet, } impl MessageRouterActorState { @@ -369,6 +367,7 @@ impl MessageRouterActorState { Self { keypair, conversations: HashMap::new(), + queue_events: HashSet::new() } } @@ -857,12 +856,12 @@ impl MessageRouterActorState { // check if Response has selected relays if let Some(selected_relays) = selected_relays_optional { for event in events_to_broadcast { - self.queue_event(channel, event, Some(selected_relays.clone())) + self.queue_event(channel, QueuedEvent::new_with_relays(event, selected_relays.clone())) .await?; } } else { for event in events_to_broadcast { - self.queue_event(channel, event, None).await?; + self.queue_event(channel, QueuedEvent::new(event)).await?; } // TODO: wait for confirmation from relays @@ -879,25 +878,12 @@ impl MessageRouterActorState { async fn queue_event( &self, channel: &Arc, - event: Event, - relays: Option>, + queue_event: QueuedEvent, ) -> Result<(), ConversationError> where C::Error: From, { - if let Some(relays) = relays { - // if selected relays, broadcast to selected relays - channel - .broadcast_to(relays, event) - .await - .map_err(|e| ConversationError::Inner(Box::new(e)))?; - } else { - // if not selected relays, broadcast to all relays - channel - .broadcast(event) - .await - .map_err(|e| ConversationError::Inner(Box::new(e)))?; - } + queue_event.broadcast(channel).await?; Ok(()) } @@ -1226,3 +1212,35 @@ impl std::fmt::Debug for ConversationBox { f.debug_struct("Conversation").finish() } } + + +pub struct QueuedEvent { + event: Event, + relays: Option>, +} + +impl QueuedEvent { + pub fn new(event: Event ) -> Self { + Self { event, relays: None } + } + pub fn new_with_relays(event: Event, relays: HashSet) -> Self { + Self { event, relays: Some(relays) } + } + + pub async fn broadcast(&self, channel: &Arc) + -> Result<(), ConversationError> + where + C::Error: From, + { + let event = self.event.clone(); + let relays = self.relays.clone(); + + if let Some(relays) = relays { + channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + } else { + channel.broadcast(event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + } + Ok(()) + } + +} \ No newline at end of file diff --git a/src/router/channel.rs b/src/router/channel.rs index cbdfd76..35209a4 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -38,12 +38,12 @@ pub trait Channel: Send + 'static { fn broadcast( &self, event: nostr::Event, - ) -> impl std::future::Future> + Send; + ) -> impl std::future::Future> + Send; fn broadcast_to( &self, urls: I, event: nostr::Event, - ) -> impl std::future::Future> + Send + ) -> impl std::future::Future> + Send where ::IntoIter: Send, I: IntoIterator + Send, @@ -113,19 +113,19 @@ impl Channel for RelayPool { Ok(()) } - async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> { - self.send_event(&event).await?; - Ok(()) + async fn broadcast(&self, event: nostr::Event) -> Result { + let output = self.send_event(&event).await?; + return Ok(!output.success.is_empty()); } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error> + async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send, U: TryIntoUrl, Self::Error: From<::Err>, { - self.send_event_to(urls, &event).await?; - Ok(()) + let output = self.send_event_to(urls, &event).await?; + return Ok(!output.success.is_empty()); } async fn receive(&self) -> Result { @@ -167,11 +167,11 @@ impl Channel for std::sync::Arc { ::unsubscribe(self, id).await } - async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> { + async fn broadcast(&self, event: nostr::Event) -> Result { ::broadcast(self, event).await } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error> + async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send, From 73de380f43d4d3c4a39f8f6447c2595eafbb13eb Mon Sep 17 00:00:00 2001 From: unldenis Date: Mon, 6 Oct 2025 16:24:57 +0200 Subject: [PATCH 2/6] Queue event with incremental delay --- cli/src/bin/invoices.rs | 4 ++-- cli/src/lib.rs | 5 +---- src/router/actor.rs | 45 ++++++++++++++++++++++++++++++++--------- src/router/channel.rs | 2 +- 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/cli/src/bin/invoices.rs b/cli/src/bin/invoices.rs index 2ba928e..95c05af 100644 --- a/cli/src/bin/invoices.rs +++ b/cli/src/bin/invoices.rs @@ -38,7 +38,7 @@ impl InvoiceResponseListener for LogInvoiceResponseListener { async fn main() -> Result<(), CliError> { env_logger::init(); - let relays = vec!["wss://relay.nostr.net".to_string()]; + let relays = vec!["wss://miodominio.net".to_string()]; let (receiver_key, receiver) = create_app_instance( "Receiver", @@ -89,6 +89,6 @@ async fn main() -> Result<(), CliError> { log::info!("Apps created"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(std::time::Duration::from_secs(60)).await; Ok(()) } diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 198b616..3b13661 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -35,10 +35,7 @@ pub async fn create_app_instance( let app = PortalApp::new( keypair.clone(), - vec![ - "wss://relay.nostr.net".to_string(), - "wss://relay.damus.io".to_string(), - ], + _relays, Arc::new(LogRelayStatusChange), ) .await?; diff --git a/src/router/actor.rs b/src/router/actor.rs index 54f3e12..ebcb3f6 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -1,10 +1,12 @@ +use core::f64; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; +// use std::time::Duration inside async blocks use nostr::{ - event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, hashes::hash160::Hash, message::{RelayMessage, SubscriptionId}, nips::nip44 + event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, message::{RelayMessage, SubscriptionId}, nips::nip44 }; use nostr_relay_pool::RelayPoolNotification; use serde::{Serialize, de::DeserializeOwned}; @@ -359,7 +361,6 @@ pub struct MessageRouterActorState { keypair: LocalKeypair, /// All conversation states conversations: HashMap, - queue_events: HashSet, } impl MessageRouterActorState { @@ -367,7 +368,6 @@ impl MessageRouterActorState { Self { keypair, conversations: HashMap::new(), - queue_events: HashSet::new() } } @@ -883,7 +883,31 @@ impl MessageRouterActorState { where C::Error: From, { - queue_event.broadcast(channel).await?; + let result = queue_event.broadcast(channel).await?; + if !result { + // incrementally delay until it is confirmed + log::warn!("New event with id {} is queued, delaying until it is confirmed", queue_event.event.id); + + let channel = channel.clone(); + let event = queue_event.clone(); + + tokio::spawn(async move { + let mut retry = 1.0; + let multiplier = f64::consts::E; + + let mut delivered = false; + while !delivered { + let eid = event.event.id; + log::warn!("{} retry of event {}", retry, eid); + tokio::time::sleep(std::time::Duration::from_secs_f64(retry * multiplier)).await; + delivered = event.broadcast(&channel).await.unwrap(); + + retry += 1.0; + } + }); + + } + Ok(()) } @@ -1214,6 +1238,7 @@ impl std::fmt::Debug for ConversationBox { } +#[derive(Clone)] pub struct QueuedEvent { event: Event, relays: Option>, @@ -1228,19 +1253,19 @@ impl QueuedEvent { } pub async fn broadcast(&self, channel: &Arc) - -> Result<(), ConversationError> + -> Result where C::Error: From, { let event = self.event.clone(); let relays = self.relays.clone(); - if let Some(relays) = relays { - channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + let result = if let Some(relays) = relays { + channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? } else { - channel.broadcast(event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; - } - Ok(()) + channel.broadcast(event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? + }; + Ok(result) } } \ No newline at end of file diff --git a/src/router/channel.rs b/src/router/channel.rs index 35209a4..f8fcb47 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -9,7 +9,7 @@ use crate::router::ids::PortalId; /// A trait for an abstract channel /// /// This is modeled around Nostr relays, in which we can subscribe to events matching a filter. -pub trait Channel: Send + 'static { +pub trait Channel: Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; fn subscribe( From f2d18b8b60e3c41fd829c9daf7e364573092dd20 Mon Sep 17 00:00:00 2001 From: unldenis Date: Tue, 7 Oct 2025 11:08:15 +0200 Subject: [PATCH 3/6] Now retrying failing relays --- src/router/actor.rs | 25 +++++++++++++++++-------- src/router/channel.rs | 21 ++++++++++++--------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/router/actor.rs b/src/router/actor.rs index ebcb3f6..960ed9e 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -6,7 +6,7 @@ use std::{ // use std::time::Duration inside async blocks use nostr::{ - event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, message::{RelayMessage, SubscriptionId}, nips::nip44 + event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, message::{RelayMessage, SubscriptionId}, nips::nip44, types::RelayUrl }; use nostr_relay_pool::RelayPoolNotification; use serde::{Serialize, de::DeserializeOwned}; @@ -884,7 +884,7 @@ impl MessageRouterActorState { C::Error: From, { let result = queue_event.broadcast(channel).await?; - if !result { + if !result.is_empty() { // incrementally delay until it is confirmed log::warn!("New event with id {} is queued, delaying until it is confirmed", queue_event.event.id); @@ -895,12 +895,12 @@ impl MessageRouterActorState { let mut retry = 1.0; let multiplier = f64::consts::E; - let mut delivered = false; - while !delivered { - let eid = event.event.id; - log::warn!("{} retry of event {}", retry, eid); + let mut remaining_relays = result; + while !remaining_relays.is_empty() { + log::warn!("{} retry of event {} on relays {:?}", retry, event.event.id, remaining_relays); tokio::time::sleep(std::time::Duration::from_secs_f64(retry * multiplier)).await; - delivered = event.broadcast(&channel).await.unwrap(); + + remaining_relays = event.broadcast_remaining(&channel, remaining_relays).await.unwrap(); retry += 1.0; } @@ -1253,7 +1253,7 @@ impl QueuedEvent { } pub async fn broadcast(&self, channel: &Arc) - -> Result + -> Result, ConversationError> where C::Error: From, { @@ -1268,4 +1268,13 @@ impl QueuedEvent { Ok(result) } + pub async fn broadcast_remaining(&self, channel: &Arc, relays: HashSet) -> Result, ConversationError> + where + C::Error: From, + { + let event = self.event.clone(); + let result = channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + Ok(result) + } + } \ No newline at end of file diff --git a/src/router/channel.rs b/src/router/channel.rs index f8fcb47..b97f8be 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -1,4 +1,6 @@ -use nostr::{message::SubscriptionId, types::TryIntoUrl}; +use std::collections::HashSet; + +use nostr::{message::SubscriptionId, types::{RelayUrl, TryIntoUrl}}; use nostr_relay_pool::{ RelayPool, RelayPoolNotification, SubscribeOptions, relay::{FlagCheck, RelayServiceFlags}, @@ -38,12 +40,12 @@ pub trait Channel: Send + Sync + 'static { fn broadcast( &self, event: nostr::Event, - ) -> impl std::future::Future> + Send; + ) -> impl std::future::Future, Self::Error>> + Send; fn broadcast_to( &self, urls: I, event: nostr::Event, - ) -> impl std::future::Future> + Send + ) -> impl std::future::Future, Self::Error>> + Send where ::IntoIter: Send, I: IntoIterator + Send, @@ -113,11 +115,11 @@ impl Channel for RelayPool { Ok(()) } - async fn broadcast(&self, event: nostr::Event) -> Result { + async fn broadcast(&self, event: nostr::Event) -> Result, Self::Error> { let output = self.send_event(&event).await?; - return Ok(!output.success.is_empty()); + return Ok(output.failed.keys().map(|url| url.to_string()).collect()); } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result + async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result, Self::Error> where ::IntoIter: Send, I: IntoIterator + Send, @@ -125,7 +127,8 @@ impl Channel for RelayPool { Self::Error: From<::Err>, { let output = self.send_event_to(urls, &event).await?; - return Ok(!output.success.is_empty()); + + return Ok(output.failed.keys().map(|url| url.to_string()).collect()); } async fn receive(&self) -> Result { @@ -167,11 +170,11 @@ impl Channel for std::sync::Arc { ::unsubscribe(self, id).await } - async fn broadcast(&self, event: nostr::Event) -> Result { + async fn broadcast(&self, event: nostr::Event) -> Result, Self::Error> { ::broadcast(self, event).await } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result + async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result, Self::Error> where ::IntoIter: Send, I: IntoIterator + Send, From b6beb77020b1954a224a4146197a6cea76ac8eea Mon Sep 17 00:00:00 2001 From: unldenis Date: Tue, 7 Oct 2025 11:31:07 +0200 Subject: [PATCH 4/6] Remove unnecessary `nostr::Event` clone --- src/router/actor.rs | 8 +++----- src/router/channel.rs | 16 ++++++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/router/actor.rs b/src/router/actor.rs index 960ed9e..0f3657a 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -1257,13 +1257,12 @@ impl QueuedEvent { where C::Error: From, { - let event = self.event.clone(); let relays = self.relays.clone(); let result = if let Some(relays) = relays { - channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? + channel.broadcast_to(relays, &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? } else { - channel.broadcast(event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? + channel.broadcast(&self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? }; Ok(result) } @@ -1272,8 +1271,7 @@ impl QueuedEvent { where C::Error: From, { - let event = self.event.clone(); - let result = channel.broadcast_to(relays, event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + let result = channel.broadcast_to(relays, &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; Ok(result) } diff --git a/src/router/channel.rs b/src/router/channel.rs index b97f8be..9a4669b 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -39,12 +39,12 @@ pub trait Channel: Send + Sync + 'static { fn broadcast( &self, - event: nostr::Event, + event: &nostr::Event, ) -> impl std::future::Future, Self::Error>> + Send; fn broadcast_to( &self, urls: I, - event: nostr::Event, + event: &nostr::Event, ) -> impl std::future::Future, Self::Error>> + Send where ::IntoIter: Send, @@ -115,18 +115,18 @@ impl Channel for RelayPool { Ok(()) } - async fn broadcast(&self, event: nostr::Event) -> Result, Self::Error> { - let output = self.send_event(&event).await?; + async fn broadcast(&self, event: &nostr::Event) -> Result, Self::Error> { + let output = self.send_event(event).await?; return Ok(output.failed.keys().map(|url| url.to_string()).collect()); } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result, Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result, Self::Error> where ::IntoIter: Send, I: IntoIterator + Send, U: TryIntoUrl, Self::Error: From<::Err>, { - let output = self.send_event_to(urls, &event).await?; + let output = self.send_event_to(urls, event).await?; return Ok(output.failed.keys().map(|url| url.to_string()).collect()); } @@ -170,11 +170,11 @@ impl Channel for std::sync::Arc { ::unsubscribe(self, id).await } - async fn broadcast(&self, event: nostr::Event) -> Result, Self::Error> { + async fn broadcast(&self, event: &nostr::Event) -> Result, Self::Error> { ::broadcast(self, event).await } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result, Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result, Self::Error> where ::IntoIter: Send, I: IntoIterator + Send, From 7a8c8eea8ecad14ab39119e63f4a8bf353274747 Mon Sep 17 00:00:00 2001 From: unldenis Date: Tue, 7 Oct 2025 14:15:07 +0200 Subject: [PATCH 5/6] Review copilot for queue event to broadcast --- src/router/actor.rs | 43 +++++++++++++++++++++++++++++-------------- src/router/channel.rs | 35 ++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/src/router/actor.rs b/src/router/actor.rs index 0f3657a..09b60fc 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -14,10 +14,9 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::StreamExt; use crate::{ - protocol::{LocalKeypair, model::event_kinds::SUBKEY_PROOF}, + protocol::{model::event_kinds::SUBKEY_PROOF, LocalKeypair}, router::{ - CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream, - PortalId, Response, channel::Channel, + channel::{BroadcastResult, Channel}, CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream, PortalId, Response }, }; @@ -884,7 +883,7 @@ impl MessageRouterActorState { C::Error: From, { let result = queue_event.broadcast(channel).await?; - if !result.is_empty() { + if !result.failed().is_empty() { // incrementally delay until it is confirmed log::warn!("New event with id {} is queued, delaying until it is confirmed", queue_event.event.id); @@ -892,17 +891,33 @@ impl MessageRouterActorState { let event = queue_event.clone(); tokio::spawn(async move { - let mut retry = 1.0; + let mut attempt = 0.0; + let base_delay = 1.0; // seconds let multiplier = f64::consts::E; - let mut remaining_relays = result; - while !remaining_relays.is_empty() { - log::warn!("{} retry of event {} on relays {:?}", retry, event.event.id, remaining_relays); - tokio::time::sleep(std::time::Duration::from_secs_f64(retry * multiplier)).await; + while !remaining_relays.failed().is_empty() { + attempt += 1.0; - remaining_relays = event.broadcast_remaining(&channel, remaining_relays).await.unwrap(); + log::warn!("Attempt {} of event {} on relays {:?}", attempt, event.event.id, remaining_relays.failed()); + let mut secs = base_delay * multiplier.powf(attempt); + if secs > 300.0 { + secs = 300.0; + } + tokio::time::sleep(std::time::Duration::from_secs_f64(secs)).await; + match event.broadcast_remaining(&channel, &remaining_relays).await { + Ok(result) => { + remaining_relays = result; + }, + Err(_) => { + log::error!("Event {} failed to broadcast on relays {:?}", event.event.id, remaining_relays.failed()); + }, + } - retry += 1.0; + if attempt > 25.0 { + log::error!("Event {} failed to broadcast after 25 attempts", event.event.id); + break; + } + } }); @@ -1253,7 +1268,7 @@ impl QueuedEvent { } pub async fn broadcast(&self, channel: &Arc) - -> Result, ConversationError> + -> Result where C::Error: From, { @@ -1267,11 +1282,11 @@ impl QueuedEvent { Ok(result) } - pub async fn broadcast_remaining(&self, channel: &Arc, relays: HashSet) -> Result, ConversationError> + pub async fn broadcast_remaining(&self, channel: &Arc, relays: &BroadcastResult) -> Result where C::Error: From, { - let result = channel.broadcast_to(relays, &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + let result = channel.broadcast_to(relays.failed().clone(), &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; Ok(result) } diff --git a/src/router/channel.rs b/src/router/channel.rs index 9a4669b..0765426 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use nostr::{message::SubscriptionId, types::{RelayUrl, TryIntoUrl}}; +use nostr::{message::SubscriptionId, types::TryIntoUrl}; use nostr_relay_pool::{ RelayPool, RelayPoolNotification, SubscribeOptions, relay::{FlagCheck, RelayServiceFlags}, @@ -40,12 +40,12 @@ pub trait Channel: Send + Sync + 'static { fn broadcast( &self, event: &nostr::Event, - ) -> impl std::future::Future, Self::Error>> + Send; + ) -> impl std::future::Future> + Send; fn broadcast_to( &self, urls: I, event: &nostr::Event, - ) -> impl std::future::Future, Self::Error>> + Send + ) -> impl std::future::Future> + Send where ::IntoIter: Send, I: IntoIterator + Send, @@ -59,6 +59,23 @@ pub trait Channel: Send + Sync + 'static { fn shutdown(&self) -> impl std::future::Future> + Send; } +/// A result of a Nostr broadcast +#[derive(Debug, Clone)] +pub struct BroadcastResult { + failed: HashSet, +} + +impl BroadcastResult { + pub fn new(failed: HashSet) -> Self { + Self { failed } + } + + /// Get the failed relays + pub fn failed(&self) -> &HashSet { + &self.failed + } +} + impl Channel for RelayPool { type Error = nostr_relay_pool::pool::Error; @@ -115,11 +132,11 @@ impl Channel for RelayPool { Ok(()) } - async fn broadcast(&self, event: &nostr::Event) -> Result, Self::Error> { + async fn broadcast(&self, event: &nostr::Event) -> Result { let output = self.send_event(event).await?; - return Ok(output.failed.keys().map(|url| url.to_string()).collect()); + return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect())); } - async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result, Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send, @@ -128,7 +145,7 @@ impl Channel for RelayPool { { let output = self.send_event_to(urls, event).await?; - return Ok(output.failed.keys().map(|url| url.to_string()).collect()); + return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect())); } async fn receive(&self) -> Result { @@ -170,11 +187,11 @@ impl Channel for std::sync::Arc { ::unsubscribe(self, id).await } - async fn broadcast(&self, event: &nostr::Event) -> Result, Self::Error> { + async fn broadcast(&self, event: &nostr::Event) -> Result { ::broadcast(self, event).await } - async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result, Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send, From d2e11a88762d2f42a34acb5b6d0fd042d87f7a4a Mon Sep 17 00:00:00 2001 From: unldenis Date: Wed, 8 Oct 2025 12:13:05 +0200 Subject: [PATCH 6/6] cli: add payment request test --- cli/src/bin/payment_request.rs | 95 ++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 cli/src/bin/payment_request.rs diff --git a/cli/src/bin/payment_request.rs b/cli/src/bin/payment_request.rs new file mode 100644 index 0000000..f50765d --- /dev/null +++ b/cli/src/bin/payment_request.rs @@ -0,0 +1,95 @@ +use std::{sync::Arc, time::Duration as StdDuration}; + +use app::{CallbackError, CashuRequestListener, PaymentRequestListener, PaymentStatusNotifier, RecurringPaymentRequest, SinglePaymentRequest}; +use cli::{CliError, create_app_instance, create_sdk_instance}; +use portal::protocol::model::{ + payment::{Currency, RecurringPaymentResponseContent, RecurringPaymentStatus, SinglePaymentRequestContent}, Timestamp +}; + +struct LogPaymentRequestListener; + +#[async_trait::async_trait] +impl PaymentRequestListener for LogPaymentRequestListener { + async fn on_single_payment_request( + &self, + event: SinglePaymentRequest, + notifier: Arc, + ) -> Result<(), CallbackError> { + log::info!("Receiver: Received Payment request: {:?}", event); + // Always approve for test + Ok(()) + } + async fn on_recurring_payment_request( + &self, + event: RecurringPaymentRequest, + ) -> Result { + log::info!("Receiver: Received Recurring Payment request: {:?}", event); + Ok(RecurringPaymentResponseContent { + request_id: event.content.request_id, + status: RecurringPaymentStatus::Rejected { reason: Some("User rejected".to_string()) }, + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), CliError> { + env_logger::init(); + + let relays = vec!["wss://relay.nostr.net".to_string()]; + + let (receiver_key, receiver) = create_app_instance( + "Receiver", + "mass derive myself benefit shed true girl orange family spawn device theme", + relays.clone(), + ) + .await?; + let _receiver = receiver.clone(); + + tokio::spawn(async move { + log::info!("Receiver: Setting up Payment request listener"); + _receiver + .listen_for_payment_request(Arc::new(LogPaymentRequestListener)) + .await + .expect("Receiver: Error creating listener"); + }); + + let sender_sdk = create_sdk_instance( + "draft sunny old taxi chimney ski tilt suffer subway bundle once story", + relays.clone(), + ) + .await?; + + log::info!("Apps created, waiting 5 seconds before sending request"); + tokio::time::sleep(StdDuration::from_secs(5)).await; + + let request_content = SinglePaymentRequestContent { + amount: 1000, + currency: Currency::Millisats, + current_exchange_rate: None, + invoice: "invoice".to_string(), + auth_token: None, + expires_at: Timestamp::now_plus_seconds(300), + subscription_id: None, + description: Some("test".to_string()), + request_id: "test".to_string(), + }; + + let mut response = sender_sdk + .request_single_payment(receiver_key.public_key().0, vec![], request_content) + .await + .unwrap(); + + while let Some(resp) = response.next().await { + + match resp { + Ok(resp) => { + log::info!("Sender: Received payment: {:?}", resp); + } + Err(e) => { + log::error!("Sender: Error receiving payment: {}", e); + } + } + } + + Ok(()) +}